Stars: 219
Forks: 60
Pull Requests: 6
Issues: 11
Watchers: 9
Last Updated: 2021-02-20 15:04:45
A simple multi-process management framework based on pcntl
License: MIT License
Languages: PHP
Chinese README.MD
Simple Fork Framework is based on the PCNTL extension; its interfaces resemble Thread and Runnable in Java.
Writing multi-process programs is hard for beginners. You must consider how to reap zombie processes, how to handle interprocess communication, and especially how to handle process signals.
The SimpleFork framework provides several interfaces that resemble Java's Thread, along with solutions for process pools, synchronization, and IPC. You do not need to care about how to control multiple processes.
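To illustrate what "recovering zombie processes" involves without the framework, here is a minimal sketch that uses only the raw pcntl functions. It is not taken from SimpleFork's source; the exit code 3 is an arbitrary value chosen for the illustration, and the script assumes the pcntl extension under the CLI.

```php
<?php
// Raw pcntl sketch (no SimpleFork involved): fork one child and reap it.
$pid = pcntl_fork();
if ($pid === -1) {
    fwrite(STDERR, "fork failed" . PHP_EOL);
    exit(1);
}
if ($pid === 0) {
    // Child process: do some work here, then exit with a status code.
    // The value 3 is arbitrary and only used for this illustration.
    exit(3);
}
// Parent process: block until the child exits so it does not stay a zombie.
pcntl_waitpid($pid, $status);
if (pcntl_wifexited($status)) {
    echo "child exit code: " . pcntl_wexitstatus($status) . PHP_EOL;
}
```

With several children, the parent has to repeat this bookkeeping for every pid, which is the kind of work the Process and pool classes are meant to take over.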
composer require jenner/simple_fork
Or
require '/path/to/simple-fork-php/autoload.php'
must
optional
There are two pools you can use when you have more than one process or task to manage: Pool and FixedPool.
Call the `wait` method to wait for all the sub processes to exit (or do something else first, but do not forget to call the `wait` method).
The `reload` method reloads all the sub processes. When you call `reload`, the master starts N new processes and shuts down the old ones.
Call the `Process::dispatchSignal` method to run the signal handlers for pending signals, or add `declare(ticks=n);` at the start of the program to handle the pending signals.
Prefer `pcntl_signal_dispatch` over `declare`, which wastes more CPU resources.
With `declare(ticks=n);`, set `n` to a small integer if pending signals must be handled quickly; otherwise set a large one to save CPU time.
To register a signal handler for a sub process, use the `Process::registerSignalHandler` method. When the `start` method of the sub process is called, it registers the signal handler automatically.

More examples are in the [examples](https://github.com/huyanping/simple-fork-php/tree/master/examples) directory.
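The notes on pending signals can be sketched with the plain pcntl functions. This is only an illustration of explicit dispatching with `pcntl_signal_dispatch`; it does not show how `Process::dispatchSignal` is implemented. The choice of SIGUSR1 and the `$received` flag are assumptions made for the example, and the script needs the pcntl and posix extensions under the CLI.

```php
<?php
// Flag set by the handler; the name is hypothetical, for illustration only.
$received = false;

// Register a handler. Without declare(ticks=n) it does not run by itself;
// it runs only when pending signals are dispatched explicitly.
pcntl_signal(SIGUSR1, function (int $signo) use (&$received) {
    $received = true;
});

// Send SIGUSR1 to the current process; the signal is now pending.
posix_kill(posix_getpid(), SIGUSR1);

// Run the handlers for all pending signals at a point we choose.
pcntl_signal_dispatch();

echo ($received ? "handled" : "pending") . PHP_EOL;
```

Dispatching at chosen points, for example once per loop iteration in the master, avoids the per-statement overhead that `declare(ticks=n);` adds.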
A simple example.
class TestRunnable implements \Jenner\SimpleFork\Runnable{
/**
* Entrance
* @return mixed
*/
public function run()
{
echo "I am a sub process" . PHP_EOL;
}
}
$process = new \Jenner\SimpleFork\Process(new TestRunnable());
$process->start();
$process->wait();

A process using callback
$process = new \Jenner\SimpleFork\Process(function(){
for($i=0; $i<3; $i++){
echo $i . PHP_EOL;
sleep(1);
}
});
$process->start();
$process->wait();

Process communication using shared memory
class Producer extends \Jenner\SimpleFork\Process{
public function run(){
$cache = new \Jenner\SimpleFork\Cache\SharedMemory();
//$cache = new \Jenner\SimpleFork\Cache\RedisCache();
for($i = 0; $i<10; $i++){
$cache->set($i, $i);
echo "set {$i} : {$i}" . PHP_EOL;
}
}
}
class Worker extends \Jenner\SimpleFork\Process{
public function run(){
sleep(5);
$cache = new \Jenner\SimpleFork\Cache\SharedMemory();
//$cache = new \Jenner\SimpleFork\Cache\RedisCache();
for($i=0; $i<10; $i++){
echo "get {$i} : " . $cache->get($i) . PHP_EOL;
}
}
}
$producer = new Producer();
$worker = new Worker();
$pool = new \Jenner\SimpleFork\Pool();
$pool->execute($producer);
$pool->execute($worker);
$pool->wait();

Process communication using System V message queue
class Producer extends \Jenner\SimpleFork\Process
{
public function run()
{
$queue = new \Jenner\SimpleFork\Queue\SystemVMessageQueue();
//$queue = new \Jenner\SimpleFork\Queue\RedisQueue();
for ($i = 0; $i < 10; $i++) {
echo getmypid() . PHP_EOL;
$queue->put($i);
}
}
}
class Worker extends \Jenner\SimpleFork\Process
{
public function run()
{
sleep(5);
$queue = new \Jenner\SimpleFork\Queue\SystemVMessageQueue();
//$queue = new \Jenner\SimpleFork\Queue\RedisQueue();
for ($i = 0; $i < 10; $i++) {
$res = $queue->get();
echo getmypid() . ' = ' . $i . PHP_EOL;
var_dump($res);
}
}
}
$producer = new Producer();
$worker = new Worker();
$pool = new \Jenner\SimpleFork\Pool();
$pool->execute($producer);
$pool->execute($worker);
$pool->wait();

Process synchronization using a Semaphore lock
class TestRunnable implements \Jenner\SimpleFork\Runnable
{
/**
* @var \Jenner\SimpleFork\Lock\LockInterface
*/
protected $sem;
public function __construct()
{
$this->sem = \Jenner\SimpleFork\Lock\Semaphore::create("test");
//$this->sem = \Jenner\SimpleFork\Lock\FileLock::create("/tmp/test.lock");
}
/**
* @return mixed
*/
public function run()
{
for ($i = 0; $i < 20; $i++) {
$this->sem->acquire();
echo "my turn: {$i} " . getmypid() . PHP_EOL;
$this->sem->release();
sleep(1);
}
}
}
$pool = new \Jenner\SimpleFork\Pool();
$pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable()));
$pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable()));
$pool->wait();

Process pool to manage processes
$pool = new \Jenner\SimpleFork\Pool();
$pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable()));
$pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable()));
$pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable()));
$pool->wait();

ParallelPool to manage processes
$fixed_pool = new \Jenner\SimpleFork\ParallelPool(new TestRunnable(), 10);
$fixed_pool->start();
$fixed_pool->keep(true);

FixedPool to manage processes
$pool = new \Jenner\SimpleFork\FixedPool(2);
$pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable()));
$pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable()));
$pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable()));
$pool->wait();

SinglePool to manage processes
$pool = new \Jenner\SimpleFork\SinglePool();
$pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable()));
$pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable()));
$pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable()));
$pool->wait();