2 分钟
Celery笔记
http://docs.jinkan.org/docs/celery/ 实验项目代码: https://github.com/rectcircle/celery-learn
安装
pip install celery
简介、概念与原理
简介
Celery 是 Python 实现的并行分布式框架,对消息队列进行了一步封装。实现了异步任务的任务提交、序列化、消费者调度和结果存储等。
相关概念
- broker:消息中间件,用于存放任务信息的存储单元,可以选择RabbitMQ,Redis等
- backend:结果存储,可选特性,用于存储任务执行结果用
- worker:任务执行单元,监听broker中需要执行任务,并运行调度任务,并将结果存放到backend
- client:提交任务, 查询任务结果(开启backend可用)的用户进程
原理
client ----提交任务---> broker <-------监视状态、获取任务信息-------> worker
| |
| |
| | 执行任务
| |
| |
-------查询结果------> backend <-----------将结果存储到backend----------
第一个例子
任务(worker) get_started/tasks.py
# -*- coding: utf-8 -*-
from celery import Celery
from time import sleep
app = Celery('tasks',
broker='redis://localhost:6379/1',)
# 在此定义了一个任务
@app.task
def add(x, y):
print "get_started 消费者开始执行"
sleep(5)
result = x + y
print "get_started 消费者执行结束"
return result
# 运行该任务: celery -A get_started.tasks worker --loglevel=info
# 并发度默认为cpu核心数
执行任务
celery -A get_started.tasks worker --loglevel=info
生产者(client)get_started/producer.py
# -*- coding: utf-8 -*-
from get_started.tasks import add
def product():
print "get_started 生产者 调用开始"
result = add.delay(4, 4)
print "get_started 生产这 调用结束"
return result
测试运行run.py
# -*- coding: utf-8 -*-
from get_started.producer import product
from celery.result import AsyncResult
from time import sleep
if __name__ == '__main__':
# 不配置结果backend,将无法拿到结果
print "====测试不使用backend===="
task_result1 = product()
task_result2 = product()
print type(task_result1)
print task_result1.task_id, task_result2.task_id
print task_result1.backend, task_result2.backend
# while not task_result1.ready():
# sleep(.5)
# print task_result1.result
# while not task_result2.ready():
# sleep(.5)
# print task_result2.result
运行
python run.py