本文将基于 Python + Redis从零手写实现一个轻量级、可落地的简易任务队列,完整实现生产者-消费者模型,同时内置任务超时处理、失败重试、异常任务恢复核心机制,帮助大家彻底理解任务队列的底层运行原理。
编程文章:使用 Python 实现一个简单的任务队列(基于 Redis)
在现代软件开发、分布式架构、后端服务开发中,任务队列是解决业务解耦、异步处理、流量削峰、任务重试的核心基础组件。日常开发中的邮件发送、日志统计、数据同步、定时任务、异步接口处理等场景,都离不开任务队列的支撑。
市面上成熟的队列组件如 RabbitMQ、Kafka、Celery 功能强大,但对于轻量级业务、小型服务、个人项目、测试场景而言,部署复杂、学习成本高、资源开销大。本文将基于 Python + Redis从零手写实现一个轻量级、可落地的简易任务队列,完整实现生产者-消费者模型,同时内置任务超时处理、失败重试、异常任务恢复核心机制,帮助大家彻底理解任务队列的底层运行原理。
1\. 为什么选择 Redis?
Redis 作为高性能内存数据库,凭借其超高的读写性能、丰富的数据结构、阻塞命令支持,成为轻量级任务队列的最优选型之一,核心优势如下:
超高读写速度:基于内存读写,无磁盘 IO 开销,毫秒级响应,能够支撑高并发的任务入队、出队场景,完全满足中小型业务的并发需求。
队列天然数据结构:原生支持 List(列表)、Sorted Set(有序集合)、Hash 等数据结构,无需额外封装即可实现普通队列、延迟队列、有序任务、任务信息存储等能力。
阻塞消费机制:提供
BLPOP/BRPOP阻塞式弹出命令,消费者无任务时自动阻塞等待,避免轮询空跑消耗 CPU 资源,完美适配消费者常驻监听场景。轻量易部署:Redis 部署简单、配置极简、资源占用低,无需复杂集群即可单机实现队列核心能力,适配开发、测试、轻量生产环境。
支持数据持久化:自带 RDB/AOF 持久化机制,可防止服务重启后任务丢失,保障任务队列的基础可靠性。
2\. 整体设计与核心原理
2\.1 三大核心组件
本次实现的任务队列遵循经典的生产者-消费者架构,拆分三大解耦组件,职责清晰、互不干扰:
生产者(Producer):业务调用入口,负责接收业务任务、封装任务信息,将任务推送至待处理队列,无需关心任务执行逻辑,实现业务与任务执行解耦。
消费者(Consumer):常驻运行的任务执行节点,持续从队列拉取任务、执行业务逻辑、执行完成后确认任务结束,支持多消费者并行消费,实现负载均衡。
队列管理器(Queue):核心底层模块,封装所有 Redis 队列操作,统一提供
push(入队)、pop(出队)、ack(任务确认)、nack(任务失败)、recover(异常恢复)等核心方法,统一管理任务状态。
2\.2 任务状态完整流转机制
为解决任务丢失、重复执行、卡死超时等问题,我们设计了完整的任务状态闭环流转,覆盖正常执行、异常失败、超时卡死所有场景:
正常流程:Pending(待处理)→ 消费者拉取 → Processing(处理中)→ 执行成功 → ACK 确认 → Done(任务结束)
异常流程:Pending(待处理)→ 拉取执行 → 任务报错/执行超时 → 进入 Retry(重试队列)→ 延迟结束后重新回到 Pending 等待再次消费
2\.3 Redis 键设计规范
为区分不同状态的任务,避免数据混乱,统一设计三类 Redis 存储键:
queue:{name}:pending:List 结构,存储待处理任务,所有新任务统一入队此处。queue:{name}:processing:Hash 结构,存储处理中任务,记录任务开始执行时间,用于超时检测。queue:{name}:retry:Sorted Set 结构,存储待重试任务,以时间戳为分数,实现延迟重试能力。
3\. 环境准备与依赖安装
3\.1 基础环境要求
Python 3.6+ 版本
Redis 服务(本地/远程均可,默认本地端口 6379)
3\.2 安装依赖库
核心依赖为官方 Redis 客户端库,执行以下命令安装:
pip install redis
提前启动 Redis 服务,确保服务正常运行在 localhost:6379,无密码、无端口拦截,保证代码可正常连接。
4\. 完整代码实现与逐段解析
4\.1 核心队列类(TaskQueue)
该类封装所有队列核心逻辑,包含任务入队、出队、成功确认、失败重试、超时恢复、任务统计全功能,是整个任务队列的核心底层。
import json
import time
import uuid
import redis
class TaskQueue:
def __init__(self, name, redis_client=None, retry_delay=10, timeout=30):
# 队列名称,用于区分不同业务队列
self.name = name
# 定义三类任务存储Key
self.pending_key = f"queue:{name}:pending"
self.processing_key = f"queue:{name}:processing"
self.retry_key = f"queue:{name}:retry"
# 任务重试延迟时间(单位:秒)
self.retry_delay = retry_delay
# 任务执行超时时间(单位:秒)
self.timeout = timeout
# 支持自定义Redis客户端,适配多环境配置
self.redis = redis_client or redis.Redis(host='localhost', port=6379, db=0)
def push(self, payload):
"""
生产者:向队列推送任务
:param payload: 任务自定义数据(字典格式)
:return: 唯一任务ID
"""
# 生成全局唯一任务ID
task_id = str(uuid.uuid4())
# 封装完整任务数据
task_data = {
"id": task_id,
"payload": payload,
"created_at": time.time()
}
# 任务写入pending待处理队列
self.redis.rpush(self.pending_key, json.dumps(task_data))
return task_id
def pop(self, block=True, timeout=0):
"""
消费者:拉取一个任务
:param block: 是否阻塞等待
:param timeout: 阻塞超时时间
:return: 任务字典 / 无任务返回None
"""
raw = None
# 阻塞式拉取,无任务则等待
if block:
item = self.redis.blpop(self.pending_key, timeout=timeout)
if item is None:
return None
raw = item[1]
# 非阻塞式拉取,无任务直接返回
else:
raw = self.redis.lpop(self.pending_key)
if raw is None:
return None
# 解析任务数据
task = json.loads(raw)
# 记录任务开始执行时间,移入处理中队列
task["started_at"] = time.time()
self.redis.hset(self.processing_key, task["id"], json.dumps(task))
return task
def ack(self, task_id):
"""
任务成功确认
从处理中队列移除任务,任务正式结束
"""
return self.redis.hdel(self.processing_key, task_id) > 0
def nack(self, task_id, requeue=True):
"""
任务失败处理
:param task_id: 任务ID
:param requeue: 是否重新入队重试
:return: 处理结果
"""
# 获取处理中的任务数据
raw = self.redis.hget(self.processing_key, task_id)
if not raw:
return False
task = json.loads(raw)
# 从处理中队列删除
self.redis.hdel(self.processing_key, task_id)
# 开启重试则加入延迟重试队列
if requeue:
task["retry_count"] = task.get("retry_count", 0) + 1
# 按当前时间+延迟时间设置重试执行时间
self.redis.zadd(self.retry_key, {json.dumps(task): time.time() + self.retry_delay})
return True
def recover(self):
"""
任务恢复核心方法:
1. 恢复超时未完成的处理中任务
2. 将到期的重试任务重新放回待处理队列
:return: 恢复的任务数量
"""
now = time.time()
# 1. 处理超时任务
for task_id, raw in self.redis.hscan_iter(self.processing_key):
task = json.loads(raw)
# 判断任务是否超时
if now - task.get("started_at", 0) > self.timeout:
self.redis.hdel(self.processing_key, task_id)
task["retry_count"] = task.get("retry_count", 0) + 1
self.redis.zadd(self.retry_key, {json.dumps(task): now + self.retry_delay})
# 2. 释放到期重试任务
expired = self.redis.zrangebyscore(self.retry_key, 0, now)
for raw in expired:
self.redis.rpush(self.pending_key, raw)
self.redis.zrem(self.retry_key, raw)
return len(expired)
def size(self):
"""统计当前待执行、待重试的任务总数量"""
return self.redis.llen(self.pending_key) + self.redis.zcard(self.retry_key)
4\.2 生产者示例代码
生产者负责批量生成任务、推送至队列,模拟业务场景中产生的异步任务,可根据业务需求自定义任务载荷数据。
def producer_demo():
# 初始化名为demo的任务队列
q = TaskQueue("demo")
# 批量生成10条测试任务
for i in range(10):
# 自定义任务业务数据
task_id = q.push({"index": i, "data": f"hello-{i}"})
print(f"Produced task {task_id} with payload index={i}")
if __name__ == "__main__":
producer_demo()
4\.3 消费者示例代码(含失败重试)
消费者常驻运行,持续监听队列任务,模拟业务执行逻辑,并随机模拟任务失败,验证重试机制有效性,同时捕获异常保证单个任务失败不影响整体服务运行。
import time
def consumer_demo():
# 初始化队列,设置任务超时时间为5秒
q = TaskQueue("demo", timeout=5)
print("Consumer started, waiting for tasks...")
# 常驻循环消费任务
while True:
# 阻塞拉取任务,10秒无任务则退出
task = q.pop(block=True, timeout=10)
if task is None:
print("No task received, exiting.")
break
task_id = task["id"]
print(f"Processing task {task_id}, payload: {task['payload']}")
try:
# 模拟业务任务执行耗时
time.sleep(1)
# 模拟业务异常:序号为3的倍数的任务强制失败
if task["payload"]["index"] % 3 == 0:
raise ValueError("Simulated failure for index multiple of 3")
# 任务执行成功,确认完成
q.ack(task_id)
print(f"Task {task_id} completed.")
except Exception as e:
# 任务执行失败,触发重试机制
print(f"Task {task_id} failed: {e}")
q.nack(task_id, requeue=True)
print(f"Task {task_id} requeued for retry.")
if __name__ == "__main__":
consumer_demo()
5\. 守护恢复线程(解决任务丢失问题)
在实际运行中,如果消费者进程意外崩溃、强制终止,处理中状态的任务会永久滞留,无法自动结束也无法重试。为解决该问题,我们启动一个后台守护线程,定时扫描超时任务、释放重试任务,保障队列任务不丢失。
import threading
import time
def recovery_worker(queue, interval=10):
"""
任务恢复守护线程
每10秒执行一次任务恢复检测
"""
while True:
recovered = queue.recover()
if recovered:
print(f"Recovered {recovered} tasks from retry/timeout.")
time.sleep(interval)
# 启动后台守护线程(随主线程退出而退出)
if __name__ == "__main__":
q = TaskQueue("demo")
thread = threading.Thread(target=recovery_worker, args=(q,), daemon=True)
thread.start()
# 保持主线程常驻
while True:
time.sleep(1)
6\. 完整整合代码(可直接运行)
将所有功能整合为 task_queue.py 单文件,支持命令行参数区分生产者、消费者模式,开箱即用:
# task_queue.py
import json
import time
import uuid
import redis
import threading
import sys
class TaskQueue:
def __init__(self, name, redis_client=None, retry_delay=10, timeout=30):
self.name = name
self.pending_key = f"queue:{name}:pending"
self.processing_key = f"queue:{name}:processing"
self.retry_key = f"queue:{name}:retry"
self.retry_delay = retry_delay
self.timeout = timeout
self.redis = redis_client or redis.Redis(host='localhost', port=6379, db=0)
def push(self, payload):
task_id = str(uuid.uuid4())
task_data = {
"id": task_id,
"payload": payload,
"created_at": time.time()
}
self.redis.rpush(self.pending_key, json.dumps(task_data))
return task_id
def pop(self, block=True, timeout=0):
raw = None
if block:
item = self.redis.blpop(self.pending_key, timeout=timeout)
if item is None:
return None
raw = item[1]
else:
raw = self.redis.lpop(self.pending_key)
if raw is None:
return None
task = json.loads(raw)
task["started_at"] = time.time()
self.redis.hset(self.processing_key, task["id"], json.dumps(task))
return task
def ack(self, task_id):
return self.redis.hdel(self.processing_key, task_id) > 0
def nack(self, task_id, requeue=True):
raw = self.redis.hget(self.processing_key, task_id)
if not raw:
return False
task = json.loads(raw)
self.redis.hdel(self.processing_key, task_id)
if requeue:
task["retry_count"] = task.get("retry_count", 0) + 1
self.redis.zadd(self.retry_key, {json.dumps(task): time.time() + self.retry_delay})
return True
def recover(self):
now = time.time()
# 恢复超时任务
for task_id, raw in self.redis.hscan_iter(self.processing_key):
task = json.loads(raw)
if now - task.get("started_at", 0) > self.timeout:
self.redis.hdel(self.processing_key, task_id)
task["retry_count"] = task.get("retry_count", 0) + 1
self.redis.zadd(self.retry_key, {json.dumps(task): now + self.retry_delay})
# 释放重试任务
expired = self.redis.zrangebyscore(self.retry_key, 0, now)
for raw in expired:
self.redis.rpush(self.pending_key, raw)
self.redis.zrem(self.retry_key, raw)
return len(expired)
def size(self):
return self.redis.llen(self.pending_key) + self.redis.zcard(self.retry_key)
# 生产者demo
def producer_demo():
q = TaskQueue("demo")
for i in range(10):
task_id = q.push({"index": i, "data": f"hello-{i}"})
print(f"Produced task {task_id} with payload index={i}")
# 消费者demo
def consumer_demo():
q = TaskQueue("demo", timeout=5)
print("Consumer started, waiting for tasks...")
while True:
task = q.pop(block=True, timeout=10)
if task is None:
print("No task received, exiting.")
break
task_id = task["id"]
print(f"Processing task {task_id}, payload: {task['payload']}")
try:
time.sleep(1)
if task["payload"]["index"] % 3 == 0:
raise ValueError("Simulated failure for index multiple of 3")
q.ack(task_id)
print(f"Task {task_id} completed.")
except Exception as e:
print(f"Task {task_id} failed: {e}")
q.nack(task_id, requeue=True)
print(f"Task {task_id} requeued for retry.")
# 恢复守护线程
def recovery_worker(queue, interval=10):
while True:
recovered = queue.recover()
if recovered:
print(f"Recovered {recovered} tasks from retry/timeout.")
time.sleep(interval)
if __name__ == "__main__":
# 启动恢复线程
q = TaskQueue("demo")
thread = threading.Thread(target=recovery_worker, args=(q,), daemon=True)
thread.start()
# 命令行参数执行
if len(sys.argv) > 1 and sys.argv[1] == "producer":
producer_demo()
elif len(sys.argv) > 1 and sys.argv[1] == "consumer":
consumer_demo()
else:
print("Usage: python task_queue.py [producer|consumer]")
7\. 测试运行步骤
按照以下步骤分步运行,可完整观测任务生产、消费、失败重试、超时恢复全流程:
7\.1 启动 Redis 服务
redis-server
7\.2 启动消费者进程(支持多开,实现负载均衡)
python task_queue.py consumer
7\.3 启动生产者,批量生成任务
python task_queue.py producer
运行效果:正常任务执行成功并标记完成;序号为3倍数的任务模拟失败,自动进入延迟重试队列;超时未完成的任务会被守护线程自动恢复重试。
8\. 功能扩展与生产优化思路
本文实现的是基础版任务队列,可根据生产需求进一步迭代优化,核心扩展方向如下:
消息持久化增强:当前基于 List 实现,任务消费后无日志留存,可改用 Redis Streams 实现消息持久化、消息回溯、消费确认机制,提升可靠性。
优先级队列:通过多队列隔离或 Sorted Set 排序,实现高优先级任务优先消费,适配紧急任务场景。
重试次数限制:增加最大重试次数,避免任务无限重试导致死循环,超出次数后转入死信队列人工处理。
多消费者负载均衡:原生 BLPOP 支持多消费者竞争消费,多开消费者进程即可实现分布式负载均衡,提升任务处理速度。
监控与日志:增加任务执行日志、任务数量统计、失败率监控,对接告警系统,便于线上运维排查问题。
定时任务扩展:基于 Sorted Set 实现定时任务队列,支持指定时间执行任务。
9\. 文章总结
本文基于 Python 和 Redis 从零实现了一套轻量级、高可用、可扩展的简易任务队列,完整落地了生产者-消费者核心模型,同时解决了行业通用的任务超时、异常崩溃、任务重试、任务丢失等痛点。
相较于成熟的队列框架,该自定义队列代码精简、无第三方依赖、逻辑透明、灵活可定制,非常适合小型项目、个人开发、学习研究、轻量异步任务场景使用。通过本次实战,不仅可以掌握 Redis 高级数据结构的实战用法,还能深刻理解任务队列的解耦思想、异步执行原理、异常容错机制,为后续学习 Celery、RocketMQ、Kafka 等专业消息队列打下坚实的基础。
评论
暂无评论,来写第一条吧
