Redis — это современная высокопроизводительная система управления базами данных, которая полностью работает в оперативной памяти и часто используется как сервер структур данных. Он поддерживает не только стандартные типы, вроде строк, но и такие структуры, как списки, множества, хэши и потоки.
Благодаря этому Redis применяют для кэширования, организации сессий, хранения лидеров в игровых таблицах, а также для построения очередей сообщений и фоновых задач.
В работе с веб-приложениями, микросервисами и IoT-решениями часто возникает задача — быстро и надежно организовать передачу задач между разными компонентами системы. Redis отлично подходит для таких случаев благодаря:
- мгновенному доступу к данным и операциям в реальном времени;
- поддержке разнообразных структур для разных сценариев работы с очередями;
- наличию встроенной авторизации;
- возможности кластеризации, которая позволяет использовать Redis в распределенных системах;
- простому масштабированию и высокой отказоустойчивости;
- большому сообществу и наличию готовых библиотек для популярных языков программирования.
В этой статье мы подробно разберем, как подготовить окружение для работы с Redis и реализовать очереди задач разными способами.

Способы для настройки очередей задач в Redis
Redis не предоставляет встроенного механизма «очередей задач» в привычном смысле, как, например, RabbitMQ или Kafka. Но благодаря своей универсальности и скорости позволяет реализовать очередь вручную — разными способами и под разные задачи:
- Pub/Sub (публикация-подписка) — классическая модель обмена сообщениями, при которой один или несколько отправителей публикуют данные в канал, а все подписчики, подключенные в этот момент, немедленно их получают. Сообщения не сохраняются: если в момент публикации никто не подписан на канал, данные теряются без возможности восстановления.
- List (список) — это простая и популярная структура данных, которую можно использовать для реализации очереди по принципу FIFO: первым вошел — первым вышел. Сообщения в такой очереди хранятся в Redis до тех пор, пока не будут обработаны. Благодаря этому задачи не теряются даже в случае, если обработчик временно недоступен, — они просто ждут своей очереди, чтобы быть извлеченными и выполненными.
- Stream (потоки) — современный и продвинутый способ, которые позволяет работать с большими объемами событий, хранить историю сообщений, управлять группами потребителей и отслеживать, какие сообщения уже были обработаны.
Технологии очередей позволяют создавать real-time сервисы, чаты, системы аналитики. В Рег.облаке вы можете быстро поднять окружение и протестировать такие сценарии без лишних затрат.
Как реализовать и настроить очередь в Redis
Разберемся, как на практике организовать передачу и обработку задач с помощью различных инструментов Redis:
Способ 1. На основе Pub/Sub
Один из простых и удобных способов организовать обмен сообщениями между разными частями приложения — использовать механизм публикации и подписки (Pub/Sub) в Redis. Он подходит для сценариев, где важна доставка событий в моменте, например, для уведомлений, передачи сигналов между сервисами или push-сообщений в чатах.
В рабочем каталоге создайте файл:
|
1 |
sudo nano subscriber_pubsub.py |
Этот скрипт будет получать сообщения из определенных каналов Redis:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
import redis import time # Подключение к Redis — подставьте свои параметры redis_conn = redis.Redis( host="REDIS_HOST", # IP-адрес или имя хоста Redis password="REDIS_PASSWORD", # пароль пользователя Redis port=6379, # стандартный порт Redis (без SSL) db=0, decode_responses=True ) # Создаем подписчика Pub/Sub subscriber = redis_conn.pubsub() subscriber.subscribe("news", "alerts") # Подписка сразу на два канала # Цикл ожидания и обработки новых сообщений while True: time.sleep(0.01) message = subscriber.get_message() if message: # Пропускаем служебные сообщения от Redis if not isinstance(message["data"], int): print("Новое сообщение:", message["data"]) |
Здесь обработчик подключается к Redis и подписывается на два канала — news и alerts. В бесконечном цикле скрипт проверяет, не появилось ли новое сообщение, и, если оно есть, выводит его в консоль.
Теперь создадим второй файл — отправитель, который будет публиковать сообщения в выбранные каналы. Назовем его, например, publisher_pubsub.py:
|
1 |
nano publisher_pubsub.py |
Скрипт отправителя выглядит так:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
import redis # Соединение с Redis — те же параметры, что и у подписчика r = redis.Redis( host="REDIS_HOST", port=6379, password="REDIS_SECRET", db=0, decode_responses=True ) # Публикуем сообщения в разные каналы r.publish('updates', 'Сервис обновлен и готов к работе') r.publish('alerts', 'Внимание: обнаружена новая задача!') |
Здесь все просто — скрипт подключается к Redis и отправляет одно сообщение в канал updates, другое — в канал alerts.
Сначала запустите подписчика (слушателя каналов) в одном терминале:
|
1 |
python subscriber_pubsub.py |
Затем — в другом терминале:
|
1 2 |
source ./venv/bin/activate python publisher_pubsub.py |
В первом терминале увидите вывод примерно такого вида:
[updates]: Сервис обновлен и готов к работе
[alerts]: Внимание: обнаружена новая задача!

