使用 Python 实现一个简单的任务队列(基于 Redis)

2026年6月21日 27 分钟阅读 22 次阅读
📖 文章摘要

本文将基于 Python + Redis从零手写实现一个轻量级、可落地的简易任务队列,完整实现生产者-消费者模型,同时内置任务超时处理、失败重试、异常任务恢复核心机制,帮助大家彻底理解任务队列的底层运行原理。

编程文章:使用 Python 实现一个简单的任务队列(基于 Redis)

在现代软件开发、分布式架构、后端服务开发中,任务队列是解决业务解耦、异步处理、流量削峰、任务重试的核心基础组件。日常开发中的邮件发送、日志统计、数据同步、定时任务、异步接口处理等场景,都离不开任务队列的支撑。

市面上成熟的队列组件如 RabbitMQ、Kafka、Celery 功能强大,但对于轻量级业务、小型服务、个人项目、测试场景而言,部署复杂、学习成本高、资源开销大。本文将基于 Python + Redis从零手写实现一个轻量级、可落地的简易任务队列,完整实现生产者-消费者模型,同时内置任务超时处理、失败重试、异常任务恢复核心机制,帮助大家彻底理解任务队列的底层运行原理。

1\. 为什么选择 Redis?

Redis 作为高性能内存数据库,凭借其超高的读写性能、丰富的数据结构、阻塞命令支持,成为轻量级任务队列的最优选型之一,核心优势如下:

  • 超高读写速度:基于内存读写,无磁盘 IO 开销,毫秒级响应,能够支撑高并发的任务入队、出队场景,完全满足中小型业务的并发需求。

  • 队列天然数据结构:原生支持 List(列表)、Sorted Set(有序集合)、Hash 等数据结构,无需额外封装即可实现普通队列、延迟队列、有序任务、任务信息存储等能力。

  • 阻塞消费机制:提供 BLPOP/BRPOP 阻塞式弹出命令,消费者无任务时自动阻塞等待,避免轮询空跑消耗 CPU 资源,完美适配消费者常驻监听场景。

  • 轻量易部署:Redis 部署简单、配置极简、资源占用低,无需复杂集群即可单机实现队列核心能力,适配开发、测试、轻量生产环境。

  • 支持数据持久化:自带 RDB/AOF 持久化机制,可防止服务重启后任务丢失,保障任务队列的基础可靠性。

2\. 整体设计与核心原理

2\.1 三大核心组件

本次实现的任务队列遵循经典的生产者-消费者架构,拆分三大解耦组件,职责清晰、互不干扰:

  • 生产者(Producer):业务调用入口,负责接收业务任务、封装任务信息,将任务推送至待处理队列,无需关心任务执行逻辑,实现业务与任务执行解耦。

  • 消费者(Consumer):常驻运行的任务执行节点,持续从队列拉取任务、执行业务逻辑、执行完成后确认任务结束,支持多消费者并行消费,实现负载均衡。

  • 队列管理器(Queue):核心底层模块,封装所有 Redis 队列操作,统一提供 push(入队)、pop(出队)、ack(任务确认)、nack(任务失败)、recover(异常恢复)等核心方法,统一管理任务状态。

2\.2 任务状态完整流转机制

为解决任务丢失、重复执行、卡死超时等问题,我们设计了完整的任务状态闭环流转,覆盖正常执行、异常失败、超时卡死所有场景:

正常流程:Pending(待处理)→ 消费者拉取 → Processing(处理中)→ 执行成功 → ACK 确认 → Done(任务结束)

异常流程:Pending(待处理)→ 拉取执行 → 任务报错/执行超时 → 进入 Retry(重试队列)→ 延迟结束后重新回到 Pending 等待再次消费

2\.3 Redis 键设计规范

为区分不同状态的任务,避免数据混乱,统一设计三类 Redis 存储键:

  • queue:{name}:pending:List 结构,存储待处理任务,所有新任务统一入队此处。

  • queue:{name}:processing:Hash 结构,存储处理中任务,记录任务开始执行时间,用于超时检测。

  • queue:{name}:retry:Sorted Set 结构,存储待重试任务,以时间戳为分数,实现延迟重试能力。

3\. 环境准备与依赖安装

3\.1 基础环境要求

  • Python 3.6+ 版本

  • Redis 服务(本地/远程均可,默认本地端口 6379)

3\.2 安装依赖库

核心依赖为官方 Redis 客户端库,执行以下命令安装:

pip install redis

提前启动 Redis 服务,确保服务正常运行在 localhost:6379,无密码、无端口拦截,保证代码可正常连接。

