
Queue Model

Overview

Logidav uses a hybrid queue system combining SQL-backed persistence with RabbitMQ transport. The SQL queue (AppBundle\Entity\Queue) stores tasks durably in the database, while CoreBundle\Services\RabbitMqService provides asynchronous message passing for real-time notifications and event-driven processing.

This dual approach ensures that no task is lost on restart (SQL durability) while still supporting real-time processing when needed (RabbitMQ).

Task Lifecycle

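Every queued task follows a defined state machine over four statuses: Pending, Processing, Complete, and Error. A task is picked up while Pending and ends as Complete on success or Error on failure.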
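The lifecycle can be sketched as a small transition map. The status names come from the Queue entity; the QueueStatus class, the exact set of allowed transitions, and the Error to Pending replay edge are assumptions made for illustration, not the project's actual code.

```php
<?php
declare(strict_types=1);

// Hypothetical helper: only the four status names are taken from the docs.
final class QueueStatus
{
    const PENDING = 'Pending';
    const PROCESSING = 'Processing';
    const COMPLETE = 'Complete';
    const ERROR = 'Error';

    // Assumed transitions. Error => Pending models a manual replay.
    private const TRANSITIONS = [
        self::PENDING    => [self::PROCESSING],
        self::PROCESSING => [self::COMPLETE, self::ERROR],
        self::COMPLETE   => [],
        self::ERROR      => [self::PENDING],
    ];

    public static function canTransition(string $from, string $to): bool
    {
        // Unknown source states have no outgoing transitions.
        return in_array($to, self::TRANSITIONS[$from] ?? [], true);
    }
}

var_dump(QueueStatus::canTransition(QueueStatus::PENDING, QueueStatus::PROCESSING)); // bool(true)
var_dump(QueueStatus::canTransition(QueueStatus::PENDING, QueueStatus::COMPLETE));   // bool(false)
```

Keeping the transitions in one table makes it easy to reject an invalid status update before it reaches the database.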

Queue Entity Fields

The AppBundle\Entity\Queue entity stores each task with these key fields:

| Field | Type | Description |
|---|---|---|
| model | string | Service ID (fully qualified) to resolve from the container |
| status | string | Current state: Pending, Processing, Complete, Error |
| number_of_trials | int | Number of execution attempts so far |
| run_category | string | Scheduling category; controls which processor window handles this task |
| payload | text | JSON-encoded data passed to the service's execute() method |

QueueProcessorInterface

Any service that processes queued tasks must implement QueueProcessorInterface:

namespace AppBundle\Services\Queue\Data;

interface QueueProcessorInterface
{
    /**
     * @param array<string, mixed> $payload Decoded queue payload
     */
    public function execute(array $payload): void;
}

The processor resolves the service from the container by the ID stored in the model field, checks that it implements QueueProcessorInterface, and calls execute() with the decoded payload.
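A minimal sketch of that resolution step follows. It is self-contained, so the interface is redeclared in the global namespace and a plain array stands in for the Symfony container. LabelGenerationProcessor, resolveProcessor(), the service ID, and the order_id payload key are all hypothetical names chosen for the example.

```php
<?php
declare(strict_types=1);

// Stand-in for AppBundle\Services\Queue\Data\QueueProcessorInterface.
interface QueueProcessorInterface
{
    public function execute(array $payload): void;
}

// Hypothetical processor: records the order IDs it was asked to handle.
final class LabelGenerationProcessor implements QueueProcessorInterface
{
    /** @var int[] */
    public $generated = [];

    public function execute(array $payload): void
    {
        if (!isset($payload['order_id'])) {
            throw new InvalidArgumentException('Payload is missing "order_id".');
        }
        $this->generated[] = (int) $payload['order_id'];
    }
}

// Hypothetical resolver: $services plays the role of the container.
function resolveProcessor(array $services, string $model): QueueProcessorInterface
{
    $service = $services[$model] ?? null;
    // PHP has no object casts, so the interface is enforced with instanceof.
    if (!$service instanceof QueueProcessorInterface) {
        throw new LogicException(sprintf('Service "%s" is not a queue processor.', $model));
    }

    return $service;
}

$services = ['app.queue.label_generation' => new LabelGenerationProcessor()];
$processor = resolveProcessor($services, 'app.queue.label_generation');
$processor->execute(json_decode('{"order_id": 1042}', true));
```

Failing fast on a service that does not implement the interface turns a misconfigured model value into a clear error instead of an undefined-method failure.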

Tip: When creating a new queue processor, register the service in your bundle's services.yml as public (so the container can resolve it by ID), and enqueue tasks with the service ID stored via Queue::setModel().
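Enqueueing a task might look like the sketch below. The Queue class here is a simplified stand-in for AppBundle\Entity\Queue so the example runs on its own. Only setModel() is named in this page; the other accessors, the buildQueueTask() helper, the service ID, and the run category value are assumptions. Persisting the entity through Doctrine is left out.

```php
<?php
declare(strict_types=1);

// Simplified stand-in for AppBundle\Entity\Queue (accessor names are assumed,
// except setModel(), which the documentation mentions).
final class Queue
{
    private $model;
    private $status = 'Pending';
    private $numberOfTrials = 0;
    private $runCategory;
    private $payload;

    public function setModel(string $model): void { $this->model = $model; }
    public function getModel(): string { return $this->model; }
    public function getStatus(): string { return $this->status; }
    public function getNumberOfTrials(): int { return $this->numberOfTrials; }
    public function setRunCategory(string $category): void { $this->runCategory = $category; }
    public function getRunCategory(): string { return $this->runCategory; }
    public function setPayload(string $json): void { $this->payload = $json; }
    public function getPayload(): string { return $this->payload; }
}

// Hypothetical helper that fills in the fields described in the entity table.
function buildQueueTask(string $serviceId, string $runCategory, array $data): Queue
{
    $json = json_encode($data);
    if ($json === false) {
        throw new InvalidArgumentException('Payload cannot be JSON-encoded: ' . json_last_error_msg());
    }

    $task = new Queue();
    $task->setModel($serviceId);        // service ID the processor will resolve
    $task->setRunCategory($runCategory);
    $task->setPayload($json);           // decoded again before execute() is called

    return $task;
}

$task = buildQueueTask('app.queue.label_generation', 'default', ['order_id' => 1042]);
```

In the real application the returned entity would then be persisted and flushed so the processor command can pick it up.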

Queue Processor Command

The primary entry point for consuming queued tasks:

php bin/console meduse:queue:processor --action=runQueues

This command:

  1. Queries the Queue table for tasks in Pending status
  2. Groups tasks by run_category
  3. For each task, resolves the service from the container using the model field
  4. Calls execute() with the decoded payload
  5. Marks the task as Complete on success, or increments number_of_trials and marks Error on failure

Warning: This command must run continuously in production. It is typically managed by supervisor or scheduled via cron at a high frequency. It uses LockableTrait to prevent concurrent instances.
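The five steps the command performs can be sketched in plain PHP. This is an illustration under stated assumptions, not the command's source: tasks are plain arrays instead of Doctrine entities, an array stands in for the container, runQueues() is a hypothetical function, and setting the status to Processing just before execution is assumed rather than documented.

```php
<?php
declare(strict_types=1);

// Stand-in for AppBundle\Services\Queue\Data\QueueProcessorInterface.
interface QueueProcessorInterface
{
    public function execute(array $payload): void;
}

/**
 * @param array<int, array<string, mixed>> $tasks    Rows keyed like the Queue entity fields
 * @param array<string, object>            $services Service ID => service (container stand-in)
 * @return array<int, array<string, mixed>> The rows with updated status and trial count
 */
function runQueues(array $tasks, array $services): array
{
    // Step 1: select Pending tasks (array_filter keeps the original keys).
    $pending = array_filter($tasks, function (array $task): bool {
        return $task['status'] === 'Pending';
    });

    // Step 2: group task keys by run_category.
    $groups = [];
    foreach ($pending as $id => $task) {
        $groups[$task['run_category']][] = $id;
    }

    foreach ($groups as $ids) {
        foreach ($ids as $id) {
            $tasks[$id]['status'] = 'Processing'; // assumed intermediate state
            try {
                // Step 3: resolve the service named by the model field.
                $service = $services[$tasks[$id]['model']] ?? null;
                if (!$service instanceof QueueProcessorInterface) {
                    throw new LogicException('Service is not a queue processor.');
                }
                // Step 4: decode the payload and execute.
                $payload = json_decode($tasks[$id]['payload'], true);
                if (!is_array($payload)) {
                    throw new UnexpectedValueException('Payload is not a JSON object or array.');
                }
                $service->execute($payload);
                // Step 5a: success.
                $tasks[$id]['status'] = 'Complete';
            } catch (Throwable $e) {
                // Step 5b: failure.
                $tasks[$id]['number_of_trials']++;
                $tasks[$id]['status'] = 'Error';
            }
        }
    }

    return $tasks;
}

$services = [
    'app.ok' => new class implements QueueProcessorInterface {
        public function execute(array $payload): void {}
    },
    'app.fail' => new class implements QueueProcessorInterface {
        public function execute(array $payload): void { throw new RuntimeException('boom'); }
    },
];
$tasks = [
    ['model' => 'app.ok',   'status' => 'Pending',  'number_of_trials' => 0, 'run_category' => 'default', 'payload' => '{"id":1}'],
    ['model' => 'app.fail', 'status' => 'Pending',  'number_of_trials' => 0, 'run_category' => 'default', 'payload' => '{"id":2}'],
    ['model' => 'app.ok',   'status' => 'Complete', 'number_of_trials' => 1, 'run_category' => 'default', 'payload' => '{"id":3}'],
];
$result = runQueues($tasks, $services);
```

Catching Throwable per task keeps one failing processor from aborting the rest of the batch.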

Concurrency Prevention with LockableTrait

Both queue processors and critical cronjobs use LockableTrait to prevent overlapping execution:

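use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
use Symfony\Component\Console\Command\LockableTrait;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

class MyQueueCommand extends ContainerAwareCommand
{
    use LockableTrait;

    protected function execute(InputInterface $input, OutputInterface $output)
    {
        // lock() returns false when another instance already holds the lock.
        if (!$this->lock()) {
            $output->writeln('Command is already running.');

            return 0;
        }

        try {
            // process tasks...
        } finally {
            // Always release the lock, even if processing throws.
            $this->release();
        }

        return 0;
    }
}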

This is critical in production where cron may trigger a new instance before the previous one finishes.

Cronjobs vs Queues

Cronjobs and queues serve complementary roles:

| Aspect | Cronjob | Queue |
|---|---|---|
| Trigger | Time-based schedule (crontab) | Event-based (task enqueued by code) |
| Execution | Periodic, may run with nothing to do | On-demand, continuous processing |
| Typical use | Polling external systems for new data | Async processing of already-imported data |
| Concurrency | LockableTrait prevents overlap | Sequential per run_category |
| Retry | Re-runs on next cron cycle | Built-in number_of_trials counter |
| Visibility | Log files, run_job.sh output | Queue entity status in database |

Info: The typical pattern is that cronjobs import data (pull from Magento, marketplaces) and queues process the side effects (generate labels, sync stock, send notifications). See Data Flow for the complete picture.

Which Services Use Queues

For debugging failed queue tasks, see the Queue Debugging Runbook and Replay Failed Jobs.