Support shutting down celery workers in various broker migration states #37224
Changes from all commits
0f3b2d2
cbea049
88b5495
4ea867d
0a5abda
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,24 +1,73 @@ | ||
| from celery import Celery | ||
| from django.conf import settings | ||
| from django.core.management.base import BaseCommand | ||
|
|
||
| from celery import Celery | ||
| from kombu import Connection | ||
|
|
||
| from corehq.apps.hqadmin.utils import parse_celery_pings | ||
|
|
||
|
|
||
| class Command(BaseCommand): | ||
| help = "Gracefully shutsdown a celery worker" | ||
| help = "Gracefully shuts down a celery worker" | ||
|
|
||
| def add_arguments(self, parser): | ||
| parser.add_argument('hostname') | ||
|
|
||
| def handle(self, hostname, **options): | ||
| celery = Celery() | ||
| celery.config_from_object(settings) | ||
| celery.control.broadcast('shutdown', destination=[hostname]) | ||
| worker_responses = celery.control.ping(timeout=10, destination=[hostname]) | ||
| pings = parse_celery_pings(worker_responses) | ||
| if hostname in pings: | ||
| print('Did not shutdown worker') | ||
| self.celery = Celery() | ||
| self.celery.config_from_object(settings) | ||
|
|
||
| current_broker_url = getattr(settings, 'CELERY_BROKER_URL', None) | ||
| assert current_broker_url is not None, "CELERY_BROKER_URL is not set" | ||
|
|
||
| # as long as OLD_BROKER_URL is set, we are going to send shutdown | ||
| # broadcasts to both the old and new broker urls | ||
| old_broker_url = getattr(settings, 'OLD_BROKER_URL', None) | ||
| migration_in_progress = old_broker_url is not None | ||
|
|
||
| if not migration_in_progress: | ||
| succeeded = self._shutdown(hostname) | ||
| if succeeded: | ||
| print(f'Successfully initiated warm shutdown of {hostname}') | ||
| return | ||
| exit(1) | ||
| print('Successfully initiated warm shutdown') | ||
|
|
||
| for broker_url in [current_broker_url, old_broker_url]: | ||
| broker_conn = Connection(broker_url) | ||
| succeeded = self._shutdown(hostname, broker_conn) | ||
| broker_conn.release() | ||
| if succeeded: | ||
| print( | ||
| '[Broker Migration In Progress] Initiated warm shutdown ' | ||
| f'of {hostname}' | ||
| ) | ||
|
Comment on lines +34 to +42
Bug: During broker migration, the command sends a redundant shutdown signal on success and incorrectly exits with a success code (0) if both shutdown attempts fail.
Detailed analysis: In the broker migration scenario, the command iterates through two broker URLs to send a shutdown signal. If the first attempt is successful, the code does not exit the loop and proceeds to send a second, redundant shutdown signal to the other broker. Furthermore, if both shutdown attempts fail, the loop completes and the command exits with a success code (0). This is inconsistent with the non-migration path, which exits with an error code (1) on failure. This will cause wrapper processes, which rely on the exit code, to incorrectly believe a failed shutdown was successful.
Suggested fix: After a successful shutdown attempt within the |
||
|
|
||
| def _shutdown(self, hostname, broker_conn=None): | ||
| # if using a custom broker connection, it is unlikely a ping will | ||
| # work properly since the worker might be writing to a different | ||
| # broker than the one it is reading from | ||
| check_worker_up = broker_conn is None | ||
|
|
||
| if check_worker_up and not self._is_worker_up(hostname): | ||
| print(f'{hostname} did not respond to ping. Aborted shutdown.') | ||
| return False | ||
|
|
||
| kwargs = {'destination': [hostname]} | ||
| if broker_conn is not None: | ||
| # use a custom broker connection | ||
| kwargs['connection'] = broker_conn | ||
| self.celery.control.broadcast('shutdown', **kwargs) | ||
|
|
||
| if check_worker_up and self._is_worker_up(hostname): | ||
| # if worker is still up, the shutdown likely did not succeed | ||
| # or it is just a slow shutdown | ||
| print(f'{hostname} responded to ping after initiating shutdown.') | ||
| return False | ||
|
Contributor
Is it true that this branch results in
Contributor, Author
Correct. I was attempting to preserve behavior since this is how it functioned before as well, though I'm not sure I agree with it. In practice, I don't think the exit code here actually makes any difference since this is called in the context of a trap in the celery bash runner, and we don't do anything special to handle a failure here. Perhaps it would be more clear to remove the exit(1) from this command entirely. |
||
|
|
||
| return True | ||
|
|
||
| def _is_worker_up(self, hostname): | ||
gherceg marked this conversation as resolved.
|
||
| worker_responses = self.celery.control.ping( | ||
| timeout=10, destination=[hostname] | ||
| ) | ||
| pings = parse_celery_pings(worker_responses) | ||
| return hostname in pings | ||
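The review comment on lines +34 to +42 describes two concerns with the migration loop: it keeps going after a successful attempt, and it returns a success exit code even if every attempt fails. The following is a minimal sketch of that suggestion only, not the change made in this PR. The names `shutdown_via_brokers`, `try_shutdown`, and `stop_on_first_success` are hypothetical, and `try_shutdown` stands in for a call such as `self._shutdown(hostname, broker_conn)`. Note that the diff's own comment states that broadcasts are sent to both the old and new broker URLs while `OLD_BROKER_URL` is set, so stopping early is exposed as an option rather than assumed to be correct.

```python
from typing import Callable, Iterable


def shutdown_via_brokers(
    broker_urls: Iterable[str],
    try_shutdown: Callable[[str], bool],
    stop_on_first_success: bool = True,
) -> int:
    """Try a shutdown through each broker URL and return an exit code.

    Returns 0 if at least one attempt reported success, otherwise 1.
    `try_shutdown` is a hypothetical callable that takes a broker URL
    and returns True when the shutdown broadcast is considered sent.
    """
    any_succeeded = False
    for broker_url in broker_urls:
        if try_shutdown(broker_url):
            any_succeeded = True
            if stop_on_first_success:
                # Skip the remaining brokers once one attempt succeeded.
                break
    # A non-zero code lets a wrapper process detect that all attempts failed.
    return 0 if any_succeeded else 1
```

A caller could pass the returned value to `sys.exit` so that both the migration and non-migration paths report failure the same way. Passing `stop_on_first_success=False` keeps the behavior of broadcasting to every broker while still returning 1 when none of the attempts succeed.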