PHP 中的批处理的实现
2018-09-07 13:58
如果Web应用程序中的一个特性需要超过1秒或2秒才能完成,那么应该怎么办?需要某种离线处理解决方案。学习几种对PHP应用程序中长时间运行的作业进行离线服务的方法。
大型的连锁店有一个大问题。每天,在每家商店会发生数千次交易。公司执行官希望对这些数据进行挖掘。哪些产品卖得好?哪些不好?有机产品在哪里卖得好?冰淇淋的销售情况怎么样?
为了捕捉这些数据,组织必须将所有事务性数据装载进一个数据模型,以便更适合生成公司所需的报告类型。但是,这很花费时间,而且随着连锁规模的增长,处理一天的数据可能要花费一天以上的时间。因此,这是个大问题。
现在,您的Web应用程序可能不需要处理这么多数据,但是任何站点的处理时间都有可能超过客户愿意等待的时间。一般来说,客户愿意等待的时间是200毫秒,如果超过这个时间,客户就会觉得过程“缓慢”。这个数字基于桌面应用程序,而Web使我们更有耐心了。但无论如何,不应该让客户等待的时间超过几秒。所以,要采用一些策略来处理PHP中的批处理作业。
分散的方式与cron
在UNIX®机器上,执行批处理的核心程序是cron守护进程。这个守护进程读取一个配置文件,这个文件会告诉它要运行哪些命令行以及运行的频率。然后,这个守护进程就按照配置执行它们。在遇到错误时,它甚至能够向指定的电子邮件地址发送错误输出,从而帮助对问题进行调试。
我知道一些工程师强烈主张使用线程技术。“线程!线程才是进行后台处理的真正方法。cron守护进程太过时了。”
我不这么认为。
这两种方法我都用过,我认为cron具备“KeepItSimple,Stupid(KISS,简单就是美)”原则的优点。它使后台处理保持简单。不需要编写一直运行的多线程的作业处理应用程序(因此不会有内存泄漏),而是由cron启动一个简单的批处理脚本。这个脚本判断是否有作业要处理,执行作业,然后退出。不需要担心内存泄漏。也不需要担心线程停止或陷入无限循环。
那么,cron是如何工作的?这依赖于您所处的系统环境。我只讨论老式简单的cron的UNIX命令行版本,您可以向系统管理员咨询如何在自己的Web应用程序中实现它。
下面是一个简单的cron配置,它在每天晚上11点运行一个PHP脚本:
023***jack/usr/bin/php/users/home/jack/myscript.php
前5个字段定义应该启动脚本的时间。然后是应该用来运行这个脚本的用户名。其余的命令是要执行的命令行。时间字段分别是分、小时、月中的日、月和周中的日。下面是几个示例。
命令:
15****jack/usr/bin/php/users/home/jack/myscript.php
在每个小时的第15分钟运行脚本。
命令:
15,45****jack/usr/bin/php/users/home/jack/myscript.php
在每个小时的第15和第45分钟运行脚本。
命令:
*/13-23***jack/usr/bin/php/users/home/jack/myscript.php
在早上3点到晚上11点之间的每分钟运行脚本。
命令
3023**6jack/usr/bin/php/users/home/jack/myscript.php
在每星期六的晚上11:30运行脚本(星期六由6指定)。
可以看到,组合的数量是无限的。可以根据需要控制运行脚本的时间。还可以指定多个要运行的脚本,这样的话,一些脚本可以每分钟都运行,而其他脚本(比如备份脚本)可以每天只运行一次。
为了指定将报告的错误发送到哪个电子邮件地址,可以使用MAILTO指令,如下所示:
注意:对于Microsoft®Windows®用户,有一个等效的ScheduledTasks系统可以用来定期启动命令行进程(比如PHP脚本)。
回页首
批处理体系结构的基础知识
批处理是相当简单的。在大多数情况下,采用两个工作流之一。第一个工作流用于进行报告;脚本每天运行一次,它生成报告并将报告发送给一组用户。第二个工作流是在响应某种请求时创建的批作业。例如,我登录进Web应用程序中,并要求它向系统中注册的所有用户发送一个消息,将一个新的特性告诉他们。这个操作必须进行批处理,因为系统中有10,000个用户。PHP要花费一段时间才能完成这样的任务,所以它必须由浏览器之外的一个作业来执行。
在第二个工作流中,Web应用程序只需将信息放在某个位置,让批处理应用程序共享它。这些信息指定作业的性质(例如,“Sendthise-mailtoallthepeopleonthesystem”。)批处理程序运行这个作业,然后删除作业。另一种方法是,处理程序将作业标为已完成。无论用哪种方法,作业都应该识别为已完成,这样就不会再次运行它。
本文的其余部分演示在Web应用程序前端和批处理后端之间共享数据的各种方法。
回页首
邮件队列
第一种方法是使用专用的邮件队列系统。在这种模型中,数据库中的一个表包含应该发送给各个用户的电子邮件消息。Web界面使用mailouts类将电子邮件添加到队列中。电子邮件处理程序使用mailouts类检索未处理的电子邮件,然后再次使用它从队列中删除未处理的电子邮件。
这个模型首先需要MySQL模式。
清单1.mailout.sql
DROPTABLEIFEXISTSmailouts;CREATETABLEmailouts(idMEDIUMINTNOTNULLAUTO_INCREMENT,from_addressTEXTNOTNULL,to_addressTEXTNOTNULL,subjectTEXTNOTNULL,contentTEXTNOTNULL,PRIMARYKEY(id));
这个模式非常简单。每行中有一个from和一个to地址,以及电子邮件的主题和内容。
对数据库中的mailouts表进行处理的是PHPmailouts类。
清单2.mailouts.php
<?phprequire_once(DB.php);classMailouts{publicstaticfunctionget_db(){$dsn=mysql://root:@localhost/mailout;$db=&DB::Connect($dsn,array());if(PEAR::isError($db)){die($db->getMessage());}return$db;}publicstaticfunctiondelete($id){$db=Mailouts::get_db();$sth=$db->prepare(DELETEFROMmailoutsWHEREid=?);$db->execute($sth,$id);returntrue;}publicstaticfunctionadd($from,$to,$subject,$content){$db=Mailouts::get_db();$sth=$db->prepare(INSERTINTOmailoutsVALUES(null,?,?,?,?));$db->execute($sth,array($from,$to,$subject,$content));returntrue;}publicstaticfunctionget_all(){$db=Mailouts::get_db();$res=$db->query(SELECT*FROMmailouts);$rows=array();while($res->fetchInto($row)){$rows[]=$row;}return$rows;}}?>
这个脚本包含Pear::DB数据库访问类。然后定义mailouts类,其中包含三个主要的静态函数:add、delete和get_all。add()方法向队列中添加一个电子邮件,这个方法由前端使用。get_all()方法从表中返回所有数据。delete()方法删除一个电子邮件。
您可能会问,我为什么不只在脚本末尾调用delete_all()方法。不这么做有两个原因:如果在发送每个消息之后删除它,那么即使脚本在出现问题之后重新运行,消息也不可能发送两次;在批作业的启动和完成之间可能会添加新的消息。
下一步是编写一个简单的测试脚本,这个脚本将一个条目添加到队列中。
清单3.mailout_test_add.php
在这个示例中,我添加一个mailout,这个消息要发送给某公司的Molly,其中包括主题“TestSubject”和电子邮件主体。可以在命令行上运行这个脚本:phpmailout_test_add.php。
为了发送电子邮件,需要另一个脚本,这个脚本作为作业处理程序。
清单4.mailout_send.php
<?phprequire_oncemailout.php;functionprocess($from,$to,$subject,$email){mail($to,$subject,$email,From:$from);}$messages=Mailouts::get_all();foreach($messagesas$msg){process($msg[1],$msg[2],$msg[3],$msg[4]);Mailouts::delete($msg[0]);}?>
这个脚本使用get_all()方法检索所有电子邮件消息,然后使用PHP的mail()方法逐一发送消息。在每次成功发送电子邮件之后,调用delete()方法从队列中删除对应的记录。
使用cron守护进程定期运行这个脚本。运行这个脚本的频率取决于您的应用程序的需要。
注意:PHPExtensionandApplicationRepository(PEAR)存储库包含一个出色的邮件队列系统实现,可以免费下载。
回页首
更通用的方法
专门用来发送电子邮件的解决方案是很不错,但是是否有更通用的方法?我们需要能够发送电子邮件、生成报告或者执行其他耗费时间的处理,而不必在浏览器中等待处理完成。
为此,可以利用一个事实:PHP是一种解释型语言。可以将PHP代码存储在数据库中的队列中,以后再执行它。这需要两个表,见清单5。
清单5.generic.sql
DROPTABLEIFEXISTSprocessing_items;CREATETABLEprocessing_items(idMEDIUMINTNOTNULLAUTO_INCREMENT,functionTEXTNOTNULL,PRIMARYKEY(id));DROPTABLEIFEXISTSprocessing_args;CREATETABLEprocessing_args(idMEDIUMINTNOTNULLAUTO_INCREMENT,item_idMEDIUMINTNOTNULL,key_nameTEXTNOTNULL,valueTEXTNOTNULL,PRIMARYKEY(id));
第一个表processing_items包含作业处理程序调用的函数。第二个表processing_args包含要发送给函数的参数,采用的形式是由键/值对组成的hash表。
与mailouts表一样,这两个表也由PHP类包装,这个类称为ProcessingItems。
清单6.generic.php
<?phprequire_once(DB.php);classProcessingItems{publicstaticfunctionget_db(){...}publicstaticfunctiondelete($id){$db=ProcessingItems::get_db();$sth=$db->prepare(DELETEFROMprocessing_argsWHEREitem_id=?);$db->execute($sth,$id);$sth=$db->prepare(DELETEFROMprocessing_itemsWHEREid=?);$db->execute($sth,$id);returntrue;}publicstaticfunctionadd($function,$args){$db=ProcessingItems::get_db();$sth=$db->prepare(INSERTINTOprocessing_itemsVALUES(null,?));$db->execute($sth,array($function));$res=$db->query(SELECTlast_insert_id());$id=null;while($res->fetchInto($row)){$id=$row[0];}foreach($argsas$key=>$value){$sth=$db->prepare(INSERTINTOprocessing_argsVALUES(null,?,?,?));$db->execute($sth,array($id,$key,$value));}returntrue;}publicstaticfunctionget_all(){$db=ProcessingItems::get_db();$res=$db->query(SELECT*FROMprocessing_items);$rows=array();while($res->fetchInto($row)){$item=array();$item[id]=$row[0];$item[function]=$row[1];$item[args]=array();$ares=$db->query(SELECTkey_name,valueFROMprocessing_argsWHEREitem_id=?,$item[id]);while($ares->fetchInto($arow))$item[args][$arow[0]]=$arow[1];$rows[]=$item;}return$rows;}}?>
这个类包含三个重要的方法:add()、get_all()和delete()。与mailouts系统一样,前端使用add(),处理引擎使用get_all()和delete()。
清单7所示的测试脚本将一个条目添加到处理队列中。
清单7.generic_test_add.php
<?phprequire_oncegeneric.php;ProcessingItems::add(printvalue,array(value=>foo));?>
在这个示例中,添加了一个对printvalue函数的调用,并将value参数设置为foo。我使用PHP命令行解释器运行这个脚本,并将这个方法调用放进队列中。然后使用以下处理脚本运行这个方法。
清单8.generic_process.php
<?phprequire_oncegeneric.php;functionprintvalue($args){echoPrinting:.$args[value].\n;}foreach(ProcessingItems::get_all()as$item){call_user_func_array($item[function],array($item[args]));ProcessingItems::delete($item[id]);}?>
这个脚本非常简单。它获得get_all()返回的处理条目,然后使用call_user_func_array(一个PHP内部函数)用给定的参数动态地调用这个方法。在这个示例中,调用本地的printvalue函数。
为了演示这种功能,我们看看在命令行上发生了什么:
%phpgeneric_test_add.php%phpgeneric_process.phpPrinting:foo%
输出并不多,但是您能够看出要点。通过这种机制,可以将任何PHP函数的处理推迟。
现在,如果您不喜欢将PHP函数名和参数放进数据库中,那么另一种方法是在PHP代码中建立数据库中的“处理作业类型”名称和实际PHP处理函数之间的映射。按照这种方式,如果以后决定修改PHP后端,那么只要“处理作业类型”字符串匹配,系统就仍然可以工作。
回页首
放弃数据库
最后,我演示另一种稍有不同的解决方案,它使用一个目录中的文件来存储批作业,而不是使用数据库。在这里提供这个思路并不是建议您“采用这种方式,而不使用数据库”,这只是一种可供选择的方式,是否采用它由您决定。
显然,这个解决方案中没有模式,因为我们不使用数据库。所以先编写一个类,它包含与前面示例中相似的add()、get_all()和delete()方法。
清单9.batch_by_file.php
<?phpdefine(BATCH_DIRECTORY,batch_items/);classBatchFiles{publicstaticfunctiondelete($id){unlink($id);returntrue;}publicstaticfunctionadd($function,$args){$path=;while(true){$path=BATCH_DIRECTORY.time();if(file_exists($path)==false)break;}$fh=fopen($path,w);fprintf($fh,$function.\n);foreach($argsas$k=>$v){fprintf($fh,$k.:.$v.\n);}fclose($fh);returntrue;}publicstaticfunctionget_all(){$rows=array();if(is_dir(BATCH_DIRECTORY)){if($dh=opendir(BATCH_DIRECTORY)){while(($file=readdir($dh))!==false){$path=BATCH_DIRECTORY.$file;if(is_dir($path)==false){$item=array();$item[id]=$path;$fh=fopen($path,r);if($fh){$item[function]=trim(fgets($fh));$item[args]=array();while(($line=fgets($fh))!=null){$args=split(:,trim($line));$item[args][$args[0]]=$args[1];}$rows[]=$item;fclose($fh);}}}closedir($dh);}}return$rows;}}?>
BatchFiles类有三个主要方法:add()、get_all()和delete()。这个类不访问数据库,而是读写batch_items目录中的文件。
使用以下测试代码添加新的批处理条目。
清单10.batch_by_file_test_add.php
<?phprequire_oncebatch_by_file.php;BatchFiles::add(printvalue,array(value=>foo));?>
有一点需要注意:除了类名(BatchFiles)之外,实际上没有任何迹象能够说明作业是如何存储的。所以,以后很容易将它改为数据库风格的存储方式,而不需要修改接口。
最后是处理程序的代码。
清单11.batch_by_file_processor.php
<?phprequire_oncebatch_by_file.php;functionprintvalue($args){echoPrinting:.$args[value].\n;}foreach(BatchFiles::get_all()as$item){call_user_func_array($item[function],array($item[args]));BatchFiles::delete($item[id]);}?>
这段代码几乎与数据库版本完全相同,只是修改了文件名和类名。
回页首
结束语
正如前面提到的,服务器对线程提供了许多支持,可以进行后台批处理。在某些情况下,使用辅助线程处理小作业肯定比较容易。但是,也可以使用传统工具(cron、MySQL、标准的面向对象的PHP和Pear::DB)在PHP应用程序中创建批作业,这很容易实现、部署和维护。
参考资料
学习
您可以参阅本文在developerWorks全球站点上的英文原文。
阅读IBMdeveloperWorks的PHP项目资源中心,进一步了解PHP。
PEARMail_Queue包是一个健壮的邮件队列实现,其中包括数据库后端。
crontab手册提供了cron配置的细节,但是不容易理解。
PHP手册中关于UsingPHPfromthecommandline的一节可以帮助您了解如何从cron运行脚本。
随时关注developerWorks技术事件和webcast。
了解世界各地即将进行的会议、展览、网络广播和其他活动,IBM开放源码开发人员可以通过这些活动了解最新的技术发展。
访问developerWorks开源技术专区,获得广泛的how-to信息、工具和项目更新,可以帮助您利用开放源码技术进行开发并将其与IBM产品结合使用。
developerWorkspodcasts中包括很多适合于软件开发人员的有趣的访谈和讨论。
获得产品和技术
查阅PEAR--PHPExtensionandApplicationRepository,其中包含Pear::DB。
使用IBM试用软件改进您的下一个开放源码开发项目,这些软件可以下载或者通过DVD获得。
讨论
developerWorksPHPDeveloperForum为所有PHP开发人员提供了讨论技术问题的场所。如果您有关于PHP脚本、函数、语法、变量、调试和其他主题的问题,可以在这里提出。
通过参与developerWorksblog加入developerWorks社区。
关于作者
JackD.Herrington是一名高级软件工程师,具有20多年的工作经验。他撰写过三本书:CodeGenerationinAction、PodcastingHacks和PHPHacks,还撰写了30多篇文章。