Introduction
In systems with multiple task sources — Jira, Asterisk, Telegram — there's a need for centralized processing through a single worker. RQ (Redis Queue) provides an elegant multi-queue listening mechanism, which we combine with container dispatching via Podman.
Configuration Structure
All task routing is defined in config.yaml. Each queue is mapped to a specific container image and command:
# config.yaml
queues:
jira:
image: localhost/jira-processor:latest
command: [“python”, “process.py”]
podman_args: [“–pod”, “jira-stack”]
asterisk:
image: localhost/asterisk-handler:latest
command: [“python”, “handle_call.py”]
podman_args: [“–pod”, “voip-stack”]
telegram:
image: localhost/tg-worker:latest
command: [“python”, “tg_task.py”]
podman_args: []
RoundRobinWorker
The standard RQ Worker processes queues by priority — the first queue always takes precedence. For even load distribution, we use a custom RoundRobinWorker that rotates queues after processing each task, ensuring no single queue monopolizes the worker.
from rq import Worker
class RoundRobinWorker(Worker):
def reorder_queues(self, reference_queue):
pos = self.queues.index(reference_queue)
self.queues = self.queues[pos+1:] + self.queues[:pos+1]
The run_container_task Function
The central dispatching function takes task data and launches the corresponding container. Data is passed via stdin as JSON, and the result is read from stdout. This provides complete isolation — the container doesn’t need filesystem or network access to receive its assignment.
def run_container_task(queue_name, task_data):
config = load_config()
runner = config[‘queues’][queue_name]
cmd = [“podman”, “run”, “–rm”, “-i”]
cmd.extend(runner.get(‘podman_args’, []))
cmd.append(runner[‘image’])
cmd.extend(runner.get(‘command’, []))
result = subprocess.run(
cmd,
input=json.dumps(task_data).encode(),
capture_output=True,
timeout=300
)
return json.loads(result.stdout)
Data Passing via stdin/stdout
The choice of stdin/stdout for data exchange is deliberate. This approach ensures complete isolation: the container doesn’t need filesystem or network access to receive its assignment. JSON serialization ensures compatibility between different languages and frameworks inside containers.
Error Handling
Each stage has its own error handling mechanisms. RQ provides retries at the queue level, the container isolates failures from the main worker, and subprocess.run with a timeout prevents hangs from problems inside the container. Failed tasks are moved to the failed queue with full context for debugging.
Logging Patterns
Structured logging with task context allows tracing the entire processing path — from queue entry to container completion. We use JSON-formatted logs for easy parsing and aggregation by monitoring systems.
Conclusion
A multi-queue RQ worker with container dispatching is a powerful pattern for processing heterogeneous tasks. YAML configuration provides flexibility, Podman provides isolation, and the RoundRobinWorker ensures even load distribution across all task sources.