Estoy tratando de ejecutar una tarea usando apio. Necesito enviar solicitudes de publicación a un servidor remoto mientras el usuario presiona el botón Enviar, así que intenté usar apio con Redis aquí con esta configuración en el archivo de configuración:
BROKER_URL = os.environ.get("REDIS_URL") CELERY_RESULT_BACKEND = os.environ.get("REDIS_URL") CELERY_ACCEPT_CONTENT = ["application/json"] CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_TIMEZONE = 'Asia/Dubai'
de acuerdo con la documentación de apply_async , puedo definir opciones de reintento como el siguiente código:
__task_expiration = 60 __interval_start = 1 * 60 api_generator.apply_async(args=(*args), group=user_key, expires=__task_expiration, retry=True, retry_policy={ "max_retries": 3, "interval_start": __interval_start })
En la documentación encontré esta definición para apply_async:
apply_async(args=None, kwargs=None, task_id=None, producer=None, link=None, link_error=None, shadow=None, **options)
y siguiendo la documentación, puedo configurar esto usando retry y retry_policy
y un código de muestra sobre cómo definir las opciones de reintento
add.apply_async((2, 2), retry=True, retry_policy={ 'max_retries': 3, 'interval_start': 0, 'interval_step': 0.2, 'interval_max': 0.2, })
Quiero que mi tarea se ejecute 3 veces para que se ejecute en caso de falla, y el intervalo entre cada reintento sea de 60 segundos. mi definición de tarea se ve así:
@shared_task def api_generator(*args): import requests import json url = os.environ.get("API_URL_CALL") api_access_key = os.environ.get("API_ACCESS_KEY") headers = { "Authorization": api_access_key, "Content-Type": "application/json" } json_schema = generate_json(*args) response = requests.request("POST", url, headers=headers, data=json.dumps(json_schema), timeout=30) if response.status_code != 200: raise NameError("API Response error") return response.status_code
pero cuando mi código falla, no veo ningún mecanismo de reintento en los registros de apio, ¿cuál es el problema aquí? ¿Cómo puedo definir el reintento al llamar a mis tareas usando el método apply_async? Estoy generando NameError("Exception")
para decirle al trabajador que se ha producido un error.
[EDITAR 1: Se agregó acks_late
]
Hay dos cosas que pueden salir mal cuando envía una tarea a un trabajador de Celery:
El primer problema se puede resolver definiendo retry
y retry_policy
como lo hizo.
El segundo tipo (que es lo que desea resolver), se puede resolver llamando a self.retry()
cuando falle una tarea.
Dependiendo de su tipo de problema, podría ser útil establecer CELERY_ACKS_LATE = True
.
Consulte estos enlaces para obtener más información:
Reintentar tareas perdidas o fallidas (Apio, Django y RabbitMQ)
https://coderbook.com/@marcus/how-to-automatically-retry-failed-tasks-with-celery/