Способ 2. На основе List
Еще один вариант реализации очереди в Redis — использовать тип данных List. Его часто применяют для классических сценариев: элемент добавляется в очередь, обрабатывается и затем удаляется.
Задачи хранятся в обычном Redis‑списке и обрабатывается по принципу FIFO — первым обрабатывается то, что было добавлено раньше. В этой схеме участвуют два основных компонента: издатель (publisher) и подписчик (subscriber). Издатель отвечает за постановку задач в очередь — он формирует сообщение и добавляет его в список. Подписчик, в свою очередь, следит за очередью, извлекает новые задачи и выполняет их.
Сначала реализуйте подписчика (subscriber_list.py), который будет забирать задачи из списка task_queue и обрабатывать их:
|
1 |
sudo nano subscriber_list.py |
Скрипт:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
import os import redis import time client = redis.Redis( host=os.getenv("REDIS_HOST", "127.0.0.1"), port=int(os.getenv("REDIS_PORT", 6379)), password=os.getenv("REDIS_PASSWORD", None), db=0, decode_responses=True, ) print("Ожидание задач…") # работаем, пока в списке остается хотя бы один элемент while client.llen("task_queue"): task = client.rpop("task_queue") if task: print(f" обработано: {task}") time.sleep(0.05) |
Здесь используется метод rpop, который извлекает элементы из конца списка, обеспечивая тем самым правильный порядок обработки задач — первым обработается то сообщение, которое первым было добавлено. Между обработкой задач добавлена небольшая задержка, чтобы снизить нагрузку на сервер.
Теперь создадим издателя (publisher_list.py), который формирует и помещает задачи в очередь:
|
1 |
sudo nano publisher_list.py |
Пропишите в нем:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
import os import random import redis client = redis.Redis( host=os.getenv("REDIS_HOST", "127.0.0.1"), port=int(os.getenv("REDIS_PORT", 6379)), password=os.getenv("REDIS_PASSWORD", None), db=0, decode_responses=True, ) # помещаем пять задач в список task_queue for i in range(5): job_id = random.randint(1_000, 9_999) message = f"Task-{job_id}" client.lpush("task_queue", message) print(f"→ добавлено: {message}") |
Сообщения помещаются в начало списка с помощью lpush. Для наглядности каждому сообщению присваивается случайный номер задачи, чтобы можно было увидеть их порядок обработки.
Запустите подписчика:
|
1 2 |
source ./venv/bin/activate python subscriber_list.py |
В новом окне терминала запустите издателя:
|
1 2 |
source ./venv/bin/activate python publisher_list.py |
На стороне подписчика вы увидите примерно такой вывод:
Ожидание задач…
обработано: Task-7421
обработано: Task-3914
обработано: Task-5842
обработано: Task-1580
обработано: Task-9037