4\. 完整代码实现与逐段解析

4\.1 核心队列类(TaskQueue)

该类封装所有队列核心逻辑,包含任务入队、出队、成功确认、失败重试、超时恢复、任务统计全功能,是整个任务队列的核心底层。

import json
import time
import uuid
import redis

class TaskQueue:
    def __init__(self, name, redis_client=None, retry_delay=10, timeout=30):
        # 队列名称,用于区分不同业务队列
        self.name = name
        # 定义三类任务存储Key
        self.pending_key = f"queue:{name}:pending"
        self.processing_key = f"queue:{name}:processing"
        self.retry_key = f"queue:{name}:retry"
        # 任务重试延迟时间(单位:秒)
        self.retry_delay = retry_delay
        # 任务执行超时时间(单位:秒)
        self.timeout = timeout
        # 支持自定义Redis客户端,适配多环境配置
        self.redis = redis_client or redis.Redis(host='localhost', port=6379, db=0)

    def push(self, payload):
        """
        生产者:向队列推送任务
        :param payload: 任务自定义数据(字典格式)
        :return: 唯一任务ID
        """
        # 生成全局唯一任务ID
        task_id = str(uuid.uuid4())
        # 封装完整任务数据
        task_data = {
            "id": task_id,
            "payload": payload,
            "created_at": time.time()
        }
        # 任务写入pending待处理队列
        self.redis.rpush(self.pending_key, json.dumps(task_data))
        return task_id

    def pop(self, block=True, timeout=0):
        """
        消费者:拉取一个任务
        :param block: 是否阻塞等待
        :param timeout: 阻塞超时时间
        :return: 任务字典 / 无任务返回None
        """
        raw = None
        # 阻塞式拉取,无任务则等待
        if block:
            item = self.redis.blpop(self.pending_key, timeout=timeout)
            if item is None:
                return None
            raw = item[1]
        # 非阻塞式拉取,无任务直接返回
        else:
            raw = self.redis.lpop(self.pending_key)
            if raw is None:
                return None

        # 解析任务数据
        task = json.loads(raw)
        # 记录任务开始执行时间,移入处理中队列
        task["started_at"] = time.time()
        self.redis.hset(self.processing_key, task["id"], json.dumps(task))
        return task

    def ack(self, task_id):
        """
        任务成功确认
        从处理中队列移除任务,任务正式结束
        """
        return self.redis.hdel(self.processing_key, task_id) > 0

    def nack(self, task_id, requeue=True):
        """
        任务失败处理
        :param task_id: 任务ID
        :param requeue: 是否重新入队重试
        :return: 处理结果
        """
        # 获取处理中的任务数据
        raw = self.redis.hget(self.processing_key, task_id)
        if not raw:
            return False
        task = json.loads(raw)
        # 从处理中队列删除
        self.redis.hdel(self.processing_key, task_id)

        # 开启重试则加入延迟重试队列
        if requeue:
            task["retry_count"] = task.get("retry_count", 0) + 1
            # 按当前时间+延迟时间设置重试执行时间
            self.redis.zadd(self.retry_key, {json.dumps(task): time.time() + self.retry_delay})
        return True

    def recover(self):
        """
        任务恢复核心方法:
        1. 恢复超时未完成的处理中任务
        2. 将到期的重试任务重新放回待处理队列
        :return: 恢复的任务数量
        """
        now = time.time()

        # 1. 处理超时任务
        for task_id, raw in self.redis.hscan_iter(self.processing_key):
            task = json.loads(raw)
            # 判断任务是否超时
            if now - task.get("started_at", 0) > self.timeout:
                self.redis.hdel(self.processing_key, task_id)
                task["retry_count"] = task.get("retry_count", 0) + 1
                self.redis.zadd(self.retry_key, {json.dumps(task): now + self.retry_delay})

        # 2. 释放到期重试任务
        expired = self.redis.zrangebyscore(self.retry_key, 0, now)
        for raw in expired:
            self.redis.rpush(self.pending_key, raw)
            self.redis.zrem(self.retry_key, raw)
        return len(expired)

    def size(self):
        """统计当前待执行、待重试的任务总数量"""
        return self.redis.llen(self.pending_key) + self.redis.zcard(self.retry_key)

4\.2 生产者示例代码

生产者负责批量生成任务、推送至队列,模拟业务场景中产生的异步任务,可根据业务需求自定义任务载荷数据。

def producer_demo():
    # 初始化名为demo的任务队列
    q = TaskQueue("demo")
    # 批量生成10条测试任务
    for i in range(10):
        # 自定义任务业务数据
        task_id = q.push({"index": i, "data": f"hello-{i}"})
        print(f"Produced task {task_id} with payload index={i}")

