Skip to main content

Adding a Queue Processor

Logidav uses a SQL-backed queue system (with optional RabbitMQ) for asynchronous task processing. This guide covers creating a new queue processor that handles queued work items.

How the Queue System Works

The central command meduse:queue:processor --action=runQueues polls for pending queue entries and dispatches each one to the appropriate processor based on the model field. The run_category field controls scheduling windows so that heavy tasks run during off-peak hours.

1. Implement QueueProcessorInterface

Create your processor class in the appropriate bundle's Services/Queue/ directory:

<?php
// src/AppBundle/Services/Queue/OrderExportProcessor.php

namespace AppBundle\Services\Queue;

use MeduseBundle\Services\Queue\QueueProcessorInterface;
use Doctrine\ORM\EntityManagerInterface;
use Psr\Log\LoggerInterface;

class OrderExportProcessor implements QueueProcessorInterface
{
private $em;
private $logger;

public function __construct(EntityManagerInterface $em, LoggerInterface $logger)
{
$this->em = $em;
$this->logger = $logger;
}

/**
* Process a batch of queue entries.
*
* @param array $queues Array of Queue entities to process
* @return array Results keyed by queue ID
*/
public function execute(array $queues): array
{
$results = [];

foreach ($queues as $queue) {
try {
$payload = json_decode($queue->getPayload(), true);

$this->logger->info('Processing order export', [
'queue_id' => $queue->getId(),
'order_id' => $payload['order_id'] ?? null,
]);

// Your processing logic here
$this->exportOrder($payload);

$results[$queue->getId()] = [
'status' => 'success',
'message' => 'Order exported successfully',
];
} catch (\Exception $e) {
$this->logger->error('Order export failed', [
'queue_id' => $queue->getId(),
'error' => $e->getMessage(),
]);

$results[$queue->getId()] = [
'status' => 'error',
'message' => $e->getMessage(),
];
}
}

return $results;
}

private function exportOrder(array $payload)
{
// Business logic for exporting the order
}
}

2. Register the Processor

Add the service definition in your bundle's services.yml. The service ID is important -- it becomes the model field stored in the Queue entity:

services:
app.queue.order_export:
class: AppBundle\Services\Queue\OrderExportProcessor
arguments:
- '@doctrine.orm.entity_manager'
- '@logger'

When creating queue entries, set the model field to match this service ID:

$queue = new Queue();
$queue->setModel('app.queue.order_export');
$queue->setPayload(json_encode(['order_id' => $orderId]));
$queue->setRunCategory('standard');
$this->em->persist($queue);
$this->em->flush();

3. Queue Categories

The run_category field controls when and how queues are processed:

CategoryDescriptionTypical Schedule
standardNormal priority, runs frequentlyEvery 1-5 minutes
heavyResource-intensive tasksOff-peak hours
realtimeUrgent tasks needing immediate processingEvery minute

The queue processor command filters by category:

# Process standard queues
php bin/console meduse:queue:processor --action=runQueues --category=standard

# Process heavy queues (typically scheduled during off-peak)
php bin/console meduse:queue:processor --action=runQueues --category=heavy

4. Error Handling

Robust error handling is critical for queue processors. The system tracks retries automatically:

  • Catch exceptions at the individual queue entry level so one failure does not block others.
  • Log with context -- include the queue ID, relevant entity IDs, and error details.
  • Update queue status -- return error status in results so the framework marks the entry correctly.
  • Retry tracking -- the number_of_trials field on the Queue entity increments on each attempt. Check it to implement maximum retry logic:
if ($queue->getNumberOfTrials() >= 3) {
$this->logger->warning('Queue entry exceeded max retries, marking as failed', [
'queue_id' => $queue->getId(),
]);
$results[$queue->getId()] = [
'status' => 'failed',
'message' => 'Max retries exceeded',
];
continue;
}
warning

Always wrap individual queue entry processing in try/catch. An uncaught exception in one entry must not prevent processing of the remaining entries in the batch.

5. Testing

Unit Test

Test your processor logic in isolation by mocking dependencies:

<?php
// tests/AppBundle/Services/Queue/OrderExportProcessorTest.php

namespace Tests\AppBundle\Services\Queue;

use AppBundle\Services\Queue\OrderExportProcessor;
use PHPUnit\Framework\TestCase;

class OrderExportProcessorTest extends TestCase
{
public function testExecuteProcessesQueuesSuccessfully()
{
$em = $this->createMock(\Doctrine\ORM\EntityManagerInterface::class);
$logger = $this->createMock(\Psr\Log\LoggerInterface::class);

$processor = new OrderExportProcessor($em, $logger);

$queue = $this->createMock(\MeduseBundle\Entity\Queue::class);
$queue->method('getId')->willReturn(1);
$queue->method('getPayload')->willReturn(json_encode(['order_id' => 42]));

$results = $processor->execute([$queue]);

$this->assertArrayHasKey(1, $results);
$this->assertEquals('success', $results[1]['status']);
}
}

Integration Test

Create test queue entries in the database and verify end-to-end processing:

# Insert a test queue entry, then run the processor
php bin/console meduse:queue:processor --action=runQueues -e=dev

Checklist

  • Processor class implements QueueProcessorInterface
  • Service registered in services.yml
  • Queue entries use the correct model (service ID) and run_category
  • Error handling wraps individual entries in try/catch
  • Retry logic respects number_of_trials
  • Unit tests cover success and failure paths
  • Crontab entry added if a new category/schedule is needed

See Also