From bf5ade70079e1d0a50ec57e8a4c08dd976a26c00 Mon Sep 17 00:00:00 2001
From: Alan Hamlett
Date: Mon, 23 Apr 2018 20:04:26 -0700
Subject: [PATCH 1/6] Task.task_count_from_queue helper method

---
 tasktiger/task.py | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/tasktiger/task.py b/tasktiger/task.py
index 166eaa09..255a6311 100644
--- a/tasktiger/task.py
+++ b/tasktiger/task.py
@@ -8,6 +8,7 @@
 
 __all__ = ['Task']
 
+
 class Task(object):
     def __init__(self, tiger, func=None, args=None, kwargs=None, queue=None,
                  hard_timeout=None, unique=None, lock=None, lock_key=None,
@@ -397,6 +398,16 @@ def tasks_from_queue(self, tiger, queue, state, skip=0, limit=1000,
 
         return n, tasks
 
+    @classmethod
+    def task_count_from_queue(self, tiger, queue, state):
+        """
+        Returns the number of tasks in a given queue and task state.
+        """
+
+        key = tiger._key(state, queue)
+        count = tiger.connection.zcount(key, '-inf', '+inf')
+        return count
+
     def n_executions(self):
         """
         Queries and returns the number of past task executions.

From 4934f7faf89e9e94c4385758b120ed255b49bd80 Mon Sep 17 00:00:00 2001
From: Alan Hamlett
Date: Mon, 23 Apr 2018 20:32:39 -0700
Subject: [PATCH 2/6] fix pep8 linter issues

---
 tasktiger/_internal.py     | 12 +++++++--
 tasktiger/exceptions.py    |  5 ++++
 tasktiger/flask_script.py  |  5 ++--
 tasktiger/redis_lock.py    |  8 +++---
 tasktiger/redis_scripts.py | 10 ++++----
 tasktiger/retry.py         | 10 ++++++--
 tasktiger/rollbar.py       |  2 ++
 tasktiger/schedule.py      |  4 ++-
 tasktiger/stats.py         |  1 +
 tasktiger/task.py          | 24 +++++++++++++-----
 tasktiger/worker.py        | 51 +++++++++++++++++++++++++-------------
 11 files changed, 93 insertions(+), 39 deletions(-)

diff --git a/tasktiger/_internal.py b/tasktiger/_internal.py
index e0fc07ef..ae105cb5 100644
--- a/tasktiger/_internal.py
+++ b/tasktiger/_internal.py
@@ -36,6 +36,7 @@
     'current_tasks': None,
 }
 
+
 # from rq
 def import_attribute(name):
     """Return an attribute from a dotted path name (e.g. "path.to.func")."""
@@ -47,12 +48,14 @@ def import_attribute(name):
     except (ValueError, ImportError, AttributeError) as e:
         raise TaskImportError(e)
 
+
 def gen_id():
     """
     Generates and returns a random hex-encoded 256-bit unique ID.
     """
     return binascii.b2a_hex(os.urandom(32)).decode('utf8')
 
+
 def gen_unique_id(serialized_name, args, kwargs):
     """
     Generates and returns a hex-encoded 256-bit ID for the given task name and
@@ -64,6 +67,7 @@ def gen_unique_id(serialized_name, args, kwargs):
         'kwargs': kwargs,
     }, sort_keys=True).encode('utf8')).hexdigest()
 
+
 def serialize_func_name(func):
     """
     Returns the dotted serialized path to the passed function.
@@ -78,18 +82,20 @@ def serialize_func_name(func):
         func_name = func.__name__
     return ':'.join([func.__module__, func_name])
 
+
 def dotted_parts(s):
     """
     For a string "a.b.c", yields "a", "a.b", "a.b.c".
    """
     idx = -1
     while s:
-        idx = s.find('.', idx+1)
+        idx = s.find('.', idx + 1)
         if idx == -1:
             yield s
             break
         yield s[:idx]
 
+
 def reversed_dotted_parts(s):
     """
     For a string "a.b.c", yields "a.b.c", "a.b", "a".
@@ -103,12 +109,14 @@ def reversed_dotted_parts(s):
             break
         yield s[:idx]
 
+
 def serialize_retry_method(retry_method):
     if callable(retry_method):
         return (serialize_func_name(retry_method), ())
     else:
         return (serialize_func_name(retry_method[0]), retry_method[1])
 
+
 def get_timestamp(when):
     # convert timedelta to datetime
     if isinstance(when, datetime.timedelta):
@@ -117,4 +125,4 @@ def get_timestamp(when):
     if when:
         # Convert to unixtime: utctimetuple drops microseconds so we add
         # them manually.
-        return calendar.timegm(when.utctimetuple()) + when.microsecond/1.e6
+        return calendar.timegm(when.utctimetuple()) + when.microsecond / 1.e6
diff --git a/tasktiger/exceptions.py b/tasktiger/exceptions.py
index b6911709..da0996cf 100644
--- a/tasktiger/exceptions.py
+++ b/tasktiger/exceptions.py
@@ -1,21 +1,25 @@
 import sys
 
+
 class TaskImportError(ImportError):
     """
     Raised when a task could not be imported.
     """
 
+
 class JobTimeoutException(BaseException):
     """
     Raised when a job takes longer to complete than the allowed maximum timeout
     value.
     """
 
+
 class StopRetry(Exception):
     """
     Raised by a retry function to indicate that the task shouldn't be retried.
     """
 
+
 class RetryException(BaseException):
     """
     Alternative to retry_on for retrying a task. If raised within a task, the
@@ -32,6 +36,7 @@ def __init__(self, method=None, original_traceback=False, log_error=True):
         self.exc_info = sys.exc_info() if original_traceback else None
         self.log_error = log_error
 
+
 class TaskNotFound(Exception):
     """
     The task was not found or does not exist in the given queue/state.
diff --git a/tasktiger/flask_script.py b/tasktiger/flask_script.py
index e2ac63bf..116f8e6c 100644
--- a/tasktiger/flask_script.py
+++ b/tasktiger/flask_script.py
@@ -3,6 +3,7 @@
 import argparse
 from flask_script import Command
 
+
 class TaskTigerCommand(Command):
     capture_all_args = True
     help = 'Run a TaskTiger worker'
@@ -14,10 +15,10 @@ def __init__(self, tiger):
     def create_parser(self, *args, **kwargs):
         # Override the default parser so we can pass all arguments to the
         # TaskTiger parser.
-        func_stack = kwargs.pop('func_stack',())
+        func_stack = kwargs.pop('func_stack', ())
         parent = kwargs.pop('parent', None)
         parser = argparse.ArgumentParser(*args, add_help=False, **kwargs)
-        parser.set_defaults(func_stack=func_stack+(self,))
+        parser.set_defaults(func_stack=func_stack + (self,))
         self.parser = parser
         self.parent = parent
         return parser
diff --git a/tasktiger/redis_lock.py b/tasktiger/redis_lock.py
index 929c52eb..8fdc2b22 100644
--- a/tasktiger/redis_lock.py
+++ b/tasktiger/redis_lock.py
@@ -1,5 +1,6 @@
-from redis.lock import LockError
 import time
+from redis import WatchError
+from redis.lock import Lock as RedisLock, LockError
 
 # TODO: Switch to Redlock (http://redis.io/topics/distlock) once the following
 # bugs are fixed:
@@ -7,10 +8,10 @@
 # * https://github.com/andymccurdy/redis-py/issues/629
 # * https://github.com/andymccurdy/redis-py/issues/601
 
+
 # For now, we're using the old-style lock pattern (based on py-redis 2.8.0)
 # The class below additionally catches ValueError for better compatibility with
 # new-style locks (for when we upgrade), and adds a renew() method.
-
 class Lock(object):
     """
     A shared, distributed Lock. Using Redis for locking allows the Lock
@@ -113,11 +114,10 @@ def renew(self, timeout=None):
         self.redis.getset(self.name, timeout_at)
         self.acquired_until = timeout_at
 
+
 # For now unused:
 # New-style Lock with renew() method (andymccurdy/redis-py#629)
 # XXX: when upgrading to the new-style class, take old-style locks into account
-from redis import WatchError
-from redis.lock import Lock as RedisLock
 class NewStyleLock(RedisLock):
     def renew(self, new_timeout):
         """
diff --git a/tasktiger/redis_scripts.py b/tasktiger/redis_scripts.py
index 9f7f763b..ffeb11d6 100644
--- a/tasktiger/redis_scripts.py
+++ b/tasktiger/redis_scripts.py
@@ -7,10 +7,10 @@
         redis.call('zadd', {key}, {score}, {member})
     end
 """
-ZADD_NOUPDATE = ZADD_NOUPDATE_TEMPLATE.format( 
+ZADD_NOUPDATE = ZADD_NOUPDATE_TEMPLATE.format(
     key='KEYS[1]', score='ARGV[1]', member='ARGV[2]', condition='not'
 )
-ZADD_UPDATE_EXISTING = ZADD_NOUPDATE_TEMPLATE.format( 
+ZADD_UPDATE_EXISTING = ZADD_NOUPDATE_TEMPLATE.format(
     key='KEYS[1]', score='ARGV[1]', member='ARGV[2]', condition=''
 )
 ZADD_UPDATE_TEMPLATE = """
@@ -372,7 +372,7 @@ def zpoppush(self, source, destination, count, score, new_score,
         (their score will not be updated).
         """
         if score is None:
-            score = '+inf' # Include all elements.
+            score = '+inf'  # Include all elements.
         if withscores:
             if on_success:
                 raise NotImplementedError()
@@ -438,7 +438,7 @@ def delete_if_not_in_zsets(self, key, member, set_list, client=None):
         ``set_list``. Returns the number of removed elements (0 or 1).
         """
         return self._delete_if_not_in_zsets(
-            keys=[key]+set_list,
+            keys=[key] + set_list,
             args=[member],
             client=client)
@@ -491,7 +491,7 @@ def execute_pipeline(self, pipeline, client=None):
         stack = pipeline.command_stack
         script_args = [int(self.can_replicate_commands), len(stack)]
         for args, options in stack:
-            script_args += [len(args)-1] + list(args)
+            script_args += [len(args) - 1] + list(args)
 
         # Run the pipeline
         if self.can_replicate_commands:  # Redis 3.2 or higher
diff --git a/tasktiger/retry.py b/tasktiger/retry.py
index 30fc494c..7af80382 100644
--- a/tasktiger/retry.py
+++ b/tasktiger/retry.py
@@ -1,26 +1,32 @@
 # The retry logic is documented in the README.
 from .exceptions import StopRetry
 
+
 def _fixed(retry, delay, max_retries):
     if retry > max_retries:
         raise StopRetry()
     return delay
 
+
 def fixed(delay, max_retries):
     return (_fixed, (delay, max_retries))
 
+
 def _linear(retry, delay, increment, max_retries):
     if retry > max_retries:
         raise StopRetry()
-    return delay + increment*(retry-1)
+    return delay + increment * (retry - 1)
 
+
 def linear(delay, increment, max_retries):
     return (_linear, (delay, increment, max_retries))
 
+
 def _exponential(retry, delay, factor, max_retries):
     if retry > max_retries:
         raise StopRetry()
-    return delay * factor**(retry-1)
+    return delay * factor ** (retry - 1)
 
+
 def exponential(delay, factor, max_retries):
     return (_exponential, (delay, factor, max_retries))
diff --git a/tasktiger/rollbar.py b/tasktiger/rollbar.py
index ff893c08..dde7d0ac 100644
--- a/tasktiger/rollbar.py
+++ b/tasktiger/rollbar.py
@@ -5,6 +5,7 @@
 import rollbar
 from rollbar.logger import RollbarHandler
 
+
 class StructlogRollbarHandler(RollbarHandler):
     def __init__(self, prefix, *args, **kwargs):
         """
@@ -17,6 +18,7 @@ def __init__(self, prefix, *args, **kwargs):
     def format_title(self, data):
         # Keys used to construct the title and for grouping purposes.
         KEYS = ['event', 'func', 'exception_name', 'queue']
+
         def format_field(field, value):
             if field == 'queue':
                 return '%s=%s' % (field, value.split('.')[0])
diff --git a/tasktiger/schedule.py b/tasktiger/schedule.py
index 4ef44fdf..bcfa0efe 100644
--- a/tasktiger/schedule.py
+++ b/tasktiger/schedule.py
@@ -2,6 +2,7 @@
 
 __all__ = ['periodic']
 
+
 def _periodic(dt, period, start_date, end_date):
     if end_date and dt >= end_date:
         return None
@@ -11,7 +12,7 @@ def _periodic(dt, period, start_date, end_date):
 
     # Determine the next time the task should be run
     delta = dt - start_date
-    seconds = delta.seconds + delta.days*86400
+    seconds = delta.seconds + delta.days * 86400
     runs = seconds // period
     next_run = runs + 1
     next_date = start_date + datetime.timedelta(seconds=next_run * period)
@@ -22,6 +23,7 @@ def _periodic(dt, period, start_date, end_date):
 
     return next_date
 
+
 def periodic(seconds=0, minutes=0, hours=0, days=0, weeks=0, start_date=None,
              end_date=None):
     """
diff --git a/tasktiger/stats.py b/tasktiger/stats.py
index 07e10098..3d1455ae 100644
--- a/tasktiger/stats.py
+++ b/tasktiger/stats.py
@@ -3,6 +3,7 @@
 
 from ._internal import g_fork_lock
 
+
 class StatsThread(threading.Thread):
     def __init__(self, tiger):
         super(StatsThread, self).__init__()
diff --git a/tasktiger/task.py b/tasktiger/task.py
index 255a6311..8e8320d3 100644
--- a/tasktiger/task.py
+++ b/tasktiger/task.py
@@ -3,7 +3,19 @@
 import redis
 import time
 
-from ._internal import *
+from ._internal import (
+    g,
+    gen_id,
+    gen_unique_id,
+    get_timestamp,
+    import_attribute,
+    serialize_func_name,
+    serialize_retry_method,
+    ACTIVE,
+    ERROR,
+    QUEUED,
+    SCHEDULED,
+)
 from .exceptions import TaskNotFound
 
 __all__ = ['Task']
@@ -206,24 +218,24 @@ def _move(self, from_state=None, to_state=None, when=None, mode=None):
             when = time.time()
         if mode:
             scripts.zadd(_key(to_state, queue), when, self.id,
-                mode, client=pipeline)
+                         mode, client=pipeline)
         else:
             pipeline.zadd(_key(to_state, queue), self.id, when)
         pipeline.sadd(_key(to_state), queue)
         pipeline.zrem(_key(from_state, queue), self.id)
-        if not to_state: # Remove the task if necessary
+        if not to_state:  # Remove the task if necessary
             if self.unique:
                 # Only delete if it's not in any other queue
                 check_states = set([ACTIVE, QUEUED, ERROR, SCHEDULED])
                 check_states.remove(from_state)
                 # TODO: Do the following two in one call.
                 scripts.delete_if_not_in_zsets(_key('task', self.id, 'executions'),
-                    self.id, [
+                                               self.id, [
                         _key(state, queue) for state in check_states
                     ], client=pipeline)
                 scripts.delete_if_not_in_zsets(_key('task', self.id),
-                    self.id, [
+                                               self.id, [
                         _key(state, queue) for state in check_states
                     ], client=pipeline)
             else:
@@ -366,7 +378,7 @@ def tasks_from_queue(self, tiger, queue, state, skip=0, limit=1000,
         key = tiger._key(state, queue)
         pipeline = tiger.connection.pipeline()
         pipeline.zcard(key)
-        pipeline.zrange(key, -limit-skip, -1-skip, withscores=True)
+        pipeline.zrange(key, -limit - skip, -1 - skip, withscores=True)
         n, items = pipeline.execute()
 
         tasks = []
diff --git a/tasktiger/worker.py b/tasktiger/worker.py
index 66254cca..d5ae99b1 100644
--- a/tasktiger/worker.py
+++ b/tasktiger/worker.py
@@ -14,20 +14,36 @@
 
 from .redis_lock import Lock
 
-from ._internal import *
+from ._internal import (
+    dotted_parts,
+    g,
+    g_fork_lock,
+    gen_unique_id,
+    import_attribute,
+    reversed_dotted_parts,
+    serialize_retry_method,
+    serialize_func_name,
+    ACTIVE,
+    ERROR,
+    QUEUED,
+    SCHEDULED,
+    TaskImportError,
+)
 from .exceptions import RetryException, TaskNotFound
-from .retry import *
+from .retry import StopRetry
 from .stats import StatsThread
 from .task import Task
 from .timeouts import UnixSignalDeathPenalty, JobTimeoutException
 
 __all__ = ['Worker']
 
+
 def sigchld_handler(*args):
     # Nothing to do here. This is just a dummy handler that we set up to catch
     # the child process exiting.
     pass
 
+
 class Worker(object):
     def __init__(self, tiger, queues=None, exclude_queues=None,
                  single_worker_queues=None):
@@ -177,14 +193,16 @@ def _wait_for_new_tasks(self, timeout=0, batch_timeout=0):
                 message = self._pubsub.get_message()
 
             if self._did_work:
-                break # Exit immediately if we did work during the last
-                      # execution loop because there might be more work to do
+                # Exit immediately if we did work during the last
+                # execution loop because there might be more work to do
+                break
             elif time.time() >= batch_exit and new_queue_found:
-                break # After finding a new queue we can wait until the
-                      # batch timeout expires
+                # After finding a new queue we can wait until the
+                # batch timeout expires
+                break
            elif time.time() - start_time > timeout:
-                break # Always exit after our maximum wait time
-
+                # Always exit after our maximum wait time
+                break
 
     def _worker_queue_expired_tasks(self):
         """
@@ -287,8 +305,7 @@ def _execute_forked(self, tasks, log):
             'kwargs': task.kwargs,
         } for task in tasks]
 
         task_timeouts = [task.hard_timeout for task in tasks if task.hard_timeout is not None]
-        hard_timeout = ((max(task_timeouts) if task_timeouts else None)
-                        or
+        hard_timeout = ((max(task_timeouts) if task_timeouts else None) or
                         getattr(func, '_task_hard_timeout', None) or
                         self.config['DEFAULT_HARD_TIMEOUT'])
@@ -324,7 +341,7 @@ def _execute_forked(self, tasks, log):
                 execution['time_failed'] = time.time()
                 # Currently we only log failed task executions to Redis.
                 execution['traceback'] = \
-                        ''.join(traceback.format_exception(*exc_info))
+                    ''.join(traceback.format_exception(*exc_info))
             execution['success'] = success
             execution['host'] = socket.gethostname()
             serialized_execution = json.dumps(execution)
@@ -460,7 +477,7 @@ def check_child_exit():
             """
             try:
                 pid, return_code = os.waitpid(child_pid, os.WNOHANG)
-                if pid != 0: # The child process is done.
+                if pid != 0:  # The child process is done.
                    return return_code
            except OSError as e:
                # Of course EINTR can happen if the child process exits
@@ -746,8 +763,8 @@ def _mark_done():
                 retry_func, retry_args = execution['retry_method']
             else:
                 # We expect the serialized method here.
-                retry_func, retry_args = serialize_retry_method( \
-                        self.config['DEFAULT_RETRY_METHOD'])
+                retry_func, retry_args = serialize_retry_method(
+                    self.config['DEFAULT_RETRY_METHOD'])
             should_log_error = execution['log_error']
             should_retry = True
@@ -905,8 +922,8 @@ def run(self, once=False, force_once=False):
         """
         self.log.info('ready', queues=sorted(self.only_queues),
-                exclude_queues=sorted(self.exclude_queues),
-                single_worker_queues=sorted(self.single_worker_queues))
+                      exclude_queues=sorted(self.exclude_queues),
+                      single_worker_queues=sorted(self.single_worker_queues))
 
         if not self.scripts.can_replicate_commands:
             # Older Redis versions may create additional overhead when
@@ -948,7 +965,7 @@ def run(self, once=False, force_once=False):
         except KeyboardInterrupt:
             pass
 
-        except Exception as e:
+        except Exception:
             self.log.exception(event='exception')
             raise

From 00ba1f88acc4073e1bc3d6cedaad32c7791b0399 Mon Sep 17 00:00:00 2001
From: Alan Hamlett
Date: Mon, 23 Apr 2018 20:53:25 -0700
Subject: [PATCH 3/6] example of task_count_from_queue in readme

---
 README.rst | 40 ++++++++++++++++++++++++++++++----------
 1 file changed, 30 insertions(+), 10 deletions(-)

diff --git a/README.rst b/README.rst
index fac12d2d..a9c9cb6c 100644
--- a/README.rst
+++ b/README.rst
@@ -22,7 +22,7 @@ Features
   TaskTiger forks a subprocess for each task. This comes with several benefits:
   Memory leaks caused by tasks are avoided since the subprocess is terminated
-  when the task is finished. A hard time limit can be set for each task, after
+  when the task is finished. A hard time limit can be set for each task, after
   which the task is killed if it hasn't completed. To ensure performance, any
   necessary Python modules can be preloaded in the parent process.
@@ -552,15 +552,16 @@ Each queue can have tasks in the following states:
 - ``scheduled``: Tasks that are scheduled for later execution.
 - ``error``: Tasks that failed with an error.
 
-To get a list of all tasks for a given queue and state, use
-``Task.tasks_from_queue``. The method gives you back a tuple containing the
-total number of tasks in the queue (useful if the tasks are truncated) and a
-list of tasks in the queue, latest first. Using the ``skip`` and ``limit``
-keyword arguments, you can fetch arbitrary slices of the queue. If you know the
-task ID, you can fetch a given task using ``Task.from_id``. Both methods let
-you load tracebacks from failed task executions using the ``load_executions``
-keyword argument, which accepts an integer indicating how many executions
-should be loaded.
+To get a count of the number of tasks for a given queue and state, use
+``Task.count_tasks_from_queue``. To get a list of all tasks for a given queue
+and state, use ``Task.tasks_from_queue``. The method gives you back a tuple
+containing the total number of tasks in the queue (useful if the tasks are
+truncated) and a list of tasks in the queue, latest first. Using the ``skip``
+and ``limit`` keyword arguments, you can fetch arbitrary slices of the queue.
+If you know the task ID, you can fetch a given task using ``Task.from_id``.
+Both methods let you load tracebacks from failed task executions using the
+``load_executions`` keyword argument, which accepts an integer indicating how
+many executions should be loaded.
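+
+For example, a minimal sketch (assuming an already configured ``TaskTiger``
+instance named ``tiger``; the queue and state names here are illustrative)
+that fetches the newest failed tasks together with their latest execution:
+
+.. code:: python
+
+    from tasktiger import Task
+
+    # Fetch up to 10 tasks from the "error" state of the "default" queue,
+    # loading the most recent traceback for each.
+    n, tasks = Task.tasks_from_queue(tiger, 'default', 'error', limit=10,
+                                     load_executions=1)
+
+    print('%d failed tasks total' % n)
+    for task in tasks:
+        print(task.id, len(task.executions))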
 
 Tasks can also be constructed and queued using the regular constructor, which
 takes the TaskTiger instance, the function name and the options described in
@@ -661,6 +662,25 @@ executed.
     for task in tiger.current_tasks:
         print(task.n_executions())
 
+Example 4: Printing the number of queued tasks for the default queue.
+
+.. code:: python
+
+    from tasktiger import TaskTiger, Task
+
+    QUEUE_NAME = 'default'
+    TASK_STATE = 'queued'
+
+    tiger = TaskTiger()
+
+    count = Task.task_count_from_queue(tiger, QUEUE_NAME, TASK_STATE)
+
+    print('{state} tasks in {queue}: {count}'.format(
+        state=TASK_STATE.title(),
+        queue=QUEUE_NAME,
+        count=count,
+    ))
+
 Rollbar error handling
 ----------------------

From 651a042e72e533dbafde4f778f89d81ca4e6371d Mon Sep 17 00:00:00 2001
From: Alan Hamlett
Date: Mon, 23 Apr 2018 20:53:33 -0700
Subject: [PATCH 4/6] tests for new helper method

---
 tests/test_base.py | 19 ++++++++++++++++++-
 1 file changed, 18 insertions(+), 1 deletion(-)

diff --git a/tests/test_base.py b/tests/test_base.py
index fba24f27..f7d92a9d 100644
--- a/tests/test_base.py
+++ b/tests/test_base.py
@@ -764,7 +764,7 @@ def test_tasks_from_queue(self):
 
     def test_tasks_from_queue_with_executions(self):
         task = self.tiger.delay(exception_task, retry=True)
-        
+
         # Get two executions in task
         Worker(self.tiger).run(once=True)
         time.sleep(DELAY)
@@ -782,6 +782,23 @@ def test_tasks_from_queue_with_executions(self):
         assert n == 1
         assert len(tasks[0].executions) == 2
 
+    def test_task_count_from_queue(self):
+        task0 = Task(self.tiger, simple_task)
+        task1 = Task(self.tiger, exception_task)
+        task2 = Task(self.tiger, simple_task, queue='other')
+
+        n = Task.task_count_from_queue(self.tiger, 'default', 'queued')
+        assert n == 0
+
+        task0.delay()
+        task1.delay()
+        task2.delay()
+
+        n = Task.task_count_from_queue(self.tiger, 'default', 'queued')
+        assert n == 2
+        n = Task.task_count_from_queue(self.tiger, 'other', 'queued')
+        assert n == 1
+
     def test_eager(self):
         self.tiger.config['ALWAYS_EAGER'] = True

From debbcb218bd10adf05f5290cfceb1df2fba2ec64 Mon Sep 17 00:00:00 2001
From: Alan Hamlett
Date: Tue, 24 Apr 2018 08:16:11 -0700
Subject: [PATCH 5/6] Task.queue_metrics helper classmethod

---
 README.rst         | 12 ++++++++++++
 tasktiger/task.py  | 42 ++++++++++++++++++++++++++++++++++++++++++
 tests/test_base.py | 15 ++++++++++++++-
 3 files changed, 68 insertions(+), 1 deletion(-)

diff --git a/README.rst b/README.rst
index a9c9cb6c..e21a57d4 100644
--- a/README.rst
+++ b/README.rst
@@ -681,6 +681,18 @@ Example 4: Printing the number of queued tasks for the default queue.
         count=count,
     ))
 
+Example 5: Printing all queues with task metrics.
+
+.. code:: python
+
+    from tasktiger import TaskTiger, Task
+
+    tiger = TaskTiger()
+
+    metrics = Task.queue_metrics(tiger)
+
+    print(metrics)
+
 Rollbar error handling
 ----------------------
 
diff --git a/tasktiger/task.py b/tasktiger/task.py
index 8e8320d3..940ae22d 100644
--- a/tasktiger/task.py
+++ b/tasktiger/task.py
@@ -363,6 +363,48 @@ def from_id(self, tiger, queue, state, task_id, load_executions=0):
             task_id
         ))
 
+    @classmethod
+    def queue_metrics(self, tiger):
+        """
+        Returns a dict of queue metrics.
+
+        For example:
+        {
+            'active': {
+                'default': {
+                    'total': 3,
+                },
+            },
+            'error': {},
+            'queued': {
+                'default': {
+                    'total': 10,
+                },
+                'other': {
+                    'total': 42,
+                },
+            },
+            'scheduled': {},
+        }
+        """
+
+        metrics = {
+            'active': {},
+            'error': {},
+            'queued': {},
+            'scheduled': {},
+        }
+        prefix = tiger.config['REDIS_PREFIX'] + ':'
+
+        for state in metrics.keys():
+            queues = tiger.connection.smembers(prefix + state)
+            for queue in queues:
+                metrics[state][queue] = {
+                    'total': self.task_count_from_queue(tiger, queue, state),
+                }
+
+        return metrics
+
     @classmethod
     def tasks_from_queue(self, tiger, queue, state, skip=0, limit=1000,
                          load_executions=0):
diff --git a/tests/test_base.py b/tests/test_base.py
index f7d92a9d..47f5859f 100644
--- a/tests/test_base.py
+++ b/tests/test_base.py
@@ -763,7 +763,7 @@ def test_tasks_from_queue(self):
         assert task0.queue == 'default'
 
     def test_tasks_from_queue_with_executions(self):
-        task = self.tiger.delay(exception_task, retry=True)
+        self.tiger.delay(exception_task, retry=True)
 
         # Get two executions in task
         Worker(self.tiger).run(once=True)
@@ -799,6 +799,19 @@ def test_task_count_from_queue(self):
         n = Task.task_count_from_queue(self.tiger, 'other', 'queued')
         assert n == 1
 
+    def test_queue_metrics(self):
+        task0 = Task(self.tiger, simple_task)
+        task1 = Task(self.tiger, exception_task)
+        task2 = Task(self.tiger, simple_task, queue='other')
+
+        task0.delay()
+        task1.delay()
+        task2.delay()
+
+        metrics = Task.queue_metrics(self.tiger)
+        assert metrics['queued']['default']['total'] == 2
+        assert metrics['queued']['other']['total'] == 1
+
     def test_eager(self):
         self.tiger.config['ALWAYS_EAGER'] = True

From 843e0165f2379780a583b438bc7fdb10498851ad Mon Sep 17 00:00:00 2001
From: Alan Hamlett
Date: Tue, 24 Apr 2018 16:02:37 -0700
Subject: [PATCH 6/6] fix readme typo

---
 README.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.rst b/README.rst
index e21a57d4..1e131337 100644
--- a/README.rst
+++ b/README.rst
@@ -553,7 +553,7 @@ Each queue can have tasks in the following states:
 - ``error``: Tasks that failed with an error.
 
 To get a count of the number of tasks for a given queue and state, use
-``Task.count_tasks_from_queue``. To get a list of all tasks for a given queue
+``Task.task_count_from_queue``. To get a list of all tasks for a given queue
 and state, use ``Task.tasks_from_queue``. The method gives you back a tuple
 containing the total number of tasks in the queue (useful if the tasks are
 truncated) and a list of tasks in the queue, latest first. Using the ``skip``