RU EN HE
Обо мне Проекты Блог
← Назад к блогу
Контейнеризация и Podman

Мульти-очередной воркер с RQ и Podman

Введение

В системах с несколькими источниками задач — Jira, Asterisk, Telegram — возникает потребность в централизованной обработке через единый воркер. RQ (Redis Queue) предоставляет элегантный механизм мульти-очередного прослушивания, который мы комбинируем с контейнерной диспетчеризацией через Podman.

Структура конфигурации

Вся маршрутизация задач определяется в config.yaml. Каждая очередь сопоставлена с конкретным контейнерным образом и командой:

# config.yaml
queues:
jira:
image: localhost/jira-processor:latest
command: [“python”, process.py]
podman_args: [“–pod”, “jira-stack”]
asterisk:
image: localhost/asterisk-handler:latest
command: [“python”, “handle_call.py”]
podman_args: [“–pod”, “voip-stack”]
telegram:
image: localhost/tg-worker:latest
command: [“python”, “tg_task.py”]
podman_args: []

RoundRobinWorker

Стандартный RQ Worker обрабатывает очереди по приоритету — первая очередь всегда имеет преимущество. Для равномерного распределения нагрузки мы используем кастомный RoundRobinWorker:

from rq import Worker
from itertools import cycle

class RoundRobinWorker(Worker): def reorder_queues(self, reference_queue): # Ротация очередей после каждой задачи pos = self.queues.index(reference_queue) self.queues = self.queues[pos+1:] + self.queues[:pos+1]

Функция run_container_task

Центральная функция диспетчеризации принимает данные задачи и запускает соответствующий контейнер. Данные передаются через stdin в формате JSON, результат читается из stdout:

import json, subprocess

def run_container_task(queue_name, task_data): config = load_config() runner = config[‘queues’][queue_name]

cmd = [“podman”, “run”, “–rm”, “-i”] cmd.extend(runner.get(‘podman_args’, [])) cmd.append(runner[‘image’]) cmd.extend(runner.get(‘command’, []))

result = subprocess.run( cmd, input=json.dumps(task_data).encode(), capture_output=True, timeout=300 )

if result.returncode != 0: raise RuntimeError(result.stderr.decode())

return json.loads(result.stdout)

Передача данных через stdin/stdout

Выбор stdin/stdout для обмена данными неслучаен. Этот подход обеспечивает полную изоляцию: контейнеру не нужен доступ к файловой системе или сети для получения задания. JSON-сериализация обеспечивает совместимость между различными языками и фреймворками внутри контейнеров.

Обработка ошибок

Каждый этап имеет свои механизмы обработки ошибок. RQ обеспечивает повторные попытки на уровне очереди, контейнер изолирует сбои от основного воркера, а subprocess.run с timeout предотвращает зависание при проблемах внутри контейнера.

Логирование

Структурированное логирование с контекстом задачи позволяет отслеживать весь путь обработки — от поступления в очередь до завершения в контейнере. Мы используем JSON-формат логов для удобства парсинга и агрегации.

Заключение

Мульти-очередной RQ-воркер с контейнерной диспетчеризацией — мощный паттерн для обработки разнородных задач. Конфигурация через YAML обеспечивает гибкость, Podman — изоляцию, а RoundRobinWorker — равномерное распределение нагрузки.