Skip to content

Commit 758459a

Browse files
seta dinamicamente a fila que será usada no parse_log
1 parent ff9c6a0 commit 758459a

File tree

1 file changed

+10
-1
lines changed

1 file changed

+10
-1
lines changed

metrics/tasks.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@
2727
User = get_user_model()
2828

2929

30+
def extract_celery_queue_name(collection_acronym):
31+
return f"parse_{settings.COLLECTION_ACRON3_SIZE_MAP.get(collection_acronym, 'small')}"
32+
33+
3034
@celery_app.task(bind=True, name=_('Parse logs'), timelimit=-1)
3135
def task_parse_logs(self, collections=[], include_logs_with_error=True, batch_size=5000, replace=False, track_errors=False, from_date=None, until_date=None, days_to_go_back=None, user_id=None, username=None):
3236
"""
@@ -69,8 +73,13 @@ def task_parse_logs(self, collections=[], include_logs_with_error=True, batch_si
6973
if probably_date < from_date_obj or probably_date > until_date_obj:
7074
continue
7175

76+
queue_name = extract_celery_queue_name(collection)
77+
7278
logging.info(f'PARSING file {lf.path}')
73-
task_parse_log.apply_async(args=(lf.hash, batch_size, replace, track_errors, user_id, username))
79+
task_parse_log.apply_async(
80+
args=(lf.hash, batch_size, replace, track_errors, user_id, username),
81+
queue=queue_name,
82+
)
7483

7584

7685
@celery_app.task(bind=True, name=_('Parse one log'), timelimit=-1)

0 commit comments

Comments
 (0)