
Celery Usage Guide

[toc]

Install and Start

pip install celery

  • start worker (--autoscale=10,3 scales the pool between 3 and 10 processes and takes precedence over -c)

celery -A app.celery_tasks worker -l info -c 10 --autoscale=10,3 -Q celeryPriority -n worker.priority@%h

  • start beat with the MongoDB scheduler

celery -A app.celery_tasks beat -S app.libs.beatmongo.schedulers.MongoScheduler --loglevel=INFO

  • start flower

celery -A app.celery_tasks flower --port=5566

or:

flower --basic_auth=flower:flower --port=8082 --broker=amqp://rabbitmq:rabbitmq@rabbitmq:5672// --inspect=True --persistent=True --db=~/flower.db
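
These commands assume a Celery application importable as app.celery_tasks (the client instance built in the config section at the end of this guide). Below is a minimal sketch of a task module such a worker would load; the module path and task body are illustrative only:

# app/celery_tasks/worker_task.py -- hypothetical layout matching the include list below
from app.celery_tasks import client  # the Celery() instance configured in this guide

@client.task(name='app.celery_tasks.worker_task.add')
def add(x, y):
    # a trivial task the worker started above could execute
    return x + y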

Check workers

  • ping workers

celery -A app.celery_tasks inspect ping

  • check status

celery -A app.celery_tasks status

  • purge queued messages

celery -A app.celery_tasks purge

  • list registered tasks

celery -A app.celery_tasks inspect registered
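
The same checks can be done from Python through the control API; a short sketch, assuming the client app from the config section below:

from app.celery_tasks import client  # assumed import path for the app

insp = client.control.inspect()
print(insp.ping())        # e.g. {'worker.priority@host': {'ok': 'pong'}}
print(insp.registered())  # task names known to each worker
print(insp.active())      # tasks currently executing
client.control.purge()    # drop all messages waiting in the queues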

Problems

  1. Timezone issues

When running with a non-UTC timezone, note that pymongo converts timezone-aware datetime objects to UTC before storing them, and Celery, after reading the data back, converts it to a datetime according to the timezone setting before comparing. The exception is the year/month/day/hour/minute/second fields of a cron job: these must be given in local time, not UTC.
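
A minimal demonstration of both behaviours, assuming a local MongoDB and the Asia/Shanghai timezone configured later in this guide:

from datetime import datetime, timedelta, timezone
from celery.schedules import crontab
import pymongo

# 1) pymongo converts aware datetimes to UTC on write and, by default,
#    returns naive UTC datetimes on read.
coll = pymongo.MongoClient().test.demo          # hypothetical connection
cst = timezone(timedelta(hours=8))              # Asia/Shanghai offset
coll.insert_one({'run_at': datetime(2024, 1, 1, 8, 0, tzinfo=cst)})
print(coll.find_one()['run_at'])                # 2024-01-01 00:00:00 -- stored as UTC

# 2) crontab fields follow the app's timezone setting instead, so with
#    timezone='Asia/Shanghai' this fires at 08:30 local time, not 08:30 UTC.
schedule = crontab(hour=8, minute=30)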

  2. MongoDB authentication error

A pymongo authentication error occurs because older Celery versions call db.authenticate, which recent pymongo releases have removed.

Solution: upgrade to celery>=5.2.3; starting with 5.2.x the db.authenticate call has been removed.
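
For example:

pip install --upgrade "celery>=5.2.3"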

  3. Use RabbitMQ priority queues
# enqueue: send the task to the priority queue with an explicit priority
# (priorities only take effect on queues declared with x-max-priority, see below)
worker.apply_async((para1, para2), task_id=task_id,
                   routing_key='celery.priority',
                   queue='celeryPriority',
                   priority=priority)
# retry: read the original message priority back from delivery_info so the
# retried task keeps its place in the queue
self.retry(countdown=30, exc=exc, max_retries=2,
           priority=self.request.delivery_info.get('priority'))
  4. BaseTask
from celery import Task
from celery.signals import after_setup_logger, task_received


class BaseTask(Task):
    _db = None

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        # runs after every task completes, whatever the final state
        _args = dict(status=status, retval=retval, task_id=task_id)
        logger.info(f"Entering into after_return callback, task_id: {task_id}")
        if einfo:
            logger.exception(einfo)

    def on_success(self, retval, task_id, args, kwargs):
        logger.info(f"Entering into callback on_success: {task_id}")
        update_task_success(task_id)
        send_celery_task_id(task_id, msg={"status": "success", "code": 2})  # send to rmq
        super().on_success(retval, task_id, args, kwargs)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        logger.info(f"Entering into callback on_failure..., and retry. task_id: {task_id}")
        logger.exception(f"celery error: {exc}")
        update_task_failure(task_id, str(exc))
        # re-queue the task, preserving the original message priority
        self.retry(countdown=30, exc=exc, max_retries=2, priority=self.request.delivery_info.get('priority'))


@after_setup_logger.connect
def setup_loggers(*args, **kwargs):
    logger.setlogrootname("taskeg-celery")
    show_envs()


@task_received.connect
def on_task_received(request, **kwargs):
    logger.info(f"task: {request.args[0]} received by worker v2 and begin run...")
  5. Task queues
from kombu import Exchange, Queue

# x-max-priority must be set when the queue is first declared;
# messages may then carry priorities from 0 up to that value.
task_queues = (
    Queue('celeryPriority', Exchange('celeryPriority'), routing_key='celery.priority',
          queue_arguments={'x-max-priority': 10}),
    Queue('celeryScheduler', Exchange('celeryScheduler'), routing_key='celery.scheduler',
          queue_arguments={'x-max-priority': 10})
)
  6. Celery config
celery_config = {
    'worker_prefetch_multiplier': 1,  # tasks each worker process prefetches (total prefetch = concurrency * multiplier)
    'result_expires': 0,  # how long task results are kept, in seconds; 0 = keep forever
    'timezone': 'Asia/Shanghai',
    'result_serializer': 'json',
    'result_persistent': True,
    'task_serializer': 'json',
    'worker_concurrency': 5,  # worker pool size; the same value the -c command-line option sets
    'worker_max_tasks_per_child': 500,  # recycle each worker process after this many tasks
    # beat
    'beat_schedule': {},
    "mongodb_scheduler_db": MONGO_CELERY_DB,
    "mongodb_scheduler_collection": MONGO_CELERY_COLLECTION,
    "mongodb_scheduler_url": f"mongodb://{ZYSL_MONGO_URI}/{MONGO_CELERY_DB}?authSource=admin",
    # task
    "task_queues": task_queues,
    "task_default_priority": 5,
    "task_default_queue": 'celeryPriority',
    "task_default_exchange": 'celeryPriority',
    "task_default_routing_key": 'celery.priority',
    # backend-mongo
    "mongodb_backend_settings": {
        'database': 'celery',
        'taskmeta_collection': 'task_results_01',
    },
    # common
    "enable_utc": False,
    "task_acks_late": True,
    "task_soft_time_limit": 1800,
    "task_time_limit": 3600,
    "task_create_missing_queues": True,
    "broker_transport_options": {"visibility_timeout": 36000},
    "worker_disable_rate_limits": True
}

from celery import Celery

client = Celery('CeleryTasks',
                broker_url=BROKER_URL,
                result_backend=BACKEND_URL,
                accept_content=['json'],
                result_accept_content=['json'],
                worker_hijack_root_logger=False,
                include=[
                    'app.celery_tasks.worker_engine',
                    'app.celery_tasks.worker_help',
                    'app.celery_tasks.worker_task'
                ]
                )
client.conf.update(**celery_config)
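
With the app configured, a task can be dispatched by name onto the priority queue; the task name below is illustrative:

result = client.send_task(
    'app.celery_tasks.worker_task.add',  # hypothetical registered task name
    args=(1, 2),
    queue='celeryPriority',
    routing_key='celery.priority',
    priority=8,  # 0-10, matching x-max-priority on the queue
)
print(result.get(timeout=60))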