Введение
В системах с несколькими источниками задач — Jira, Asterisk, Telegram — возникает потребность в централизованной обработке через единый воркер. RQ (Redis Queue) предоставляет элегантный механизм мульти-очередного прослушивания, который мы комбинируем с контейнерной диспетчеризацией через Podman.
Структура конфигурации
Вся маршрутизация задач определяется в config.yaml. Каждая очередь сопоставлена с конкретным контейнерным образом и командой:
# config.yaml
queues:
jira:
image: localhost/jira-processor:latest
command: [“python”, “process.py”]
podman_args: [“–pod”, “jira-stack”]
asterisk:
image: localhost/asterisk-handler:latest
command: [“python”, “handle_call.py”]
podman_args: [“–pod”, “voip-stack”]
telegram:
image: localhost/tg-worker:latest
command: [“python”, “tg_task.py”]
podman_args: []
RoundRobinWorker
Стандартный RQ Worker обрабатывает очереди по приоритету — первая очередь всегда имеет преимущество. Для равномерного распределения нагрузки мы используем кастомный RoundRobinWorker:
from rq import Worker
from itertools import cycle
class RoundRobinWorker(Worker):
def reorder_queues(self, reference_queue):
# Ротация очередей после каждой задачи
pos = self.queues.index(reference_queue)
self.queues = self.queues[pos+1:] + self.queues[:pos+1]
Функция run_container_task
Центральная функция диспетчеризации принимает данные задачи и запускает соответствующий контейнер. Данные передаются через stdin в формате JSON, результат читается из stdout:
import json, subprocess
def run_container_task(queue_name, task_data):
config = load_config()
runner = config[‘queues’][queue_name]
cmd = [“podman”, “run”, “–rm”, “-i”]
cmd.extend(runner.get(‘podman_args’, []))
cmd.append(runner[‘image’])
cmd.extend(runner.get(‘command’, []))
result = subprocess.run(
cmd,
input=json.dumps(task_data).encode(),
capture_output=True,
timeout=300
)
if result.returncode != 0:
raise RuntimeError(result.stderr.decode())
return json.loads(result.stdout)
Передача данных через stdin/stdout
Выбор stdin/stdout для обмена данными неслучаен. Этот подход обеспечивает полную изоляцию: контейнеру не нужен доступ к файловой системе или сети для получения задания. JSON-сериализация обеспечивает совместимость между различными языками и фреймворками внутри контейнеров.
Обработка ошибок
Каждый этап имеет свои механизмы обработки ошибок. RQ обеспечивает повторные попытки на уровне очереди, контейнер изолирует сбои от основного воркера, а subprocess.run с timeout предотвращает зависание при проблемах внутри контейнера.
Логирование
Структурированное логирование с контекстом задачи позволяет отслеживать весь путь обработки — от поступления в очередь до завершения в контейнере. Мы используем JSON-формат логов для удобства парсинга и агрегации.
Заключение
Мульти-очередной RQ-воркер с контейнерной диспетчеризацией — мощный паттерн для обработки разнородных задач. Конфигурация через YAML обеспечивает гибкость, Podman — изоляцию, а RoundRobinWorker — равномерное распределение нагрузки.