一個用於消息隊列的並發式 PHP 進程管理程序


進程管理

用於保存進程隊列

進程啟動

關閉進程

 /**
  * Runs scripts as child processes while capping how many run at once.
  *
  * Scripts submitted while the cap is reached are parked in a FIFO queue and
  * promoted to real processes by close_processes() as slots free up.
  *
  * Relies on collaborators defined outside this class: the Process class, the
  * TimeRecorder / IndexTag timing helpers and the _print_log() /
  * _print_dividing_lines() logging functions.
  */
 class ProcessManager
 {
     /** Interpreter used when a script does not provide its own "executable". */
     protected $executable = "/bin/php";

     /** Process objects that have been started and not yet closed. */
     protected $_processes = array();

     /** Maximum number of entries allowed in $_processes at the same time. */
     protected $_proc_max = 100;

     /** Script descriptors waiting for a free slot (first in, first out). */
     protected $_queues = array();

     /**
      * Start a script immediately, or queue it when the process cap is reached.
      *
      * @param array $script Script descriptor. Reads the keys "name" and
      *                      "message_id", and optionally "executable"
      *                      (defaults to $this->executable). The whole array
      *                      is also handed to the Process constructor.
      */
     public function add_process($script) {
         if ( !isset($script["executable"]) ) {
             $script["executable"] = $this->executable;
         }
         _print_log("current process num:".count($this->_processes));
         // ">=" rather than "==": if the count ever exceeds the cap (e.g. a
         // subclass lowers $_proc_max at runtime) we must still queue.
         if ( count($this->_processes) >= $this->_proc_max ) {
             TimeRecorder::add($script['message_id'], IndexTag::Waiting);
             _print_log("process is max, push in queue :".count($this->_queues));
             array_push($this->_queues, $script);
         } else {
             _print_log("exec :".$script["name"]);
             TimeRecorder::add($script['message_id'], IndexTag::StartExec);
             $this->_processes[] = new Process($script["executable"], $script["name"], $script);
         }
     }

     /**
      * Block until every tracked process has been closed.
      *
      * Repeatedly calls close_processes(); because that call also promotes
      * queued scripts, this returns only once the queue has drained as well.
      */
     public function clear() {
         while ( count($this->_processes) > 0 ) {
             _print_log("waiting for close processes...");
             $this->close_processes();
             if ( count($this->_processes) > 0 ) {
                 // Yield the CPU between polls instead of spinning flat out
                 // (which also flooded the log with the message above).
                 usleep(10000);
             }
         }
     }

     /**
      * Close finished (or over-time) processes and refill free slots from the queue.
      *
      * @param bool $kill When true, every tracked process is closed regardless
      *                   of whether it is still running.
      */
     public function close_processes($kill = false) {
         // Reap every process that is done, flagged by is_over_executed(), or
         // force-closed. Unsetting inside the loop is safe: a by-value foreach
         // iterates over a snapshot of the array.
         foreach ($this->_processes as $i => $process) {
             if (!$process->is_running() || $process->is_over_executed() || $kill) {
                 $error = $process->get_error();
                 _print_dividing_lines();
                 if ($error != '') {
                     _print_log($process->get_script(). " error:".$error);
                 }

                 if (!$process->is_running()) {
                     $result = $process->get_result();
                     _print_log("Done: ".$process->get_script(). "\nreturn:[".$result."]");
                 } else {
                     // Still running: it is being closed because of $kill or
                     // is_over_executed().
                     _print_log("Killed: ".$process->get_script());
                 }
                 TimeRecorder::add($process->message_id, IndexTag::EndExec);
                 $ret = $process->close();
                 TimeRecorder::show($process->message_id);
                 _print_log("exitcode: ".$ret);
                 _print_dividing_lines();
                 unset($this->_processes[$i]);
             }
         }

         // Promote queued scripts into the freed slots. The queue test is
         // "> 0": the previous "> 1" left a lone queued script stranded forever.
         if ( count($this->_processes) < $this->_proc_max && count($this->_queues) > 0 ) {
             _print_log("queues into processes...");
             do {
                 $script = array_shift($this->_queues);
                 if ( is_null($script) ) {
                     // Queue exhausted before the slots were filled.
                     break;
                 }
                 $this->add_process($script);
             } while (count($this->_processes) < $this->_proc_max);
         }
     }

     /**
      * Force-close every tracked process when the manager is destroyed.
      */
     public function __destruct() {
         // Drop pending scripts first; otherwise close_processes() would start
         // them right after the kill and nothing would ever close them.
         $this->_queues = array();
         $this->close_processes(true);
     }
 }

 


注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
粤ICP备14056181号  © 2014-2021 ITdaan.com