Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ dependencies = [
"rfc3339-validator>=0.1.2",
"rfc3986-validator>=0.1.1",
# [end] jsonschema format validators
"sentry-arroyo>=2.38.1",
"sentry-arroyo>=2.38.5",
"sentry-conventions>=0.3.0",
"sentry-forked-email-reply-parser>=0.5.12.post1",
"sentry-kafka-schemas>=2.1.27",
Expand Down
5 changes: 4 additions & 1 deletion src/sentry/monitors/tasks/clock_pulse.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
from arroyo import Partition
from arroyo import Topic as ArroyoTopic
from arroyo.backends.kafka import KafkaPayload
from confluent_kafka.admin import AdminClient, PartitionMetadata
from confluent_kafka.admin import ( # type: ignore[attr-defined]
AdminClient,
PartitionMetadata,
)
from django.conf import settings
from sentry_kafka_schemas.codecs import Codec
from sentry_kafka_schemas.schema_types.ingest_monitors_v1 import ClockPulse, IngestMonitorMessage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def submit(self, message: Message[KafkaPayload | FilteredPayload]) -> None:
on_delivery=partial(
self.callback, committable=message.committable, timestamp=message.timestamp
),
headers=message.payload.headers,
headers=message.payload.headers, # type: ignore[arg-type]
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

message.payload.headers is Arroyo's Headers type, while the arg expects confluent_kafka's HeadersType

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think we should fix this later? It feels kinda sketchy. In theory, passing a MutableSequence[Tuple[str, bytes]] where a List[Tuple[str, Union[str, bytes, None]]] is expected should be okay at runtime, but not in all cases.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to unify on just using the confluent HeadersType in the future

)

def callback(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def poll(self) -> None:
def __delivery_callback(
self,
future: Future[str],
error: KafkaError,
error: KafkaError | None,
message: ConfluentMessage,
) -> None:
if error is not None:
Expand All @@ -142,14 +142,12 @@ def submit(self, message: Message[RoutingPayload]) -> None:

future: Future[str] = Future()
future.set_running_or_notify_cancel()
(
producer.produce(
topic=topic.name,
value=output_message.payload.value,
key=output_message.payload.key,
headers=output_message.payload.headers,
on_delivery=partial(self.__delivery_callback, future),
),
producer.produce(
topic=topic.name,
value=output_message.payload.value,
key=output_message.payload.key,
headers=output_message.payload.headers, # type: ignore[arg-type]
on_delivery=partial(self.__delivery_callback, future),
)
self.__queue.append((output_message.committable, future))

Expand Down
5 changes: 3 additions & 2 deletions src/sentry/testutils/pytest/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
from collections.abc import MutableMapping

import pytest
from confluent_kafka import Consumer, Producer
from arroyo.processing import StreamProcessor
from confluent_kafka import Producer
from confluent_kafka.admin import AdminClient

from sentry.testutils.pytest import xdist
Expand Down Expand Up @@ -73,7 +74,7 @@ def scope_consumers():
be created once per test session).

"""
all_consumers: MutableMapping[str, Consumer | None] = {
all_consumers: MutableMapping[str, StreamProcessor | None] = {
Comment on lines -76 to +77
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typing here was always wrong, all_consumers values are StreamProcessors not Consumers

xdist.get_kafka_topic("ingest-events"): None,
xdist.get_kafka_topic("outcomes"): None,
}
Expand Down
10 changes: 7 additions & 3 deletions src/sentry/utils/batching_kafka_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def wait_for_topics(admin_client: AdminClient, topics: list[str], timeout: int =
"""
for topic in topics:
start = time.time()
last_error = None
last_error: str | KafkaError | None = None

while True:
if time.time() > start + timeout:
Expand All @@ -25,10 +25,14 @@ def wait_for_topics(admin_client: AdminClient, topics: list[str], timeout: int =

result = admin_client.list_topics(topic=topic, timeout=timeout)
topic_metadata = result.topics.get(topic)
if topic_metadata and topic_metadata.partitions and not topic_metadata.error:
if topic_metadata is None:
last_error = "Topic metadata not found"
time.sleep(0.1)
continue
if topic_metadata.partitions and not topic_metadata.error:
logger.debug("Topic '%s' is ready", topic)
break
elif topic_metadata.error in {
elif topic_metadata.error is not None and topic_metadata.error.code() in {
KafkaError.UNKNOWN_TOPIC_OR_PART,
KafkaError.LEADER_NOT_AVAILABLE,
}:
Expand Down
2 changes: 1 addition & 1 deletion tests/sentry/monitors/tasks/test_clock_pulse.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from arroyo import Partition, Topic
from arroyo.backends.kafka import KafkaPayload
from confluent_kafka.admin import PartitionMetadata
from confluent_kafka.admin import PartitionMetadata # type: ignore[attr-defined]
from django.test import override_settings

from sentry.monitors.tasks.clock_pulse import MONITOR_CODEC, clock_pulse
Expand Down
18 changes: 10 additions & 8 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading