Queue Model
Overview
Logidav uses a hybrid queue system combining SQL-backed persistence with RabbitMQ transport. The SQL queue (AppBundle\Entity\Queue) stores tasks durably in the database, while CoreBundle\Services\RabbitMqService provides asynchronous message passing for real-time notifications and event-driven processing.
This dual approach ensures that no task is lost on restart (SQL durability) while still supporting real-time processing when needed (RabbitMQ).
Task Lifecycle
Every queued task follows a defined state machine over four statuses: Pending, Processing, Complete, and Error.
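As a rough illustration, the lifecycle can be written as a transition table. This is a minimal sketch: the status names come from the Queue entity, but the allowed transitions (in particular re-queuing a task from Error back to Pending) are assumptions, and `QUEUE_TRANSITIONS` and `canTransition()` are hypothetical names that do not exist in Logidav.

```php
<?php
// Hypothetical sketch: the transitions are inferred from the documented
// status values and are not taken from the Logidav codebase.
const QUEUE_TRANSITIONS = [
    'Pending'    => ['Processing'],
    'Processing' => ['Complete', 'Error'],
    'Complete'   => [],          // terminal state
    'Error'      => ['Pending'], // assumption: a failed task can be replayed
];

/** Returns true when a task may move from status $from to status $to. */
function canTransition(string $from, string $to): bool
{
    return in_array($to, QUEUE_TRANSITIONS[$from] ?? [], true);
}

var_dump(canTransition('Pending', 'Processing')); // bool(true)
var_dump(canTransition('Complete', 'Pending'));   // bool(false)
```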
Queue Entity Fields
The AppBundle\Entity\Queue entity stores each task with these key fields:
| Field | Type | Description |
|---|---|---|
| model | string | Service ID (fully-qualified) to resolve from the container |
| status | string | Current state: Pending, Processing, Complete, Error |
| number_of_trials | int | Number of execution attempts so far |
| run_category | string | Scheduling category; controls which processor window handles this task |
| payload | text | JSON-encoded data passed to the service's execute() method |
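To make the fields concrete, a queue row can be pictured as the array below. This is only a sketch: the service ID, the run category name, and the payload keys are invented for illustration, and the real entity is populated through its setters rather than as an array.

```php
<?php
// Hypothetical values throughout: none of these names come from Logidav.
$task = [
    'model'            => 'AppBundle\Services\Queue\Data\ExampleProcessor',
    'status'           => 'Pending',
    'number_of_trials' => 0,
    'run_category'     => 'default',
    // The payload is stored as a JSON string in a text column.
    'payload'          => json_encode(['order_id' => 1234, 'carrier' => 'dhl']),
];

// Before calling execute(), the payload is decoded back into an array.
$decoded = json_decode($task['payload'], true);
```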
QueueProcessorInterface
Any service that processes queued tasks must implement QueueProcessorInterface:
    namespace AppBundle\Services\Queue\Data;

    interface QueueProcessorInterface
    {
        /**
         * @param array<string, mixed> $payload Decoded queue payload
         */
        public function execute(array $payload): void;
    }
The processor looks up the service named in the model field in the container, treats it as a QueueProcessorInterface, and calls execute() with the decoded payload.
When creating a new queue processor, register the service in your bundle's services.yml as public (so the container can resolve it by ID), and enqueue tasks with the service ID stored in Queue::setModel().
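A new processor might look like the sketch below. The interface is redeclared without its namespace so the snippet runs on its own; `StockSyncProcessor`, the `app.stock_sync_processor` service ID, and the `sku` payload key are hypothetical, and a plain array stands in for the Symfony container.

```php
<?php
// Redeclared here (without the namespace) so the sketch is self-contained.
interface QueueProcessorInterface
{
    public function execute(array $payload): void;
}

// Hypothetical processor: class name and payload key are illustrative only.
final class StockSyncProcessor implements QueueProcessorInterface
{
    /** @var string[] SKUs handled so far, kept only to make the effect visible */
    public $synced = [];

    public function execute(array $payload): void
    {
        if (!isset($payload['sku'])) {
            throw new InvalidArgumentException('Payload must contain "sku".');
        }
        $this->synced[] = $payload['sku'];
    }
}

// Stand-in for the service container: maps a service ID to an instance.
$container = ['app.stock_sync_processor' => new StockSyncProcessor()];

// Resolve by the ID stored in the model field, check the type, then execute.
$service = $container['app.stock_sync_processor'];
if (!$service instanceof QueueProcessorInterface) {
    throw new LogicException('Service does not implement QueueProcessorInterface.');
}
$service->execute(json_decode('{"sku": "ABC-1"}', true));
```

Throwing from execute() on a malformed payload is a design choice in this sketch: it lets the caller record the failure instead of silently completing the task.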
Queue Processor Command
The primary entry point for consuming queued tasks:
php bin/console meduse:queue:processor --action=runQueues
This command:
- Queries the Queue table for tasks in Pending status
- Groups tasks by run_category
- For each task, resolves the service from the container using the model field
- Calls execute() with the decoded payload
- Marks the task as Complete on success, or increments number_of_trials and marks Error on failure
This command must run continuously in production. It is typically managed by supervisor or scheduled via cron at a high frequency. It uses LockableTrait to prevent concurrent instances.
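The steps the command performs can be sketched in plain PHP as follows. This is not the actual command: `runQueues()` is a hypothetical function, tasks are arrays rather than Doctrine entities, the container is a plain array, and setting the intermediate Processing status is an assumption.

```php
<?php
interface QueueProcessorInterface
{
    public function execute(array $payload): void;
}

/**
 * Processes all Pending tasks in place.
 * $tasks mirrors the Queue fields; $container maps service ID => service.
 */
function runQueues(array &$tasks, array $container): void
{
    // Select Pending tasks and group their indexes by run_category.
    $groups = [];
    foreach ($tasks as $i => $task) {
        if ($task['status'] === 'Pending') {
            $groups[$task['run_category']][] = $i;
        }
    }

    foreach ($groups as $indexes) {
        foreach ($indexes as $i) {
            $tasks[$i]['status'] = 'Processing'; // assumed intermediate state
            try {
                // Resolve the service by the model field.
                $service = $container[$tasks[$i]['model']] ?? null;
                if (!$service instanceof QueueProcessorInterface) {
                    throw new RuntimeException('Unknown processor: ' . $tasks[$i]['model']);
                }
                $service->execute(json_decode($tasks[$i]['payload'], true));
                $tasks[$i]['status'] = 'Complete';
            } catch (Throwable $e) {
                // Any failure counts as one more trial and marks the task as Error.
                $tasks[$i]['number_of_trials']++;
                $tasks[$i]['status'] = 'Error';
            }
        }
    }
}

// Usage with two stub processors: one succeeds, one always throws.
$ok = new class implements QueueProcessorInterface {
    public function execute(array $payload): void {}
};
$failing = new class implements QueueProcessorInterface {
    public function execute(array $payload): void { throw new RuntimeException('boom'); }
};
$tasks = [
    ['model' => 'ok', 'status' => 'Pending', 'number_of_trials' => 0, 'run_category' => 'fast', 'payload' => '{}'],
    ['model' => 'failing', 'status' => 'Pending', 'number_of_trials' => 0, 'run_category' => 'slow', 'payload' => '{}'],
];
runQueues($tasks, ['ok' => $ok, 'failing' => $failing]);
```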
Concurrency Prevention with LockableTrait
Both queue processors and critical cronjobs use LockableTrait to prevent overlapping execution:
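    use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
    use Symfony\Component\Console\Command\LockableTrait;
    use Symfony\Component\Console\Input\InputInterface;
    use Symfony\Component\Console\Output\OutputInterface;

    class MyQueueCommand extends ContainerAwareCommand
    {
        use LockableTrait;

        protected function execute(InputInterface $input, OutputInterface $output)
        {
            // lock() returns false when another instance already holds the lock.
            if (!$this->lock()) {
                $output->writeln('Command is already running.');

                return 0;
            }

            try {
                // process tasks...
            } finally {
                // Always release the lock, even if processing throws.
                $this->release();
            }

            return 0;
        }
    }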
This is critical in production where cron may trigger a new instance before the previous one finishes.
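The underlying idea, a non-blocking exclusive lock, can be illustrated with PHP's built-in flock(). This is a sketch of the principle only, not how LockableTrait is implemented; `acquireLock()`, `releaseLock()`, and the lock file name are hypothetical.

```php
<?php
/**
 * Tries to take an exclusive, non-blocking lock on $path.
 * Returns the open handle on success (keep it until release),
 * or null when the file cannot be opened or is already locked.
 */
function acquireLock(string $path)
{
    $handle = fopen($path, 'c'); // create if missing, never truncate
    if ($handle === false) {
        return null;
    }
    if (!flock($handle, LOCK_EX | LOCK_NB)) {
        fclose($handle);
        return null;
    }
    return $handle;
}

/** Releases a lock obtained from acquireLock(). */
function releaseLock($handle): void
{
    flock($handle, LOCK_UN);
    fclose($handle);
}

$path = sys_get_temp_dir() . '/my_queue_command.lock';
$first = acquireLock($path);  // this run now owns the lock
$second = acquireLock($path); // a second attempt is refused while $first is held
releaseLock($first);
```

The second attempt failing within the same process relies on flock() locks being tied to the open file handle, which is the case on Linux. With cron, the two attempts would normally come from two separate processes.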
Cronjobs vs Queues
Cronjobs and queues serve complementary roles:
| Aspect | Cronjob | Queue |
|---|---|---|
| Trigger | Time-based schedule (crontab) | Event-based (task enqueued by code) |
| Execution | Periodic, may run with nothing to do | On-demand, continuous processing |
| Typical use | Polling external systems for new data | Async processing of already-imported data |
| Concurrency | LockableTrait prevents overlap | Sequential per run_category |
| Retry | Re-runs on next cron cycle | Built-in number_of_trials counter |
| Visibility | Log files, run_job.sh output | Queue entity status in database |
The typical pattern: cronjobs import data (pull from Magento, marketplaces) and queues process the side effects (generate labels, sync stock, send notifications). See Data Flow for the complete picture.
Which Services Use Queues
For debugging failed queue tasks, see the Queue Debugging Runbook and Replay Failed Jobs.