if __name__ == "__main__":
    producer_demo()

4\.3 消费者示例代码(含失败重试)

消费者常驻运行,持续监听队列任务,模拟业务执行逻辑,并随机模拟任务失败,验证重试机制有效性,同时捕获异常保证单个任务失败不影响整体服务运行。

import time

def consumer_demo():
    # 初始化队列,设置任务超时时间为5秒
    q = TaskQueue("demo", timeout=5)
    print("Consumer started, waiting for tasks...")
    # 常驻循环消费任务
    while True:
        # 阻塞拉取任务,10秒无任务则退出
        task = q.pop(block=True, timeout=10)
        if task is None:
            print("No task received, exiting.")
            break

        task_id = task["id"]
        print(f"Processing task {task_id}, payload: {task['payload']}")

        try:
            # 模拟业务任务执行耗时
            time.sleep(1)
            # 模拟业务异常:序号为3的倍数的任务强制失败
            if task["payload"]["index"] % 3 == 0:
                raise ValueError("Simulated failure for index multiple of 3")
            
            # 任务执行成功,确认完成
            q.ack(task_id)
            print(f"Task {task_id} completed.")
        except Exception as e:
            # 任务执行失败,触发重试机制
            print(f"Task {task_id} failed: {e}")
            q.nack(task_id, requeue=True)
            print(f"Task {task_id} requeued for retry.")

if __name__ == "__main__":
    consumer_demo()

5\. 守护恢复线程(解决任务丢失问题)

在实际运行中,如果消费者进程意外崩溃、强制终止,处理中状态的任务会永久滞留,无法自动结束也无法重试。为解决该问题,我们启动一个后台守护线程,定时扫描超时任务、释放重试任务,保障队列任务不丢失。

import threading
import time

def recovery_worker(queue, interval=10):
    """
    任务恢复守护线程
    每10秒执行一次任务恢复检测
    """
    while True:
        recovered = queue.recover()
        if recovered:
            print(f"Recovered {recovered} tasks from retry/timeout.")
        time.sleep(interval)

# 启动后台守护线程(随主线程退出而退出)
if __name__ == "__main__":
    q = TaskQueue("demo")
    thread = threading.Thread(target=recovery_worker, args=(q,), daemon=True)
    thread.start()
    # 保持主线程常驻
    while True:
        time.sleep(1)

6\. 完整整合代码(可直接运行)

将所有功能整合为 task_queue.py 单文件,支持命令行参数区分生产者、消费者模式,开箱即用:

# task_queue.py
import json
import time
import uuid
import redis
import threading
import sys

class TaskQueue:
    def __init__(self, name, redis_client=None, retry_delay=10, timeout=30):
        self.name = name
        self.pending_key = f"queue:{name}:pending"
        self.processing_key = f"queue:{name}:processing"
        self.retry_key = f"queue:{name}:retry"
        self.retry_delay = retry_delay
        self.timeout = timeout
        self.redis = redis_client or redis.Redis(host='localhost', port=6379, db=0)

    def push(self, payload):
        task_id = str(uuid.uuid4())
        task_data = {
            "id": task_id,
            "payload": payload,
            "created_at": time.time()
        }
        self.redis.rpush(self.pending_key, json.dumps(task_data))
        return task_id

    def pop(self, block=True, timeout=0):
        raw = None
        if block:
            item = self.redis.blpop(self.pending_key, timeout=timeout)
            if item is None:
                return None
            raw = item[1]
        else:
            raw = self.redis.lpop(self.pending_key)
            if raw is None:
                return None

        task = json.loads(raw)
        task["started_at"] = time.time()
        self.redis.hset(self.processing_key, task["id"], json.dumps(task))
        return task

    def ack(self, task_id):
        return self.redis.hdel(self.processing_key, task_id) > 0

    def nack(self, task_id, requeue=True):
        raw = self.redis.hget(self.processing_key, task_id)
        if not raw:
            return False
        task = json.loads(raw)
        self.redis.hdel(self.processing_key, task_id)
        if requeue:
            task["retry_count"] = task.get("retry_count", 0) + 1
            self.redis.zadd(self.retry_key, {json.dumps(task): time.time() + self.retry_delay})
        return True

    def recover(self):
        now = time.time()
        # 恢复超时任务
        for task_id, raw in self.redis.hscan_iter(self.processing_key):
            task = json.loads(raw)
            if now - task.get("started_at", 0) > self.timeout:
                self.redis.hdel(self.processing_key, task_id)
                task["retry_count"] = task.get("retry_count", 0) + 1
                self.redis.zadd(self.retry_key, {json.dumps(task): now + self.retry_delay})
        # 释放重试任务
        expired = self.redis.zrangebyscore(self.retry_key, 0, now)
        for raw in expired:
            self.redis.rpush(self.pending_key, raw)
            self.redis.zrem(self.retry_key, raw)
        return len(expired)

    def size(self):
        return self.redis.llen(self.pending_key) + self.redis.zcard(self.retry_key)

