diff --git a/requirements.txt b/requirements.txt index 2abcd235..6f52c0ab 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,7 @@ python-dateutil requests_oauthlib +tqdm +humanize click click-plugins click-config-file diff --git a/twarc/client2.py b/twarc/client2.py index 907702c3..5bcbc5d3 100644 --- a/twarc/client2.py +++ b/twarc/client2.py @@ -5,7 +5,6 @@ """ import re -import ssl import json import time import logging @@ -23,8 +22,6 @@ log = logging.getLogger("twarc") -TWITTER_EPOCH = datetime.datetime(2006, 3, 21, tzinfo=datetime.timezone.utc) - class Twarc2: """ @@ -227,12 +224,6 @@ def search_all( """ url = "https://api.twitter.com/2/tweets/search/all" - # start time defaults to the beginning of Twitter to override the - # default of the last month. Only do this if start_time is not already - # specified and since_id isn't being used - if start_time is None and since_id is None: - start_time = TWITTER_EPOCH - return self._search( url, query, @@ -319,12 +310,6 @@ def counts_all( """ url = "https://api.twitter.com/2/tweets/counts/all" - # start time defaults to the beginning of Twitter to override the - # default of the last month. Only do this if start_time is not already - # specified and since_id isn't being used - if start_time is None and since_id is None: - start_time = TWITTER_EPOCH - return self._search( url, query, @@ -713,7 +698,7 @@ def mentions( exclude_replies, ) - def following(self, user): + def following(self, user, user_id=None): """ Retrieve the user profiles of accounts followed by the given user. @@ -725,13 +710,13 @@ def following(self, user): Returns: generator[dict]: A generator, dict for each page of results. """ - user_id = self._ensure_user_id(user) + user_id = self._ensure_user_id(user) if not user_id else user_id params = expansions.USER_EVERYTHING.copy() params["max_results"] = 1000 url = f"https://api.twitter.com/2/users/{user_id}/following" return self.get_paginated(url, params=params) - def followers(self, user): + def followers(self, user, user_id=None): """ Retrieve the user profiles of accounts following the given user. @@ -743,7 +728,7 @@ def followers(self, user): Returns: generator[dict]: A generator, dict for each page of results. """ - user_id = self._ensure_user_id(user) + user_id = self._ensure_user_id(user) if not user_id else user_id params = expansions.USER_EVERYTHING.copy() params["max_results"] = 1000 url = f"https://api.twitter.com/2/users/{user_id}/followers" @@ -875,18 +860,24 @@ def connect(self): resource_owner_secret=self.access_token_secret, ) + def _id_exists(self, user): + """ + Returns True if the user id exists + """ + try: + error_name = next(self.user_lookup([user]))["errors"][0]["title"] + return error_name != "Not Found Error" + except KeyError: + return True + def _ensure_user_id(self, user): + """ + Always return a valid user id, look up if not numeric. + """ user = str(user) is_numeric = re.match(r"^\d+$", user) - def id_exists(user): - try: - error_name = next(self.user_lookup([user]))["errors"][0]["title"] - return error_name != "Not Found Error" - except KeyError: - return True - - if len(user) > 15 or (is_numeric and id_exists(user)): + if len(user) > 15 or (is_numeric and self._id_exists(user)): return user else: results = next(self.user_lookup([user], usernames=True)) @@ -897,6 +888,25 @@ def id_exists(user): else: raise ValueError(f"No such user {user}") + def _ensure_user(self, user): + """ + Always return a valid user object. + """ + user = str(user) + is_numeric = re.match(r"^\d+$", user) + + lookup = [] + if len(user) > 15 or (is_numeric and self._id_exists(user)): + lookup = expansions.ensure_flattened(list(self.user_lookup([user]))) + else: + lookup = expansions.ensure_flattened( + list(self.user_lookup([user], usernames=True)) + ) + if lookup: + return lookup[-1] + else: + raise ValueError(f"No such user {user}") + def _check_for_disconnect(self, data): """ Look for disconnect errors in a response, and reconnect if found. The diff --git a/twarc/command2.py b/twarc/command2.py index 9eccdea2..c4f31132 100644 --- a/twarc/command2.py +++ b/twarc/command2.py @@ -2,7 +2,6 @@ The command line interfact to the Twitter v2 API. """ -import os import re import json import twarc @@ -10,19 +9,27 @@ import logging import pathlib import datetime -import requests import configobj import threading +from tqdm.auto import tqdm +from datetime import timezone from click_plugins import with_plugins from pkg_resources import iter_entry_points from twarc.version import version from twarc.handshake import handshake from twarc.config import ConfigProvider -from twarc.decorators2 import cli_api_error from twarc.expansions import ensure_flattened from click_config_file import configuration_option +from twarc.decorators2 import ( + cli_api_error, + TimestampProgressBar, + FileSizeProgressBar, + _millis2snowflake, + _date2millis, +) + config_provider = ConfigProvider() log = logging.getLogger("twarc") @@ -197,6 +204,68 @@ def get_version(): click.echo(f"twarc v{version}") +def _search( + T, + query, + outfile, + since_id, + until_id, + start_time, + end_time, + limit, + max_results, + archive, + hide_progress, +): + """ + Search for tweets. + """ + count = 0 + + # Make sure times are always in UTC, click sometimes doesn't add timezone: + if start_time is not None and start_time.tzinfo is None: + start_time = start_time.replace(tzinfo=timezone.utc) + if end_time is not None and end_time.tzinfo is None: + end_time = end_time.replace(tzinfo=timezone.utc) + + if archive: + search_method = T.search_all + + # default number of tweets per response 500 when not set otherwise + if max_results == 0: + max_results = 100 # temp fix for #504 + + # start time defaults to the beginning of Twitter to override the + # default of the last month. Only do this if start_time is not already + # specified and since_id isn't being used + if start_time is None and since_id is None: + start_time = datetime.datetime(2006, 3, 21, tzinfo=datetime.timezone.utc) + else: + if max_results == 0: + max_results = 100 + search_method = T.search_recent + + hide_progress = True if (outfile.name == "") else hide_progress + + with TimestampProgressBar( + since_id, until_id, start_time, end_time, disable=hide_progress + ) as progress: + for result in search_method( + query, since_id, until_id, start_time, end_time, max_results + ): + _write(result, outfile) + tweet_ids = [t["id"] for t in result.get("data", [])] + log.info("archived %s", ",".join(tweet_ids)) + progress.update_with_result(result) + count += len(result["data"]) + if limit != 0 and count >= limit: + # Display message when stopped early + progress.desc = f"Set --limit of {limit} reached" + break + else: + progress.early_stop = False + + @twarc2.command("search") @click.option("--since-id", type=int, help="Match tweets sent after tweet id") @click.option("--until-id", type=int, help="Match tweets sent prior to tweet id") @@ -214,12 +283,18 @@ def get_version(): "--archive", is_flag=True, default=False, - help="Search the full archive (requires Academic Research track)", + help="Search the full archive (requires Academic Research track). Defaults to searching the entire twitter archive if --start-time is not specified.", ) @click.option("--limit", default=0, help="Maximum number of tweets to save") @click.option( "--max-results", default=0, help="Maximum number of tweets per API response" ) +@click.option( + "--hide-progress", + is_flag=True, + default=False, + help="Hide the Progress bar. Default: show progress, unless using pipes.", +) @click.argument("query", type=str) @click.argument("outfile", type=click.File("w"), default="-") @click.pass_obj @@ -235,34 +310,24 @@ def search( limit, max_results, archive, + hide_progress, ): """ Search for tweets. """ - count = 0 - - if archive: - search_method = T.search_all - - # default number of tweets per response 500 when not set otherwise - if max_results == 0: - max_results = 100 # temp fix for #504 - else: - if max_results == 0: - max_results = 100 - search_method = T.search_recent - - for result in search_method( - query, since_id, until_id, start_time, end_time, max_results - ): - _write(result, outfile) - - tweet_ids = [t["id"] for t in result.get("data", [])] - log.info("archived %s", ",".join(tweet_ids)) - - count += len(result["data"]) - if limit != 0 and count >= limit: - break + return _search( + T, + query, + outfile, + since_id, + until_id, + start_time, + end_time, + limit, + max_results, + archive, + hide_progress, + ) @twarc2.command("counts") @@ -382,41 +447,85 @@ def tweet(T, tweet_id, outfile, pretty): @twarc2.command("followers") -@click.option("--limit", default=0, help="Maximum number of followers to save") +@click.option( + "--limit", + default=0, + help="Maximum number of followers to save. Increments of 1000.", +) +@click.option( + "--hide-progress", + is_flag=True, + default=False, + help="Hide the Progress bar. Default: show progress", +) @click.argument("user", type=str) @click.argument("outfile", type=click.File("w"), default="-") @click.pass_obj @cli_api_error -def followers(T, user, outfile, limit): +def followers(T, user, outfile, limit, hide_progress): """ Get the followers for a given user. """ count = 0 + user_id = None + lookup_total = 0 - for result in T.followers(user): - _write(result, outfile) - count += len(result["data"]) - if limit != 0 and count >= limit: - break + if outfile is not None and (outfile.name == ""): + hide_progress = True + + if not hide_progress: + target_user = T._ensure_user(user) + user_id = target_user["id"] + lookup_total = target_user["public_metrics"]["followers_count"] + + with tqdm(disable=hide_progress, total=lookup_total) as progress: + for result in T.followers(user, user_id=user_id): + _write(result, outfile) + count += len(result["data"]) + progress.update(len(result["data"])) + if limit != 0 and count >= limit: + progress.desc = f"Set --limit of {limit} reached" + break @twarc2.command("following") -@click.option("--limit", default=0, help="Maximum number of friends to save") +@click.option( + "--limit", default=0, help="Maximum number of friends to save. Increments of 1000." +) +@click.option( + "--hide-progress", + is_flag=True, + default=False, + help="Hide the Progress bar. Default: show progress", +) @click.argument("user", type=str) @click.argument("outfile", type=click.File("w"), default="-") @click.pass_obj @cli_api_error -def following(T, user, outfile, limit): +def following(T, user, outfile, limit, hide_progress): """ Get the users who are following a given user. """ count = 0 + user_id = None + lookup_total = 0 - for result in T.following(user): - _write(result, outfile) - count += len(result["data"]) - if limit != 0 and count >= limit: - break + if outfile is not None and (outfile.name == ""): + hide_progress = True + + if not hide_progress: + target_user = T._ensure_user(user) + user_id = target_user["id"] + lookup_total = target_user["public_metrics"]["following_count"] + + with tqdm(disable=hide_progress, total=lookup_total) as progress: + for result in T.following(user, user_id=user_id): + _write(result, outfile) + count += len(result["data"]) + progress.update(len(result["data"])) + if limit != 0 and count >= limit: + progress.desc = f"Set --limit of {limit} reached" + break @twarc2.command("sample") @@ -448,30 +557,54 @@ def sample(T, outfile, limit): @twarc2.command("hydrate") @click.argument("infile", type=click.File("r"), default="-") @click.argument("outfile", type=click.File("w"), default="-") +@click.option( + "--hide-progress", + is_flag=True, + default=False, + help="Hide the Progress bar. Default: show progress, unless using pipes.", +) @click.pass_obj @cli_api_error -def hydrate(T, infile, outfile): +def hydrate(T, infile, outfile, hide_progress): """ Hydrate tweet ids. """ - for result in T.tweet_lookup(infile): - _write(result, outfile) - tweet_ids = [t["id"] for t in result.get("data", [])] - log.info("archived %s", ",".join(tweet_ids)) + with FileSizeProgressBar(infile, outfile, disable=hide_progress) as progress: + for result in T.tweet_lookup(infile): + _write(result, outfile) + tweet_ids = [t["id"] for t in result.get("data", [])] + log.info("archived %s", ",".join(tweet_ids)) + progress.update_with_result(result, error_resource_type="tweet") @twarc2.command("users") -@click.option("--usernames", is_flag=True, default=False) @click.argument("infile", type=click.File("r"), default="-") @click.argument("outfile", type=click.File("w"), default="-") +@click.option("--usernames", is_flag=True, default=False) +@click.option( + "--hide-progress", + is_flag=True, + default=False, + help="Hide the Progress bar. Default: show progress, unless using pipes.", +) @click.pass_obj @cli_api_error -def users(T, infile, outfile, usernames): +def users(T, infile, outfile, usernames, hide_progress): """ Get data for user ids or usernames. """ - for result in T.user_lookup(infile, usernames): - _write(result, outfile) + with FileSizeProgressBar(infile, outfile, disable=hide_progress) as progress: + for result in T.user_lookup(infile, usernames): + _write(result, outfile) + if usernames: + progress.update_with_result( + result, + field="username", + error_resource_type="user", + error_parameter="usernames", + ) + else: + progress.update_with_result(result, error_resource_type="user") @twarc2.command("mentions") @@ -487,16 +620,33 @@ def users(T, infile, outfile, usernames): type=click.DateTime(formats=("%Y-%m-%d", "%Y-%m-%dT%H:%M:%S")), help="Match tweets sent before time (ISO 8601/RFC 3339)", ) +@click.option( + "--hide-progress", + is_flag=True, + default=False, + help="Hide the Progress bar. Default: show progress", +) @click.argument("user_id", type=str) @click.argument("outfile", type=click.File("w"), default="-") @click.pass_obj @cli_api_error -def mentions(T, user_id, outfile, since_id, until_id, start_time, end_time): +def mentions( + T, user_id, outfile, since_id, until_id, start_time, end_time, hide_progress +): """ - Retrieve the most recent tweets mentioning the given user. + Retrieve max of 800 of the most recent tweets mentioning the given user. """ - for result in T.mentions(user_id, since_id, until_id, start_time, end_time): - _write(result, outfile) + + with tqdm(disable=hide_progress, total=800) as progress: + for result in T.mentions(user_id, since_id, until_id, start_time, end_time): + _write(result, outfile) + progress.update(len(result["data"])) + else: + if progress.n > 800: + progress.desc = f"API limit reached with {progress.n} tweets" + progress.n = 800 + else: + progress.desc = f"Set limit reached with {progress.n} tweets" @twarc2.command("timeline") @@ -531,6 +681,12 @@ def mentions(T, user_id, outfile, since_id, until_id, start_time, end_time): default=False, help="Use the search/all API endpoint which is not limited to the last 3200 tweets, but requires Academic Product Track access.", ) +@click.option( + "--hide-progress", + is_flag=True, + default=False, + help="Hide the Progress bar. Default: show progress, unless using pipes.", +) @click.argument("user_id", type=str) @click.argument("outfile", type=click.File("w"), default="-") @click.pass_obj @@ -547,11 +703,50 @@ def timeline( limit, exclude_retweets, exclude_replies, + hide_progress, ): """ Retrieve recent tweets for the given user. """ + count = 0 + user = T._ensure_user(user_id) # It's possible to skip this to optimize more + + if use_search or (start_time or end_time) or (since_id or until_id): + pbar = TimestampProgressBar + + # Infer start time as the user created time if not using ids + if start_time is None and (since_id is None and until_id is None): + start_time = datetime.datetime.strptime( + user["created_at"], "%Y-%m-%dT%H:%M:%S.%fZ" + ) + # Infer since_id as user created time if using ids + if start_time is None and since_id is None: + infer_id = _millis2snowflake( + _date2millis( + datetime.datetime.strptime( + user["created_at"], "%Y-%m-%dT%H:%M:%S.%fZ" + ) + ) + ) + # Snowflake epoch is 1288834974657 so if older, just set it to "1" + since_id = infer_id if infer_id > 0 else 1 + + pbar_params = { + "since_id": since_id, + "until_id": until_id, + "start_time": start_time, + "end_time": end_time, + "disable": hide_progress, + } + + else: + pbar = tqdm + pbar_params = { + "disable": hide_progress, + "total": user["public_metrics"]["tweet_count"], + } + tweets = _timeline_tweets( T, use_search, @@ -564,13 +759,25 @@ def timeline( exclude_replies, ) - count = 0 - for result in tweets: - _write(result, outfile) + with pbar(**pbar_params) as progress: + for result in tweets: + _write(result, outfile) - count += len(result["data"]) - if limit != 0 and count >= limit: - break + count += len(result["data"]) + if isinstance(progress, TimestampProgressBar): + progress.update_with_result(result) + else: + progress.update(len(result["data"])) + + if limit != 0 and count >= limit: + # Display message when stopped early + progress.desc = f"Set --limit of {limit} reached" + break + else: + if isinstance(progress, TimestampProgressBar): + progress.early_stop = False + if not use_search and user["public_metrics"]["tweet_count"] > 3200: + progress.desc = f"API limit of 3200 reached" @twarc2.command("timelines") @@ -598,6 +805,12 @@ def timeline( default=False, help="Exclude replies from timeline", ) +@click.option( + "--hide-progress", + is_flag=True, + default=False, + help="Hide the Progress bar. Default: show progress, unless using pipes.", +) @click.argument("infile", type=click.File("r"), default="-") @click.argument("outfile", type=click.File("w"), default="-") @click.pass_obj @@ -610,6 +823,7 @@ def timelines( use_search, exclude_retweets, exclude_replies, + hide_progress, ): """ Fetch the timelines of every user in an input source of tweets. If @@ -625,72 +839,79 @@ def timelines( total_count = 0 line_count = 0 seen = set() - for line in infile: - line_count += 1 - line = line.strip() - if line == "": - log.warn("skipping blank line on line %s", line_count) - continue - - users = None - try: - # first try to get user ids from a flattened Twitter response - json_data = json.loads(line) - try: - users = set([t["author"]["id"] for t in ensure_flattened(json_data)]) - except (KeyError, ValueError): - # if it's not tweet JSON but it parsed as a string use that as a user - if isinstance(json_data, str) and json_data: - users = set([json_data]) - else: - log.warn("ignored line %s which didn't contain users", line_count) - continue - except json.JSONDecodeError: - # assume it's a single user - users = set([line]) + with FileSizeProgressBar(infile, outfile, disable=hide_progress) as progress: + for line in infile: + progress.update(len(line)) + line_count += 1 + line = line.strip() + if line == "": + log.warn("skipping blank line on line %s", line_count) + continue + + users = None + try: + # first try to get user ids from a flattened Twitter response + json_data = json.loads(line) + try: + users = set( + [t["author"]["id"] for t in ensure_flattened(json_data)] + ) + except (KeyError, ValueError): + # if it's not tweet JSON but it parsed as a string use that as a user + if isinstance(json_data, str) and json_data: + users = set([json_data]) + else: + log.warn( + "ignored line %s which didn't contain users", line_count + ) + continue + + except json.JSONDecodeError: + # assume it's a single user + users = set([line]) + + if users is None: + click.echo( + click.style( + f"unable to find user or users on line {line_count}", + fg="red", + ), + err=True, + ) + break - if users is None: - click.echo( - click.style( - f"unable to find user or users on line {line_count}", - fg="red", - ), - err=True, - ) - break + for user in users: - for user in users: + # only process a given user once + if user in seen: + log.info("already processed %s, skipping", user) + continue + seen.add(user) - # only process a given user once - if user in seen: - log.info("already processed %s, skipping", user) - continue - seen.add(user) - - tweets = _timeline_tweets( - T, - use_search, - user, - None, - None, - None, - None, - exclude_retweets, - exclude_replies, - ) + tweets = _timeline_tweets( + T, + use_search, + user, + None, + None, + None, + None, + exclude_retweets, + exclude_replies, + ) - timeline_count = 0 - for response in tweets: - _write(response, outfile) + timeline_count = 0 + for response in tweets: + _write(response, outfile) - timeline_count += len(response["data"]) - if timeline_limit != 0 and timeline_count >= timeline_limit: - break + timeline_count += len(response["data"]) + if timeline_limit != 0 and timeline_count >= timeline_limit: + break - total_count += len(response["data"]) - if limit != 0 and total_count >= limit: - return + total_count += len(response["data"]) + if limit != 0 and total_count >= limit: + return def _timeline_tweets( @@ -710,7 +931,7 @@ def _timeline_tweets( q += " -is:retweet" if exclude_replies and "-is:reply" not in q: q += " -is:reply" - tweets = T.search_all(q, since_id, until_id, start_time, end_time) + tweets = T.search_all(q, since_id, until_id, start_time, end_time, 100) else: tweets = T.timeline( user_id, @@ -725,27 +946,68 @@ def _timeline_tweets( @twarc2.command("conversation") +@click.option("--since-id", type=int, help="Match tweets sent after tweet id") +@click.option("--until-id", type=int, help="Match tweets sent prior to tweet id") +@click.option( + "--start-time", + type=click.DateTime(formats=("%Y-%m-%d", "%Y-%m-%dT%H:%M:%S")), + help="Match tweets created after UTC time (ISO 8601/RFC 3339), e.g. 2021-01-01T12:31:04", +) +@click.option( + "--end-time", + type=click.DateTime(formats=("%Y-%m-%d", "%Y-%m-%dT%H:%M:%S")), + help="Match tweets sent before UTC time (ISO 8601/RFC 3339)", +) @click.option( "--archive", is_flag=True, default=False, help="Search the full archive (requires Academic Research track)", ) +@click.option("--limit", default=0, help="Maximum number of tweets to save") +@click.option( + "--max-results", default=0, help="Maximum number of tweets per API response" +) +@click.option( + "--hide-progress", + is_flag=True, + default=False, + help="Hide the Progress bar. Default: show progress, unless using pipes.", +) @click.argument("tweet_id", type=str) @click.argument("outfile", type=click.File("w"), default="-") @click.pass_obj @cli_api_error -def conversation(T, tweet_id, archive, outfile): +def conversation( + T, + tweet_id, + outfile, + since_id, + until_id, + start_time, + end_time, + limit, + max_results, + archive, + hide_progress, +): """ Retrieve a conversation thread using the tweet id. """ q = f"conversation_id:{tweet_id}" - if archive: - search = T.search_all(q) - else: - search = T.search_recent(q) - for resp in search: - _write(resp, outfile) + return _search( + T, + q, + outfile, + since_id, + until_id, + start_time, + end_time, + limit, + max_results, + archive, + hide_progress, + ) @twarc2.command("conversations") @@ -761,11 +1023,19 @@ def conversation(T, tweet_id, archive, outfile): default=False, help="Use the Academic Research project track access to the full archive", ) +@click.option( + "--hide-progress", + is_flag=True, + default=False, + help="Hide the Progress bar. Default: show progress, unless using pipes.", +) @click.argument("infile", type=click.File("r"), default="-") @click.argument("outfile", type=click.File("w"), default="-") @click.pass_obj @cli_api_error -def conversations(T, infile, outfile, archive, limit, conversation_limit): +def conversations( + T, infile, outfile, archive, limit, conversation_limit, hide_progress +): """ Fetch the full conversation threads that the input tweets are a part of. Alternatively the input can be a line oriented file of conversation ids. @@ -780,61 +1050,70 @@ def conversations(T, infile, outfile, archive, limit, conversation_limit): count = 0 stop = False - for line in infile: - conv_ids = [] - # stop will get set when the total tweet limit has been met - if stop: - break + with FileSizeProgressBar(infile, outfile, disable=hide_progress) as progress: + for line in infile: + progress.update(len(line)) + conv_ids = [] - # get a specific conversation id - line = line.strip() - if re.match(r"^\d+$", line): - if line in seen: - continue - conv_ids = [line] + # stop will get set when the total tweet limit has been met + if stop: + break - # generate all conversation_ids that are referenced in tweets input - else: + # get a specific conversation id + line = line.strip() + if re.match(r"^\d+$", line): + if line in seen: + continue + conv_ids = [line] - def f(): - for tweet in ensure_flattened(json.loads(line)): - yield tweet.get("conversation_id") + # generate all conversation_ids that are referenced in tweets input + else: - conv_ids = f() + def f(): + for tweet in ensure_flattened(json.loads(line)): + yield tweet.get("conversation_id") - # output results while paying attention to the set limits - conv_count = 0 + conv_ids = f() - for conv_id in conv_ids: + # output results while paying attention to the set limits + conv_count = 0 - if conv_id in seen: - log.info(f"already fetched conversation_id {conv_id}") - seen.add(conv_id) + for conv_id in conv_ids: - conv_count = 0 + if conv_id in seen: + log.info(f"already fetched conversation_id {conv_id}") + seen.add(conv_id) + + conv_count = 0 - log.info(f"fetching conversation {conv_id}") - for result in search(f"conversation_id:{conv_id}"): - _write(result, outfile, False) + log.info(f"fetching conversation {conv_id}") + for result in search(f"conversation_id:{conv_id}"): + _write(result, outfile, False) - count += len(result["data"]) - if limit != 0 and count >= limit: - log.info(f"reached tweet limit of {limit}") - stop = True - break + count += len(result["data"]) + if limit != 0 and count >= limit: + log.info(f"reached tweet limit of {limit}") + stop = True + break - conv_count += len(result["data"]) - if conversation_limit != 0 and conv_count >= conversation_limit: - log.info(f"reached conversation limit {conversation_limit}") - break + conv_count += len(result["data"]) + if conversation_limit != 0 and conv_count >= conversation_limit: + log.info(f"reached conversation limit {conversation_limit}") + break @twarc2.command("flatten") @click.argument("infile", type=click.File("r"), default="-") @click.argument("outfile", type=click.File("w"), default="-") +@click.option( + "--hide-progress", + is_flag=True, + default=False, + help="Hide the Progress bar. Default: show progress, unless using pipes.", +) @cli_api_error -def flatten(infile, outfile): +def flatten(infile, outfile, hide_progress): """ "Flatten" tweets, or move expansions inline with tweet objects and ensure that each line of output is a single tweet. @@ -849,9 +1128,11 @@ def flatten(infile, outfile): ) return - for line in infile: - for tweet in ensure_flattened(json.loads(line)): - _write(tweet, outfile, False) + with FileSizeProgressBar(infile, outfile, disable=hide_progress) as progress: + for line in infile: + for tweet in ensure_flattened(json.loads(line)): + _write(tweet, outfile, False) + progress.update(len(line)) @twarc2.command("stream") diff --git a/twarc/decorators2.py b/twarc/decorators2.py index 26630f35..edf41956 100644 --- a/twarc/decorators2.py +++ b/twarc/decorators2.py @@ -1,11 +1,15 @@ +import os import time import click -import types import logging import requests +import datetime +import humanize +from tqdm.auto import tqdm from functools import wraps + log = logging.getLogger("twarc") @@ -165,3 +169,151 @@ class InvalidAuthType(Exception): """ Raised when the endpoint called is not supported by the current auth type. """ + + +class FileSizeProgressBar(tqdm): + """ + An input file size based progress bar. Counts an input file in bytes. + This will also dig into the responses and add up the outputs to match the file size. + Overrides `disable` parameter if file is a pipe. + """ + + def __init__(self, infile, outfile, **kwargs): + disable = False if "disable" not in kwargs else kwargs["disable"] + if infile is not None and (infile.name == ""): + disable = True + if outfile is not None and (outfile.name == ""): + disable = True + kwargs["disable"] = disable + kwargs["unit"] = "B" + kwargs["unit_scale"] = True + kwargs["unit_divisor"] = 1024 + kwargs["miniters"] = 1 + kwargs[ + "bar_format" + ] = "{l_bar}{bar}| Processed {n_fmt}/{total_fmt} of input file [{elapsed}<{remaining}, {rate_fmt}{postfix}]" + kwargs["total"] = os.stat(infile.name).st_size if not disable else 1 + super().__init__(**kwargs) + + def update_with_result( + self, result, field="id", error_resource_type=None, error_parameter="ids" + ): + try: + for item in result["data"]: + # Use the length of the id / name and a newline to match original file + self.update(len(item[field]) + len("\n")) + if error_resource_type and "errors" in result: + for error in result["errors"]: + # Account for deleted data + # Errors have very inconsistent format, missing fields for different types of errors... + if ( + "resource_type" in error + and error["resource_type"] == error_resource_type + ): + if ( + "parameter" in error + and error["parameter"] == error_parameter + ): + self.update(len(error["value"]) + len("\n")) + # todo: hide or show this? + # self.set_description( + # "Errors encountered, results may be incomplete" + # ) + # print(error["value"], error["resource_type"], error["parameter"]) + except Exception as e: + log.error(f"Failed to update progress bar: {e}") + + +class TimestampProgressBar(tqdm): + """ + A Timestamp based progress bar. Counts timestamp ranges in milliseconds. + This can be used to display a progress bar for tweet ids and time ranges. + """ + + def __init__(self, since_id, until_id, start_time, end_time, **kwargs): + self.early_stop = True + self.tweet_count = 0 + + disable = False if "disable" not in kwargs else kwargs["disable"] + kwargs["disable"] = disable + + if start_time is None and (since_id is None and until_id is None): + start_time = datetime.datetime.now( + datetime.timezone.utc + ) - datetime.timedelta(days=7) + if end_time is None and (since_id is None and until_id is None): + end_time = datetime.datetime.now( + datetime.timezone.utc + ) - datetime.timedelta(seconds=30) + + if since_id and not until_id: + until_id = _millis2snowflake( + _date2millis(datetime.datetime.now(datetime.timezone.utc)) + ) + + if until_id and not since_id: + since_id = 1 + + total = ( + _snowflake2millis(until_id) - _snowflake2millis(since_id) + if (since_id and until_id) + else _date2millis(end_time) - _date2millis(start_time) + ) + + kwargs["miniters"] = 1 + kwargs["total"] = total + kwargs[ + "bar_format" + ] = "{l_bar}{bar}| Processed {n_time}/{total_time} [{elapsed}<{remaining}, {tweet_count} tweets total {postfix}]" + super().__init__(**kwargs) + + def update_with_result(self, result): + """ + Update progress bar based on snowflake ids. + """ + try: + newest_id = result["meta"]["newest_id"] + oldest_id = result["meta"]["oldest_id"] + n = _snowflake2millis(int(newest_id)) - _snowflake2millis(int(oldest_id)) + self.update(n) + self.tweet_count += len(result["data"]) + except Exception as e: + log.error(f"Failed to update progress bar: {e}") + + @property + def format_dict(self): + d = super(TimestampProgressBar, self).format_dict # original format dict + tweets_per_second = int(self.tweet_count / d["elapsed"] if d["elapsed"] else 0) + n_time = humanize.naturaldelta(datetime.timedelta(seconds=int(d["n"]) // 1000)) + total_time = humanize.naturaldelta( + datetime.timedelta(seconds=int(d["total"]) // 1000) + ) + d.update(n_time=n_time) + d.update(total_time=total_time) + d.update(tweet_count=self.tweet_count) + d.update(tweets_per_second=tweets_per_second) + return d + + def close(self): + if not self.early_stop: + # Finish the bar to 100% even if the last tweet ids do not cover the full time range + self.update(self.total - self.n) + super().close() + + +def _date2millis(dt): + return int(dt.timestamp() * 1000) + + +def _millis2date(ms): + return datetime.datetime.utcfromtimestamp(ms // 1000).replace( + microsecond=ms % 1000 * 1000 + ) + + +def _snowflake2millis(snowflake_id): + return (snowflake_id >> 22) + 1288834974657 + + +def _millis2snowflake(ms): + return (int(ms) - 1288834974657) << 22