Способ 3. На основе Stream
Streams — тип структуры данных, который представляет собой упорядоченную последовательность записей (сообщений). Каждая из них автоматически получает уникальный идентификатор (ID). Их часто называют «потоками» или «логами событий».
В отличие от Pub/Sub, где сообщения не сохраняются для поздних подписчиков, и List, где нужно вручную следить за удалением, Stream предоставляет встроенные инструменты:
- XADD для добавления новых записей в конец потока;
- XLEN для получения длины потока;
- XREAD для чтения записей начиная с заданного ID (с возможностью блокировать до появления новых данных);
- XRANGE и XREVRANGE для выборки диапазона записей по ID.
Сначала создайте скрипт-издатель, который будет добавлять записи в поток task_stream:
|
1 |
sudo nano publisher_stream.py |
Непосредственно скрипт:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
import os import random import redis # Настраиваем подключение к Redis из переменных окружения client = redis.Redis( host=os.getenv("REDIS_HOST", "127.0.0.1"), port=int(os.getenv("REDIS_PORT", 6379)), password=os.getenv("REDIS_PASSWORD", None), db=0, decode_responses=True, ) # Отправляем пять сообщений в поток task_stream for _ in range(5): payload = {"task_id": str(random.randint(1000, 9999)), "action": "process_data"} # XADD добавляет запись с полем payload и уникальным ID message_id = client.xadd("task_stream", payload) print(f"→ отправлено в поток {message_id}: {payload}") # Показываем, сколько записей накопилось в потоке length = client.xlen("task_stream") print(f"Текущая длина stream: {length}") |
Каждая запись состоит из пары полей: уникального task_id и строки action. После отправки всех задач вы получите длину потока через XLEN, чтобы убедиться, что сообщения дошли.
Далее напишите простой скрипт-подписчик subscriber_stream.py, который при каждом запуске читает все содержимое потока:
|
1 |
sudo nano subscriber_stream.py |
Скрипт:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
import os import redis client = redis.Redis( host=os.getenv("REDIS_HOST", "127.0.0.1"), port=int(os.getenv("REDIS_PORT", 6379)), password=os.getenv("REDIS_PASSWORD", None), db=0, decode_responses=True, ) # Узнаем, сколько сообщений в потоке total = client.xlen("task_stream") if total > 0: # XREAD считывает записи начиная с начала (ID "0") и возвращает список entries = client.xread(streams={"task_stream": "0"}, count=total) # Печатаем весь список записей, включая их ID и данные print(entries) |
При первом запуске вы получите список всех сообщений, и каждый элемент будет представлен в виде [stream_name, [(message_id, {…}), …]]. Но при повторном запуске вам придется снова прочитать те же записи, что не всегда удобно при обработке уникальных событий.
Чтобы читать только новые записи, сохраните ID последней обработанной записи в отдельном ключе Redis last_id и используйте его при последующих вызовах XREAD:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
import os import redis client = redis.Redis( host=os.getenv("REDIS_HOST", "127.0.0.1"), port=int(os.getenv("REDIS_PORT", 6379)), password=os.getenv("REDIS_PASSWORD", None), db=0, decode_responses=True, ) # Инициализируем ключ last_id, если он еще не задан if not client.exists("last_id"): client.set("last_id", "0") last_id = client.get("last_id") # Считываем только новые сообщения: блокируем до 1000 мс, чтобы ждать появления данных results = client.xread(streams={"task_stream": last_id}, block=1000) if results: # results — список вида [(stream_name, [(id1, data1), (id2, data2), …])] for stream_name, messages in results: for msg_id, fields in messages: print(f"[{msg_id}] получено: {fields}") # Обновляем last_id на последний обработанный ID client.set("last_id", msg_id) |
Теперь при первом старте скрипт прочитает все существующие записи, а при следующих запусках — только те, что появились после last_id.
Заключение
В этой статье мы рассмотрели три основных способа организации очередей задач в Redis: Pub/Sub, List и Stream. Приведенные примеры показывают базовые принципы работы с очередями, но в реальных проектах обычно нужно дорабатывать эти решения — добавлять обработку ошибок, контролировать повторную обработку сообщений, обеспечивать устойчивость к сбоям и внедрять дополнительные бизнес-правила.
Как правило, очередь становится частью отдельного слоя или внутренней библиотеки проекта, а ее структура и поведение подстраиваются под нужды конкретной системы. Поэтому при внедрении очередей важно учитывать специфику задачи, а также возможности и ограничения каждого подхода.