Stars: 219
Forks: 60
Pull Requests: 6
Issues: 11
Watchers: 9
Last Updated: 2021-02-20 15:04:45
simple multi-processes management framework based on pcntl
License: MIT License
Languages: PHP
中文README.MD
Simple Fork Framework is based on PCNTL extension, the interfaces are like Thread
and Runnable
in Java.
Writing Multi-Processes programs are hard for freshman. You must consider that how to recover zombie processes, interprocess communication, especially handle the process signal.
SimpleFork framework provide several interfaces which like Java Thread
and solutions in process pool, sync and IPC. You do not need to care about how to control multi-processes.
composer require jenner/simple_fork
Or
require '/path/to/simple-fork-php/autoload.php'
must
optional
There are two pool you can use when you have more than one process or task to manage:Pool and FixedPool.
wait
method to wait for all the sub processes exiting
(or just do something else, but do not forget to call the wait
method)reload
which can reload
all the sub processes. When you call reload
method, the master will
start new N processes and shutdown the old ones.Process::dispatchSignal
method to call
call signal handlers for pending signals.declare(ticks=n);
at the start of program
to handle the pending signals.pcntl_signal_dispatch
instead of declare
which is more is a waste of CPU resourcesn
to
a small integer, else set a big one to save the CPU time.Process::registerSignalHandler
method. start
method of the sub process is called, it will register the signal
handler automatically.More examples in [examples](https://github.com/huyanping/simple-fork-php/tree/master/examples examples) dictionary
A simple example.
class TestRunnable implements \Jenner\SimpleFork\Runnable{
/**
* Entrance
* @return mixed
*/
public function run()
{
echo "I am a sub process" . PHP_EOL;
}
}
$process = new \Jenner\SimpleFork\Process(new TestRunnable());
$process->start();
$process->wait();
A process using callback
$process = new \Jenner\SimpleFork\Process(function(){
for($i=0; $i<3; $i++){
echo $i . PHP_EOL;
sleep(1);
}
});
$process->start();
$process->wait();
Process communication using shared memory
class Producer extends \Jenner\SimpleFork\Process{
public function run(){
$cache = new \Jenner\SimpleFork\Cache\SharedMemory();
//$cache = new \Jenner\SimpleFork\Cache\RedisCache();
for($i = 0; $i<10; $i++){
$cache->set($i, $i);
echo "set {$i} : {$i}" . PHH_EOL;
}
}
}
class Worker extends \Jenner\SimpleFork\Process{
public function run(){
sleep(5);
$cache = new \Jenner\SimpleFork\Cache\SharedMemory();
//$cache = new \Jenner\SimpleFork\Cache\RedisCache();
for($i=0; $i<10; $i++){
echo "get {$i} : " . $cache->get($i) . PHP_EOL;
}
}
}
$producer = new Producer();
$worker = new Worker();
$pool = new \Jenner\SimpleFork\Pool();
$pool->execute($producer);
$pool->execute($worker);
$pool->wait();
Process communication using system v message queue
class Producer extends \Jenner\SimpleFork\Process
{
public function run()
{
$queue = new \Jenner\SimpleFork\Queue\SystemVMessageQueue();
//$queue = new \Jenner\SimpleFork\Queue\RedisQueue();
for ($i = 0; $i < 10; $i++) {
echo getmypid() . PHP_EOL;
$queue->put($i);
}
}
}
class Worker extends \Jenner\SimpleFork\Process
{
public function run()
{
sleep(5);
$queue = new \Jenner\SimpleFork\Queue\SystemVMessageQueue();
//$queue = new \Jenner\SimpleFork\Queue\RedisQueue();
for ($i = 0; $i < 10; $i++) {
$res = $queue->get();
echo getmypid() . ' = ' . $i . PHP_EOL;
var_dump($res);
}
}
}
$producer = new Producer();
$worker = new Worker();
$pool = new \Jenner\SimpleFork\Pool();
$pool->execute($producer);
$pool->execute($worker);
$pool->wait();
Process communication using Semaphore lock
class TestRunnable implements \Jenner\SimpleFork\Runnable
{
/**
* @var \Jenner\SimpleFork\Lock\LockInterface
*/
protected $sem;
public function __construct()
{
$this->sem = \Jenner\SimpleFork\Lock\Semaphore::create("test");
//$this->sem = \Jenner\SimpleFork\Lock\FileLock::create("/tmp/test.lock");
}
/**
* @return mixed
*/
public function run()
{
for ($i = 0; $i < 20; $i++) {
$this->sem->acquire();
echo "my turn: {$i} " . getmypid() . PHP_EOL;
$this->sem->release();
sleep(1);
}
}
}
$pool = new \Jenner\SimpleFork\Pool();
$pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable()));
$pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable()));
$pool->wait();
Process pool to manage processes
$pool = new \Jenner\SimpleFork\Pool();
$pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable()));
$pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable()));
$pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable()));
$pool->wait();
ParallelPool to manage processes
$fixed_pool = new \Jenner\SimpleFork\ParallelPool(new TestRunnable(), 10);
$fixed_pool->start();
$fixed_pool->keep(true);
FixedPool to manage processes
$pool = new \Jenner\SimpleFork\FixedPool(2);
$pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable()));
$pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable()));
$pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable()));
$pool->wait();
SinglePool to manage processes
$pool = new \Jenner\SimpleFork\SinglePool();
$pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable()));
$pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable()));
$pool->execute(new \Jenner\SimpleFork\Process(new TestRunnable()));
$pool->wait();