diff --git a/pyproject.toml b/pyproject.toml index cf052bb24e5359..100a2a0486a4b4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -81,7 +81,7 @@ dependencies = [ "rfc3339-validator>=0.1.2", "rfc3986-validator>=0.1.1", # [end] jsonschema format validators - "sentry-arroyo>=2.38.1", + "sentry-arroyo>=2.38.5", "sentry-conventions>=0.3.0", "sentry-forked-email-reply-parser>=0.5.12.post1", "sentry-kafka-schemas>=2.1.27", diff --git a/src/sentry/monitors/tasks/clock_pulse.py b/src/sentry/monitors/tasks/clock_pulse.py index 9281eed4b0e69d..dc42bdcfb4eb1c 100644 --- a/src/sentry/monitors/tasks/clock_pulse.py +++ b/src/sentry/monitors/tasks/clock_pulse.py @@ -8,7 +8,10 @@ from arroyo import Partition from arroyo import Topic as ArroyoTopic from arroyo.backends.kafka import KafkaPayload -from confluent_kafka.admin import AdminClient, PartitionMetadata +from confluent_kafka.admin import ( # type: ignore[attr-defined] + AdminClient, + PartitionMetadata, +) from django.conf import settings from sentry_kafka_schemas.codecs import Codec from sentry_kafka_schemas.schema_types.ingest_monitors_v1 import ClockPulse, IngestMonitorMessage diff --git a/src/sentry/sentry_metrics/consumers/indexer/multiprocess.py b/src/sentry/sentry_metrics/consumers/indexer/multiprocess.py index 307fdfd50ed4f7..5b1042bd4664ae 100644 --- a/src/sentry/sentry_metrics/consumers/indexer/multiprocess.py +++ b/src/sentry/sentry_metrics/consumers/indexer/multiprocess.py @@ -93,7 +93,7 @@ def submit(self, message: Message[KafkaPayload | FilteredPayload]) -> None: on_delivery=partial( self.callback, committable=message.committable, timestamp=message.timestamp ), - headers=message.payload.headers, + headers=message.payload.headers, # type: ignore[arg-type] ) def callback( diff --git a/src/sentry/sentry_metrics/consumers/indexer/routing_producer.py b/src/sentry/sentry_metrics/consumers/indexer/routing_producer.py index 9690a923331213..57c2061cbec25f 100644 --- a/src/sentry/sentry_metrics/consumers/indexer/routing_producer.py +++ b/src/sentry/sentry_metrics/consumers/indexer/routing_producer.py @@ -118,7 +118,7 @@ def poll(self) -> None: def __delivery_callback( self, future: Future[str], - error: KafkaError, + error: KafkaError | None, message: ConfluentMessage, ) -> None: if error is not None: @@ -142,14 +142,12 @@ def submit(self, message: Message[RoutingPayload]) -> None: future: Future[str] = Future() future.set_running_or_notify_cancel() - ( - producer.produce( - topic=topic.name, - value=output_message.payload.value, - key=output_message.payload.key, - headers=output_message.payload.headers, - on_delivery=partial(self.__delivery_callback, future), - ), + producer.produce( + topic=topic.name, + value=output_message.payload.value, + key=output_message.payload.key, + headers=output_message.payload.headers, # type: ignore[arg-type] + on_delivery=partial(self.__delivery_callback, future), ) self.__queue.append((output_message.committable, future)) diff --git a/src/sentry/testutils/pytest/kafka.py b/src/sentry/testutils/pytest/kafka.py index 80475030cc8794..f0cd95a6179d31 100644 --- a/src/sentry/testutils/pytest/kafka.py +++ b/src/sentry/testutils/pytest/kafka.py @@ -4,7 +4,8 @@ from collections.abc import MutableMapping import pytest -from confluent_kafka import Consumer, Producer +from arroyo.processing import StreamProcessor +from confluent_kafka import Producer from confluent_kafka.admin import AdminClient from sentry.testutils.pytest import xdist @@ -73,7 +74,7 @@ def scope_consumers(): be created once per test session). """ - all_consumers: MutableMapping[str, Consumer | None] = { + all_consumers: MutableMapping[str, StreamProcessor | None] = { xdist.get_kafka_topic("ingest-events"): None, xdist.get_kafka_topic("outcomes"): None, } diff --git a/src/sentry/utils/batching_kafka_consumer.py b/src/sentry/utils/batching_kafka_consumer.py index 47f0530e32b42c..9442975cdb2b29 100644 --- a/src/sentry/utils/batching_kafka_consumer.py +++ b/src/sentry/utils/batching_kafka_consumer.py @@ -15,7 +15,7 @@ def wait_for_topics(admin_client: AdminClient, topics: list[str], timeout: int = """ for topic in topics: start = time.time() - last_error = None + last_error: str | KafkaError | None = None while True: if time.time() > start + timeout: @@ -25,10 +25,14 @@ def wait_for_topics(admin_client: AdminClient, topics: list[str], timeout: int = result = admin_client.list_topics(topic=topic, timeout=timeout) topic_metadata = result.topics.get(topic) - if topic_metadata and topic_metadata.partitions and not topic_metadata.error: + if topic_metadata is None: + last_error = "Topic metadata not found" + time.sleep(0.1) + continue + if topic_metadata.partitions and not topic_metadata.error: logger.debug("Topic '%s' is ready", topic) break - elif topic_metadata.error in { + elif topic_metadata.error is not None and topic_metadata.error.code() in { KafkaError.UNKNOWN_TOPIC_OR_PART, KafkaError.LEADER_NOT_AVAILABLE, }: diff --git a/tests/sentry/monitors/tasks/test_clock_pulse.py b/tests/sentry/monitors/tasks/test_clock_pulse.py index 9c216c9f897c5c..4ee9c363f5ecfb 100644 --- a/tests/sentry/monitors/tasks/test_clock_pulse.py +++ b/tests/sentry/monitors/tasks/test_clock_pulse.py @@ -3,7 +3,7 @@ from arroyo import Partition, Topic from arroyo.backends.kafka import KafkaPayload -from confluent_kafka.admin import PartitionMetadata +from confluent_kafka.admin import PartitionMetadata # type: ignore[attr-defined] from django.test import override_settings from sentry.monitors.tasks.clock_pulse import MONITOR_CODEC, clock_pulse diff --git a/uv.lock b/uv.lock index 32dabf01919c03..976e09db0e9ab9 100644 --- a/uv.lock +++ b/uv.lock @@ -205,13 +205,15 @@ wheels = [ [[package]] name = "confluent-kafka" -version = "2.8.0" +version = "2.13.2" source = { registry = "https://pypi.devinfra.sentry.io/simple" } wheels = [ - { url = "https://pypi.devinfra.sentry.io/wheels/confluent_kafka-2.8.0-cp313-cp313-macosx_13_0_arm64.whl", hash = "sha256:dd3bc67d589dd486d128a159e918ecf3765f8154474edf9f6f701f701de735a1" }, - { url = "https://pypi.devinfra.sentry.io/wheels/confluent_kafka-2.8.0-cp313-cp313-macosx_13_0_x86_64.whl", hash = "sha256:4c0e655df9faef450654700db3fda163ddbc4b68f5bf5c7633cf1bf9d932d892" }, - { url = "https://pypi.devinfra.sentry.io/wheels/confluent_kafka-2.8.0-cp313-cp313-manylinux_2_28_aarch64.whl", hash = "sha256:abff7c4853e2d118563229329ca0a1f148ee5004cbcb9a8dad9dc8e796fcc477" }, - { url = "https://pypi.devinfra.sentry.io/wheels/confluent_kafka-2.8.0-cp313-cp313-manylinux_2_28_x86_64.whl", hash = "sha256:e75230b51456de5cfaefe94c35f3de5101864d8c21518f114d5cd9dd1d7d43b1" }, + { url = "https://pypi.devinfra.sentry.io/wheels/confluent_kafka-2.13.2-cp313-cp313-macosx_13_0_arm64.whl", hash = "sha256:02702808dd3cfd91f117fbf17181da2a95392967e9f946b1cbdc5589b36e39d1" }, + { url = "https://pypi.devinfra.sentry.io/wheels/confluent_kafka-2.13.2-cp313-cp313-manylinux_2_28_aarch64.whl", hash = "sha256:f3e6d010ad38447a48e0f9fab81edd4d2fd0b5f5a79ab475c30347689e35c6e6" }, + { url = "https://pypi.devinfra.sentry.io/wheels/confluent_kafka-2.13.2-cp313-cp313-manylinux_2_28_x86_64.whl", hash = "sha256:9161865d8246eb77d1c30233a315bdad96145af783981877664532fa212f56be" }, + { url = "https://pypi.devinfra.sentry.io/wheels/confluent_kafka-2.13.2-cp314-cp314-macosx_13_0_arm64.whl", hash = "sha256:9cb0d6820107deca1823d68b96831bd982d0a11c4e6bcf0a12e8040192c48a8f" }, + { url = "https://pypi.devinfra.sentry.io/wheels/confluent_kafka-2.13.2-cp314-cp314-manylinux_2_28_aarch64.whl", hash = "sha256:f09adb42fb898a0b3a88b02e77bee472e93f758258945386c77864016b4e4efc" }, + { url = "https://pypi.devinfra.sentry.io/wheels/confluent_kafka-2.13.2-cp314-cp314-manylinux_2_28_x86_64.whl", hash = "sha256:fa3be1fe231e06b2c7501fa3641b30ea90ea17be79ca89806eef22ff34ed106c" }, ] [[package]] @@ -2358,7 +2360,7 @@ requires-dist = [ { name = "requests-oauthlib", specifier = ">=1.2.0" }, { name = "rfc3339-validator", specifier = ">=0.1.2" }, { name = "rfc3986-validator", specifier = ">=0.1.1" }, - { name = "sentry-arroyo", specifier = ">=2.38.1" }, + { name = "sentry-arroyo", specifier = ">=2.38.5" }, { name = "sentry-conventions", specifier = ">=0.3.0" }, { name = "sentry-forked-email-reply-parser", specifier = ">=0.5.12.post1" }, { name = "sentry-kafka-schemas", specifier = ">=2.1.27" }, @@ -2449,13 +2451,13 @@ dev = [ [[package]] name = "sentry-arroyo" -version = "2.38.1" +version = "2.38.5" source = { registry = "https://pypi.devinfra.sentry.io/simple" } dependencies = [ { name = "confluent-kafka", marker = "sys_platform == 'darwin' or sys_platform == 'linux'" }, ] wheels = [ - { url = "https://pypi.devinfra.sentry.io/wheels/sentry_arroyo-2.38.1-py3-none-any.whl", hash = "sha256:e86e02127bcfc884e94d5c36c4e6c97e3e39df033effe96a4eca4346852db1b4" }, + { url = "https://pypi.devinfra.sentry.io/wheels/sentry_arroyo-2.38.5-py3-none-any.whl", hash = "sha256:add69f320b7065d675aa9f8caae65e03d35d1241534871caeff70de54ed4f33e" }, ] [[package]]