# 生产者demo
def producer_demo():
    q = TaskQueue("demo")
    for i in range(10):
        task_id = q.push({"index": i, "data": f"hello-{i}"})
        print(f"Produced task {task_id} with payload index={i}")

# 消费者demo
def consumer_demo():
    q = TaskQueue("demo", timeout=5)
    print("Consumer started, waiting for tasks...")
    while True:
        task = q.pop(block=True, timeout=10)
        if task is None:
            print("No task received, exiting.")
            break
        task_id = task["id"]
        print(f"Processing task {task_id}, payload: {task['payload']}")
        try:
            time.sleep(1)
            if task["payload"]["index"] % 3 == 0:
                raise ValueError("Simulated failure for index multiple of 3")
            q.ack(task_id)
            print(f"Task {task_id} completed.")
        except Exception as e:
            print(f"Task {task_id} failed: {e}")
            q.nack(task_id, requeue=True)
            print(f"Task {task_id} requeued for retry.")

# 恢复守护线程
def recovery_worker(queue, interval=10):
    while True:
        recovered = queue.recover()
        if recovered:
            print(f"Recovered {recovered} tasks from retry/timeout.")
        time.sleep(interval)

if __name__ == "__main__":
    # 启动恢复线程
    q = TaskQueue("demo")
    thread = threading.Thread(target=recovery_worker, args=(q,), daemon=True)
    thread.start()

    # 命令行参数执行
    if len(sys.argv) > 1 and sys.argv[1] == "producer":
        producer_demo()
    elif len(sys.argv) > 1 and sys.argv[1] == "consumer":
        consumer_demo()
    else:
        print("Usage: python task_queue.py [producer|consumer]")

7\. 测试运行步骤

按照以下步骤分步运行,可完整观测任务生产、消费、失败重试、超时恢复全流程:

7\.1 启动 Redis 服务

redis-server

7\.2 启动消费者进程(支持多开,实现负载均衡)

python task_queue.py consumer

7\.3 启动生产者,批量生成任务

python task_queue.py producer

运行效果:正常任务执行成功并标记完成;序号为3倍数的任务模拟失败,自动进入延迟重试队列;超时未完成的任务会被守护线程自动恢复重试。

8\. 功能扩展与生产优化思路

本文实现的是基础版任务队列,可根据生产需求进一步迭代优化,核心扩展方向如下:

  • 消息持久化增强:当前基于 List 实现,任务消费后无日志留存,可改用 Redis Streams 实现消息持久化、消息回溯、消费确认机制,提升可靠性。

  • 优先级队列:通过多队列隔离或 Sorted Set 排序,实现高优先级任务优先消费,适配紧急任务场景。

  • 重试次数限制:增加最大重试次数,避免任务无限重试导致死循环,超出次数后转入死信队列人工处理。

  • 多消费者负载均衡:原生 BLPOP 支持多消费者竞争消费,多开消费者进程即可实现分布式负载均衡,提升任务处理速度。

  • 监控与日志:增加任务执行日志、任务数量统计、失败率监控,对接告警系统,便于线上运维排查问题。

  • 定时任务扩展:基于 Sorted Set 实现定时任务队列,支持指定时间执行任务。

9\. 文章总结

本文基于 Python 和 Redis 从零实现了一套轻量级、高可用、可扩展的简易任务队列,完整落地了生产者-消费者核心模型,同时解决了行业通用的任务超时、异常崩溃、任务重试、任务丢失等痛点。

相较于成熟的队列框架,该自定义队列代码精简、无第三方依赖、逻辑透明、灵活可定制,非常适合小型项目、个人开发、学习研究、轻量异步任务场景使用。通过本次实战,不仅可以掌握 Redis 高级数据结构的实战用法,还能深刻理解任务队列的解耦思想、异步执行原理、异常容错机制,为后续学习 Celery、RocketMQ、Kafka 等专业消息队列打下坚实的基础。

最后更新:2026年6月29日CC BY-NC-SA 4.0

评论

暂无评论,来写第一条吧

© 2026 My Blog. Built with Nuxt.js + FastAPI.