Stars: 119
Forks: 38
Pull Requests: 73
Issues: 84
Watchers: 9
Last Updated: 2023-06-21 16:08:36
Symfony2/3/4/5 Queue Bundle (for background jobs) supporting Mongo (Doctrine ODM), Mysql (and any Doctrine ORM), RabbitMQ, Beanstalkd, Redis, and ... {write your own}
License: MIT License
Languages: PHP, JavaScript, Dockerfile, Twig
Allow symfony developers to create background job as easily as:
$worker->later()->process(1,2,3)
See changes
Upgrading from 5.0: see UPGRADING-6.0.md
This bundle provides a way to easily create and manage queued background jobs
Basic Features:
Job-specific Features:
see /Resources/doc/symfony2-3.md
see /Resources/doc/symfony4-5.md
see /Resources/doc/troubleshooting.md
Create a worker class that will work on the background job.
Example:
<?php
namespace App\Worker; // for symfony 2/3, the namespace would typically be AppBundle\Worker
class Fibonacci
extends \Dtc\QueueBundle\Model\Worker
{
private $filename;
public function __construct() {
$this->filename = '/tmp/fib-result.txt';
}
public function fibonacciFile($n) {
$fib = $this->fibonacci($n);
file_put_contents($this->filename, "{$n}: {$fib}");
}
public function fibonacci($n)
{
if($n == 0)
return 0; //F0
elseif ($n == 1)
return 1; //F1
else
return $this->fibonacci($n - 1) + $this->fibonacci($n - 2);
}
public function getName() {
return 'fibonacci';
}
public function getFilename()
{
return $this->filename;
}
}
Create a DI service for the job, and tag it as a background worker.
Symfony 5, 4 and 3.3, 3.4:
services:
# for symfony 3 the class name would likely be AppBundle\Worker\FibonacciWorker
App\Worker\Fibonacci:
# public: false is possible if you completely use DependencyInjection for access to the service
public: true
tags:
- { name: "dtc_queue.worker" }
Symfony 2, and 3.0, 3.1, 3.2:
services:
app.worker.fibonacci:
class: AppBundle\Worker\Fibonacci:
tags:
- { name: "dtc_queue.worker" }
<services>
<!-- ... -->
<service id="fibonacci" class="Fibonacci">
<tag name="dtc_queue.worker" />
</service>
<!-- ... -->
</services>
Simple examples:
bin/console dtc:queue:create_job <worker> <method> <arg>
bin/console dtc:queue:create_job fibonacci fibonacci 3
bin/console dtc:queue:create_job fibonacci fibonacciFile 8
// Dependency inject the worker or fetch it from the container
$fibonacci = $container->get('App\Worker\Fibonacci');
// For Symfony 3.3, 3.4
// $fibonacci = $container->get('AppBundle\Worker\Fibonacci');
//
// For Symfony 2, 3.0, 3.1, 3.2:
// $fibonacci = $container->get('app.worker.fibonacci');
// Basic Examples
$fibonacci->later()->fibonacci(20);
$fibonacci->later()->fibonacciFile(20);
// Batch Example
$fibonacci->batchLater()->fibonacci(20); // Batch up runs into a single run
// Timed Example
$fibonacci->later(90)->fibonacci(20); // Run 90 seconds later
// Priority
// Note: whether 1 == High or Low priority is configurable, but by default it is High
$fibonacci->later(0, 1); // As soon as possible, High priority
$fibonacci->later(0, 125); // Medium priority
$fibonacci->later(0, 255); // Low priority
// Advanced Usage Example:
// (If the job is not processed by $expireTime, then don't execute it ever...)
$expireTime = time() + 3600;
$fibonacci->later()->setExpiresAt(new \DateTime("@$expireTime"))->fibonacci(20); // Must be run within the hour or not at all
For further instructions on creating jobs, including how to create a job from the command line using json-encoded arguments, see:
It's recommended that you background the following console commands
bin/console dtc:queue:run -d 120
# the -d parameter is the number of seconds during which to keep executing jobs before ending.
# For example you could put the above command into cron or a cron-like system to run every 2 minutes
#
# There are a number of other parameters that could be passed to dtc:queue:run run this for a full list:
bin/console dtc:queue:run --help
For ODM and ORM based stores, the archive tables and the regular job table (queue) can require periodic pruning.
The regular job table is for waiting and running jobs. If a job throws an exception that can't be caught or the process segfaults, machine crashes, etc. then jobs which never finished can remain in the job queue in the "Running" state.
The archive table is where finished and errored jobs end up after execution; this table can grow indefinitely.
For Mongo in production, it may be prudent to use a capped collection or TTL Indexes
For Mysql you could create an event to delete data periodically.
Nevertheless there are also several commands that exist that do similarly (and could be put into a periodic cron job as well):
bin/console dtc:queue:prune old --older 1m
# (deletes jobs older than one month from the Archive table)
# Clear out stalled jobs from the regular job table:
bin/console dtc:queue:prune stalled
# If you're recording runs...this is recommended:
bin/console dtc:queue:prune stalled_runs
# If you're recording runs...another recommendation
bin/console dtc:queue:prune old_runs --older 1m
# If you're recording timings
bin/console dtc:queue:prune old_job_timings --older 1m
# You can tune 1m to a smaller interval such as 10d (10 days) or even 1800s (1/2 hour)
# if you have too many jobs flowing through the system.
bin/console dtc:queue:prune --help # lists other prune commands
These commands may help with debugging issues with the queue:
bin/console dtc:queue:count # some status about the queue if available (ODM/ORM only)
bin/console dtc:queue:reset # resets errored and/or stalled jobs
# This is really only good for ORM/ODM based stores.
bin/console dtc:queue:run --id={jobId}
# (jobId could be obtained from mongodb / or your database, if using an ORM / ODM solution)
Each job run can be tracked in a table in an ORM / ODM backed datastore.
Ways to configure: app/config/config.yml: (symfony 2/3) config/packages/dtc_queue.yaml: (symfony 4/5)
dtc_queue:
manager:
# run defaults to whatever job is set to (which defaults to "odm", i.e. mongodb)
# If you set the job to rabbit_mq, or beanstalkd or something else, you need to set run
# to an ORM / ODM run_manager (or a custom such one) in order to get the runs to save
#
run: orm # other possible option is "odm" (i.e. mongodb)
#
# (optionally define your own run manager with id: dtc_queue.manager.run.{some_name} and put {some_name} under run:
Change the document manager
app/config/config.yml: (symfony 2/3) config/packages/dtc_queue.yaml: (symfony 4/5)
dtc_queue:
odm:
document_manager: {something} # default is "default"
app/config/config.yml: (symfony 2/3) config/packages/dtc_queue.yaml: (symfony 4/5)
dtc_queue:
manager:
job: orm
Change the EntityManager:
dtc_queue:
orm:
entity_manager: {something} # default is "default"
NOTE: You may need to add DtcQueueBundle to your mappings section in config.yml if auto_mapping is not enabled
doctrine:
#...
orm:
#...
mappings:
DtcQueueBundle: ~
If you plan on using ODM or Redis or another configuration, but you have Doctrine ORM enabled elsewhere, it's recommended that you use the schema_filter configuration parameter so that schema dumps and/or migration diffs don't pickup those tables (see issue #77).
E.g.
doctrine:
# ...
dbal:
# ...
schema_filter: ~^(?!dtc_)~
(if you already have a schema_filter, you can just add the "dtc_" prefix to it.)
app/config/config.yml: (symfony 2/3) config/packages/dtc_queue.yaml: (symfony 4/5)
dtc_queue:
beanstalkd:
host: beanstalkd
tube: some-tube-name [optional]
manager:
job: beanstalkd
app/config/config.yml: (symfony 2/3) config/packages/dtc_queue.yaml: (symfony 4/5)
dtc_queue:
manager:
job: rabbit_mq
rabbit_mq:
host: rabbitmq
port: 5672
user: guest
password: guest
vhost: "/" [optional defaults to "/"]
ssl: [optional defaults to false - toggles to use AMQPSSLConnection]
options: [optional options to pass to AMQPStreamConnection or AMQPSSLConnection]
ssl_options: [optional extra ssl options to pass to AMQPSSLConnection]
queue_args: [optional]
queue: [optional queue name]
passive: [optional defaults to false]
durable: [optional defaults to true]
exlusive: [optional defaults to false]
auto_delete: [optional defaults to false]
exchange_args: [optional]
exchange: [optional queue name]
type: [optional defaults to "direct"]
passive: [optional defaults to false]
durable: [optional defaults to true]
auto_delete: [optional defaults to false]
app/config/config.yml: (symfony 2/3) config/packages/dtc_queue.yaml: (symfony 4/5)
dtc_queue:
manager:
job: redis
redis:
# choose one of the below snc_redis, predis, or phpredis
snc_redis:
type: predis
alias: default
predis:
# choose one of dns or connection_parameters
dsn: redis://localhost
connection_parameters:
scheme: tcp
host: localhost
port: 6379
path: ~
database: ~
password: ~
async: false
persistent: false
timeout: 5.0
read_write_timeout: ~
alias: ~
weight: ~
iterable_multibulk: false
throw_errors: true
phpredis:
# minimum fill host and port if needed
host: localhost
port: 6379
timeout: 0
retry_interval: ~
read_timeout: 0
auth: ~
app/config/config.yml: (symfony 2/3) config/packages/dtc_queue.yaml: (symfony 4/5)
dtc_queue:
class_job: Some\Job\ClassName [optional]
manager:
job: some_name [optional]
# (create your own manager service and name or alias it:
# dtc_queue.manager.job.<some_name> and put
# <some_name> in the manager: job field above)
Dtc\QueueBundle\Document\Job
Dtc\QueueBundle\Document\JobArchive
or
Dtc\QueueBundle\Entity\Job
Dtc\QueueBundle\Entity\JobArchive
(Depending on whether you're using MongoDB or an ORM)
<?php
namespace App\Entity; // Or whatever
use Dtc\QueueBundle\Entity\Job as BaseJob;
use Doctrine\ORM\Mapping as ORM;
/**
* @ORM\Entity
* @ORM\Table(name="job_some_other_name", indexes={@ORM\Index(name="job_crc_hash_idx", columns={"crcHash","status"}),
* @ORM\Index(name="job_priority_idx", columns={"priority","whenAt"}),
* @ORM\Index(name="job_when_idx", columns={"whenAt"}),
* @ORM\Index(name="job_status_idx", columns={"status","whenAt"})})
*/
class Job extends BaseJob {
}
// ... similarly for Entity\JobArchive if necessary
<?php
namespace App\Document;
use Doctrine\ODM\MongoDB\Mapping\Annotations as ODM;
use Dtc\QueueBundle\Document\Job as BaseJob;
/**
* @ODM\Document(db="my_db", collection="my_job_collection")
*/
class Job extends BaseJob
{
}
// ... similarly for Document\JobArchive if necessary
# config.yml
# ...
dtc_queue:
class_job: App\Entity\Job
class_job_archive: App\Entity\JobArchive
It's useful to listen to event in a long running script to clear doctrine manager or send email about status of a job. To add a job event subscriber, create a new service with tag: dtc_queue.event_subscriber:
services:
voices.queue.listener.clear_manager:
class: ClearManagerSubscriber
arguments:
- '@service_container'
tags:
- { name: dtc_queue.event_subscriber, connection: default }
ClearManagerSubscriber.php
<?php
use Dtc\QueueBundle\EventDispatcher\Event;
use Dtc\QueueBundle\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;
class ClearManagerSubscriber
implements EventSubscriberInterface
{
private $container;
public function __construct(ContainerInterface $container) {
$this->container = $container;
}
public function onPostJob(Event $event)
{
$managerIds = [
'doctrine.odm.mongodb.document_manager',
'doctrine.orm.default_entity_manager',
'doctrine.orm.content_entity_manager'
];
foreach ($managerIds as $id) {
$manager = $this->container->get($id);
$manager->clear();
}
}
public static function getSubscribedEvents()
{
return array(
Event::POST_JOB => 'onPostJob',
);
}
}
# /etc/init/queue.conf
author "David Tee"
description "Queue worker service, run 20 jobs at a time, process timeout of 3600"
respawn
start on startup
script
/{path to}/console dtc:queue:run --max-count 20 -v -t 3600>> /var/logs/queue.log 2>&1
end script
NOTE: ORM And ODM (MongoDB) require mmucklo/grid-bundle in order to view the jobs/runs admin page.
You can register admin routes to see queue status. In your routing.yml file, add the following:
dtc_queue:
resource: '@DtcQueueBundle/Resources/config/routing.yml'
You can run unittest by typing bin/phpunit
in source folder. If you want to run
integration testing with Mongodb, you need to set up Mongodb server on
localhost and run:
bin/phpunit Tests/Document/JobManagerTest.php
If you want to run Beanstalkd integration testing, you need to run a local, empty instance of beanstalkd for testing.
sudo service beanstalkd restart; BEANSTALKD_HOST=localhost bin/phpunit Tests/BeanStalkd/JobManagerTest.php
See /Resources/doc/full-configuration.md
This bundle is under the MIT license.
Originally written by @dtee Enhanced and maintained by @mmucklo