Adding a Queue Processor
Logidav uses a SQL-backed queue system (with optional RabbitMQ) for asynchronous task processing. This guide covers creating a new queue processor that handles queued work items.
How the Queue System Works
The central command meduse:queue:processor --action=runQueues polls for pending queue entries and dispatches each one to the appropriate processor based on the model field. The run_category field controls scheduling windows so that heavy tasks run during off-peak hours.
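The dispatch step described above can be pictured with a small, self-contained sketch. Everything in it is an assumption made for illustration: PendingEntry, EchoProcessor, and dispatchPending() are hypothetical stand-ins rather than MeduseBundle code, the interface is redeclared locally so the script runs on its own, and grouping entries into one batch per model is a guess at how the real command calls execute().

```php
<?php
// Illustrative sketch only: none of these classes are Logidav's real code.

interface QueueProcessorInterface
{
    public function execute(array $queues): array;
}

// Minimal stand-in for a Queue entity: only the fields the dispatcher reads.
class PendingEntry
{
    public $id;
    public $model;

    public function __construct(int $id, string $model)
    {
        $this->id = $id;
        $this->model = $model;
    }
}

// Trivial processor that reports success for every entry it receives.
class EchoProcessor implements QueueProcessorInterface
{
    public function execute(array $queues): array
    {
        $results = [];
        foreach ($queues as $entry) {
            $results[$entry->id] = ['status' => 'success'];
        }
        return $results;
    }
}

/**
 * Group pending entries by their model field and pass each group to the
 * processor registered under that name. Entries whose model has no
 * processor are reported as errors instead of being silently dropped.
 */
function dispatchPending(array $entries, array $processors): array
{
    $byModel = [];
    foreach ($entries as $entry) {
        $byModel[$entry->model][] = $entry;
    }

    $results = [];
    foreach ($byModel as $model => $group) {
        if (!isset($processors[$model])) {
            foreach ($group as $entry) {
                $results[$entry->id] = [
                    'status' => 'error',
                    'message' => "No processor for $model",
                ];
            }
            continue;
        }
        // The + operator keeps the integer queue IDs as keys
        // (array_merge would renumber them).
        $results += $processors[$model]->execute($group);
    }
    return $results;
}

$results = dispatchPending(
    [
        new PendingEntry(1, 'app.queue.order_export'),
        new PendingEntry(2, 'app.queue.unknown'),
    ],
    ['app.queue.order_export' => new EchoProcessor()]
);
```

In this sketch the processors are passed in as a plain array keyed by model name; in the real system the lookup presumably goes through the service container, since the model field holds a service ID.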
1. Implement QueueProcessorInterface
Create your processor class in the appropriate bundle's Services/Queue/ directory:
<?php
// src/AppBundle/Services/Queue/OrderExportProcessor.php
namespace AppBundle\Services\Queue;

use MeduseBundle\Services\Queue\QueueProcessorInterface;
use Doctrine\ORM\EntityManagerInterface;
use Psr\Log\LoggerInterface;

class OrderExportProcessor implements QueueProcessorInterface
{
    private $em;
    private $logger;

    public function __construct(EntityManagerInterface $em, LoggerInterface $logger)
    {
        $this->em = $em;
        $this->logger = $logger;
    }

    /**
     * Process a batch of queue entries.
     *
     * @param array $queues Array of Queue entities to process
     * @return array Results keyed by queue ID
     */
    public function execute(array $queues): array
    {
        $results = [];
        foreach ($queues as $queue) {
            try {
                $payload = json_decode($queue->getPayload(), true);
                $this->logger->info('Processing order export', [
                    'queue_id' => $queue->getId(),
                    'order_id' => $payload['order_id'] ?? null,
                ]);
                // Your processing logic here
                $this->exportOrder($payload);
                $results[$queue->getId()] = [
                    'status' => 'success',
                    'message' => 'Order exported successfully',
                ];
            } catch (\Throwable $e) {
                // \Throwable also catches \Error subclasses such as the TypeError
                // raised when a malformed payload decodes to null.
                $this->logger->error('Order export failed', [
                    'queue_id' => $queue->getId(),
                    'error' => $e->getMessage(),
                ]);
                $results[$queue->getId()] = [
                    'status' => 'error',
                    'message' => $e->getMessage(),
                ];
            }
        }
        return $results;
    }

    private function exportOrder(array $payload)
    {
        // Business logic for exporting the order
    }
}
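One detail worth guarding against: json_decode() returns null for malformed JSON, and passing null to exportOrder(array $payload) raises a TypeError rather than a descriptive exception. A small validation helper can turn that into a clear error message. The sketch below is one possible approach; decodePayload() is a hypothetical helper, not something MeduseBundle provides.

```php
<?php
/**
 * Decode a queue payload and fail with a clear message when it is not a
 * JSON object or array. Hypothetical helper, shown for illustration.
 */
function decodePayload(string $json): array
{
    $payload = json_decode($json, true);

    // json_decode() signals failure only through json_last_error().
    if (json_last_error() !== JSON_ERROR_NONE) {
        throw new \InvalidArgumentException(
            'Invalid JSON payload: ' . json_last_error_msg()
        );
    }

    // Valid JSON scalars ("42", "true") are not usable payloads either.
    if (!is_array($payload)) {
        throw new \InvalidArgumentException('Payload must decode to an array');
    }

    return $payload;
}

// Usage: a well-formed payload decodes, a broken one raises an exception.
$payload = decodePayload('{"order_id": 42}');

$error = null;
try {
    decodePayload('{broken');
} catch (\InvalidArgumentException $e) {
    $error = $e->getMessage();
}
```

Inside execute(), the helper would replace the bare json_decode() call, so a bad payload ends up in the per-entry catch block with a readable message.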
2. Register the Processor
Add the service definition in your bundle's services.yml. The service ID matters: it is the value stored in the model field of each Queue entity:
services:
    app.queue.order_export:
        class: AppBundle\Services\Queue\OrderExportProcessor
        arguments:
            - '@doctrine.orm.entity_manager'
            - '@logger'
When creating queue entries, set the model field to match this service ID:
$queue = new Queue();
$queue->setModel('app.queue.order_export');
$queue->setPayload(json_encode(['order_id' => $orderId]));
$queue->setRunCategory('standard');
$this->em->persist($queue);
$this->em->flush();
3. Queue Categories
The run_category field controls when and how queues are processed:
| Category | Description | Typical Schedule |
|---|---|---|
| standard | Normal priority, runs frequently | Every 1-5 minutes |
| heavy | Resource-intensive tasks | Off-peak hours |
| realtime | Urgent tasks needing immediate processing | Every minute |
The queue processor command filters by category:
# Process standard queues
php bin/console meduse:queue:processor --action=runQueues --category=standard
# Process heavy queues (typically scheduled during off-peak)
php bin/console meduse:queue:processor --action=runQueues --category=heavy
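The actual schedule for each category appears to come from crontab entries rather than from application code. Purely to illustrate what an off-peak window means, here is a minimal sketch of a time-window check. The function names and the 22:00 to 06:00 window for heavy are assumptions chosen for the example, not values taken from Logidav.

```php
<?php
/**
 * Return true when $now falls inside the [startHour, endHour) window.
 * Windows may wrap past midnight (e.g. 22 -> 6). Equal start and end
 * hours are treated as "no restriction".
 */
function isWithinWindow(\DateTimeInterface $now, int $startHour, int $endHour): bool
{
    $hour = (int) $now->format('G'); // 0-23, no leading zero

    if ($startHour === $endHour) {
        return true;
    }
    if ($startHour < $endHour) {
        return $hour >= $startHour && $hour < $endHour;
    }
    // Window wraps past midnight.
    return $hour >= $startHour || $hour < $endHour;
}

// Hypothetical windows per run_category: [startHour, endHour].
$windows = [
    'standard' => [0, 0],  // always allowed
    'realtime' => [0, 0],  // always allowed
    'heavy'    => [22, 6], // assumed off-peak window
];

// Unknown categories are never allowed to run.
function categoryMayRun(string $category, \DateTimeInterface $now, array $windows): bool
{
    if (!isset($windows[$category])) {
        return false;
    }
    list($start, $end) = $windows[$category];
    return isWithinWindow($now, $start, $end);
}

$lateEvening = new \DateTimeImmutable('2024-01-01 23:30');
$afternoon = new \DateTimeImmutable('2024-01-01 14:00');

$heavyAtNight = categoryMayRun('heavy', $lateEvening, $windows);
$heavyAtDay = categoryMayRun('heavy', $afternoon, $windows);
```

With a cron-driven setup the same effect is achieved by scheduling the --category=heavy command only during those hours, which keeps the window out of the code entirely.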
4. Error Handling
Robust error handling is critical for queue processors. The system tracks retries automatically:
- Catch exceptions at the individual queue entry level so one failure does not block others.
- Log with context -- include the queue ID, relevant entity IDs, and error details.
- Update queue status -- return an error status in results so the framework marks the entry correctly.
- Retry tracking -- the number_of_trials field on the Queue entity increments on each attempt. Check it to implement maximum retry logic:
if ($queue->getNumberOfTrials() >= 3) {
    $this->logger->warning('Queue entry exceeded max retries, marking as failed', [
        'queue_id' => $queue->getId(),
    ]);
    $results[$queue->getId()] = [
        'status' => 'failed',
        'message' => 'Max retries exceeded',
    ];
    continue;
}
Always wrap individual queue entry processing in try/catch. An uncaught exception in one entry must not prevent processing of the remaining entries in the batch.
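To see how the retry check and the per-entry try/catch fit together in one loop, here is a self-contained sketch. FakeQueue and processBatch() are hypothetical stand-ins written for this example, and the limit of 3 trials simply mirrors the snippet above. The sketch catches \Throwable instead of \Exception so that PHP errors such as TypeError are isolated per entry as well.

```php
<?php
// Stand-in for the Queue entity, exposing only the getters used below.
class FakeQueue
{
    private $id;
    private $trials;

    public function __construct(int $id, int $trials)
    {
        $this->id = $id;
        $this->trials = $trials;
    }

    public function getId(): int
    {
        return $this->id;
    }

    public function getNumberOfTrials(): int
    {
        return $this->trials;
    }
}

const MAX_TRIALS = 3; // assumed limit, matching the snippet above

/**
 * Run $handler on each entry. Entries over the retry limit are skipped,
 * and a failure in one entry never stops the rest of the batch.
 */
function processBatch(array $queues, callable $handler): array
{
    $results = [];
    foreach ($queues as $queue) {
        if ($queue->getNumberOfTrials() >= MAX_TRIALS) {
            $results[$queue->getId()] = [
                'status' => 'failed',
                'message' => 'Max retries exceeded',
            ];
            continue;
        }
        try {
            $handler($queue);
            $results[$queue->getId()] = [
                'status' => 'success',
                'message' => 'Processed',
            ];
        } catch (\Throwable $e) {
            // \Throwable covers both \Exception and \Error (e.g. TypeError).
            $results[$queue->getId()] = [
                'status' => 'error',
                'message' => $e->getMessage(),
            ];
        }
    }
    return $results;
}

// Entry 2 throws, entry 3 has already used up its trials.
$batch = [new FakeQueue(1, 0), new FakeQueue(2, 0), new FakeQueue(3, 3)];
$results = processBatch($batch, function (FakeQueue $queue) {
    if ($queue->getId() === 2) {
        throw new \RuntimeException('Export endpoint unavailable');
    }
});
```

Entry 1 still succeeds even though entry 2 fails, and entry 3 is never handed to the handler at all.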
5. Testing
Unit Test
Test your processor logic in isolation by mocking dependencies:
<?php
// tests/AppBundle/Services/Queue/OrderExportProcessorTest.php
namespace Tests\AppBundle\Services\Queue;

use AppBundle\Services\Queue\OrderExportProcessor;
use PHPUnit\Framework\TestCase;

class OrderExportProcessorTest extends TestCase
{
    public function testExecuteProcessesQueuesSuccessfully()
    {
        $em = $this->createMock(\Doctrine\ORM\EntityManagerInterface::class);
        $logger = $this->createMock(\Psr\Log\LoggerInterface::class);
        $processor = new OrderExportProcessor($em, $logger);

        $queue = $this->createMock(\MeduseBundle\Entity\Queue::class);
        $queue->method('getId')->willReturn(1);
        $queue->method('getPayload')->willReturn(json_encode(['order_id' => 42]));

        $results = $processor->execute([$queue]);

        $this->assertArrayHasKey(1, $results);
        $this->assertEquals('success', $results[1]['status']);
    }
}
Integration Test
Create test queue entries in the database and verify end-to-end processing:
# Insert a test queue entry, then run the processor
php bin/console meduse:queue:processor --action=runQueues --env=dev
Checklist
- Processor class implements QueueProcessorInterface
- Service registered in services.yml
- Queue entries use the correct model (service ID) and run_category
- Error handling wraps individual entries in try/catch
- Retry logic respects number_of_trials
- Unit tests cover success and failure paths
- Crontab entry added if a new category/schedule is needed
See Also
- Architecture: Queue Model for queue internals
- Adding a Cronjob for scheduling the queue processor command
- Adding an Integration if the processor calls external APIs