# Celery Usage Guide
[toc]
## Install and Start
```bash
pip install celery
```
- start a worker

```bash
# --autoscale=10,3 takes precedence over -c 10: the pool scales between 3 and 10 processes
celery -A app.celery_tasks worker -l info -c 10 --autoscale=10,3 -Q celeryPriority -n worker.priority@%h
```
- start beat with the MongoDB scheduler

```bash
celery -A app.celery_tasks beat -S app.libs.beatmongo.schedulers.MongoScheduler --loglevel=INFO
```
- start flower

```bash
celery -A app.celery_tasks flower --port=5566
```

or:

```bash
flower --basic_auth=flower:flower --port=8082 --broker=amqp://rabbitmq:rabbitmq@rabbitmq:5672// --inspect=True --persistent=True --db=~/flower.db
```
## Check Workers
- ping workers

```bash
celery -A app.celery_tasks inspect ping
```

- check worker status

```bash
celery -A app.celery_tasks status
```

- purge all messages from the queues

```bash
celery -A app.celery_tasks purge
```

- list the tasks registered on each worker

```bash
celery -A app.celery_tasks inspect registered
```
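
The same checks are available from Python through the control API; a minimal sketch, assuming the app instance is importable as `client` (the object built at the end of this guide):

```python
# Programmatic worker checks via Celery's inspect API.
# The import path is an assumption about this project's layout.
from app.celery_tasks import client

insp = client.control.inspect()
print(insp.ping())        # e.g. {'worker.priority@host': {'ok': 'pong'}}
print(insp.registered())  # task names registered on each worker
print(insp.active())      # tasks currently executing
```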
## Problems
- Timezone issues

  When using a non-UTC timezone, be aware that pymongo converts timezone-aware datetime objects to UTC before storing them in MongoDB, and after reading them back Celery converts them according to its `timezone` setting before comparing, so the round trip is consistent. The exception is the year/month/day/hour/minute/second fields of a crontab schedule: these must be given in the local timezone, not in UTC. An example follows below.
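
For example, with `timezone='Asia/Shanghai'` (see the config below), a job meant to fire at 03:00 local time is written with local-time fields. A hypothetical `beat_schedule` entry:

```python
from celery.schedules import crontab

# Hypothetical schedule entry: crontab fields are interpreted in the
# app's configured timezone (Asia/Shanghai here), not in UTC.
beat_schedule = {
    'nightly-cleanup': {
        'task': 'app.celery_tasks.worker_task.cleanup',  # hypothetical task name
        'schedule': crontab(hour=3, minute=0),           # 03:00 local time
    },
}
```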
- MongoDB authentication error

  pymongo raises an authentication error because older Celery versions call `db.authenticate()`.
  Solution: upgrade with `pip install "celery>=5.2.3"`; as of the 5.2.x line the `db.authenticate` call has been removed.
- use RabbitMQ priorities

  Priorities only take effect on queues declared with the `x-max-priority` argument (see the Task Queue section below).

```python
# set: publish a task with an explicit priority onto the priority queue
worker.apply_async((para1, para2), task_id=task_id,
                   routing_key='celery.priority',
                   queue='celeryPriority',
                   priority=priority)

# get: inside a bound task, retry with the priority the message
# originally arrived with
self.retry(countdown=30, exc=exc, max_retries=2,
           priority=self.request.delivery_info.get('priority'))
```
- BaseTask

```python
from celery import Task
from celery.signals import after_setup_logger, task_received

# logger, show_envs, update_task_success, update_task_failure and
# send_celery_task_id are app-level helpers (imports omitted here).


class BaseTask(Task):
    _db = None

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        _args = dict(status=status, retval=retval, task_id=task_id)
        logger.info(f"Entering into after_return callback task_id: {task_id}")
        if einfo:
            logger.exception(einfo)

    def on_success(self, retval, task_id, args, kwargs):
        logger.info(f"Entering into callback on_success: {task_id}")
        update_task_success(task_id)
        send_celery_task_id(task_id, msg={"status": "success", "code": 2})  # send to rmq
        super().on_success(retval, task_id, args, kwargs)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        logger.info(f"Entering into callback on_failure..., and retry. task_id: {task_id}")
        logger.exception(f"celery error: {exc}")
        update_task_failure(task_id, str(exc))
        self.retry(countdown=30, exc=exc, max_retries=2,
                   priority=self.request.delivery_info.get('priority'))


@after_setup_logger.connect
def setup_loggers(*args, **kwargs):
    logger.setlogrootname("taskeg-celery")
    show_envs()


@task_received.connect
def on_task_received(request, **kwargs):
    logger.info(f"task: {request.args[0]} received by worker v2 and begin run...")
```
- Task Queue

```python
from kombu import Exchange, Queue

# x-max-priority turns these into RabbitMQ priority queues (priorities 0-10)
task_queues = (
    Queue('celeryPriority', Exchange('celeryPriority'), routing_key='celery.priority',
          queue_arguments={'x-max-priority': 10}),
    Queue('celeryScheduler', Exchange('celeryScheduler'), routing_key='celery.scheduler',
          queue_arguments={'x-max-priority': 10}),
)
```
- celery config

```python
celery_config = {
    'worker_prefetch_multiplier': 1,    # messages prefetched per process; effective prefetch = concurrency * multiplier
    'result_expires': 0,                # lifetime of task results in seconds; 0 means never expire
    'timezone': 'Asia/Shanghai',
    'result_serializer': 'json',
    'result_persistent': True,
    'task_serializer': 'json',
    'worker_concurrency': 5,            # worker concurrency, same as the -c command-line option
    'worker_max_tasks_per_child': 500,  # recycle a worker process after it has executed this many tasks
    # beat
    'beat_schedule': {},
    "mongodb_scheduler_db": MONGO_CELERY_DB,
    "mongodb_scheduler_collection": MONGO_CELERY_COLLECTION,
    "mongodb_scheduler_url": f"mongodb://{ZYSL_MONGO_URI}/{MONGO_CELERY_DB}?authSource=admin",
    # task
    "task_queues": task_queues,
    "task_default_priority": 5,
    "task_default_queue": 'celeryPriority',
    "task_default_exchange": 'celeryPriority',
    "task_default_routing_key": 'celery.priority',
    # backend-mongo
    "mongodb_backend_settings": {
        'database': 'celery',
        'taskmeta_collection': 'task_results_01',
    },
    # common
    "enable_utc": False,
    "task_acks_late": True,
    "task_soft_time_limit": 1800,
    "task_time_limit": 3600,
    "task_create_missing_queues": True,
    "broker_transport_options": {"visibility_timeout": 36000},
    "worker_disable_rate_limits": True,
}
```
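
With `task_soft_time_limit=1800` and `task_time_limit=3600`, a task receives a catchable `SoftTimeLimitExceeded` after 30 minutes and is killed outright after an hour. A sketch of handling the soft limit, assuming the `client` app defined next (task body and helpers are hypothetical):

```python
from celery.exceptions import SoftTimeLimitExceeded

@client.task(bind=True, base=BaseTask)
def long_job(self, dataset_id):       # hypothetical task
    try:
        process(dataset_id)           # hypothetical long-running work
    except SoftTimeLimitExceeded:
        # raised at task_soft_time_limit (1800s); clean up before the
        # hard task_time_limit (3600s) terminates the process
        cleanup(dataset_id)           # hypothetical cleanup
        raise
```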
Create the app and apply the config:

```python
from celery import Celery

client = Celery('CeleryTasks',
                broker_url=BROKER_URL,
                result_backend=BACKEND_URL,
                accept_content=['json'],
                result_accept_content=['json'],
                worker_hijack_root_logger=False,
                include=[
                    'app.celery_tasks.worker_engine',
                    'app.celery_tasks.worker_help',
                    'app.celery_tasks.worker_task'
                ])
client.conf.update(**celery_config)
```
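
With the app configured, tasks can also be dispatched by registered name via `send_task`, without importing the worker module; a minimal sketch (task name and arguments are hypothetical):

```python
# Dispatch by name onto the priority queue; the task name and args
# are placeholders, the queue/routing-key values match the config above.
result = client.send_task(
    'app.celery_tasks.worker_task.example',
    args=('para1', 'para2'),
    queue='celeryPriority',
    routing_key='celery.priority',
    priority=8,
)
print(result.id)  # AsyncResult id; the result lands in the mongo backend
```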