Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(inc-1013): Update arroyo verion #6824

Merged
merged 3 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ black==22.6.0
blinker==1.5
click==8.1.7
clickhouse-driver==0.2.6
confluent-kafka==2.3.0
confluent-kafka==2.7.0
datadog==0.21.0
devservices==1.0.10
flake8==7.0.0
Expand All @@ -27,7 +27,7 @@ pytest-watch==4.2.0
python-dateutil==2.8.2
python-rapidjson==1.8
redis==4.5.4
sentry-arroyo==2.19.4
sentry-arroyo==2.19.12
sentry-kafka-schemas==0.1.129
sentry-protos==0.1.51
sentry-redis-tools==0.3.0
Expand Down
6 changes: 3 additions & 3 deletions tests/subscriptions/test_executor_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from arroyo.processing.strategies.commit import CommitOffsets
from arroyo.processing.strategies.produce import Produce
from arroyo.types import BrokerValue, Message, Partition, Topic
from arroyo.utils.clock import TestingClock
from arroyo.utils.clock import MockedClock
from confluent_kafka.admin import AdminClient

from snuba import state
Expand Down Expand Up @@ -316,7 +316,7 @@ def test_skip_execution_for_entity() -> None:
def test_execute_and_produce_result() -> None:
scheduled_topic = Topic("scheduled-subscriptions-events")
result_topic = Topic("events-subscriptions-results")
clock = TestingClock()
clock = MockedClock()
broker_storage: MemoryMessageStorage[KafkaPayload] = MemoryMessageStorage()
broker: Broker[KafkaPayload] = Broker(broker_storage, clock)
broker.create_topic(scheduled_topic, partitions=1)
Expand Down Expand Up @@ -356,7 +356,7 @@ def test_execute_and_produce_result() -> None:
def test_skip_stale_message() -> None:
scheduled_topic = Topic("scheduled-subscriptions-events")
result_topic = Topic("events-subscriptions-results")
clock = TestingClock()
clock = MockedClock()
broker_storage: MemoryMessageStorage[KafkaPayload] = MemoryMessageStorage()
broker: Broker[KafkaPayload] = Broker(broker_storage, clock)
broker.create_topic(scheduled_topic, partitions=1)
Expand Down
8 changes: 4 additions & 4 deletions tests/subscriptions/test_scheduler_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from arroyo.commit import Commit
from arroyo.errors import ConsumerError
from arroyo.types import BrokerValue, Partition, Topic
from arroyo.utils.clock import TestingClock
from arroyo.utils.clock import MockedClock
from confluent_kafka.admin import AdminClient
from py._path.local import LocalPath
from sentry_protos.snuba.v1.endpoint_create_subscription_pb2 import (
Expand Down Expand Up @@ -297,7 +297,7 @@ def test_tick_time_shift() -> None:
],
)
def test_tick_consumer(time_shift: Optional[timedelta]) -> None:
clock = TestingClock()
clock = MockedClock()
broker: Broker[KafkaPayload] = Broker(MemoryMessageStorage(), clock)

epoch = datetime.fromtimestamp(clock.time())
Expand Down Expand Up @@ -432,7 +432,7 @@ def _assignment_callback(offsets: Mapping[Partition, int]) -> None:


def test_tick_consumer_non_monotonic() -> None:
clock = TestingClock()
clock = MockedClock()
broker: Broker[KafkaPayload] = Broker(MemoryMessageStorage(), clock)

epoch = datetime.fromtimestamp(clock.time())
Expand Down Expand Up @@ -554,7 +554,7 @@ def _assignment_callback(offsets: Mapping[Partition, int]) -> None:


def test_invalid_commit_log_message(caplog: Any) -> None:
clock = TestingClock()
clock = MockedClock()
broker: Broker[KafkaPayload] = Broker(MemoryMessageStorage(), clock)

topic = Topic("messages")
Expand Down
6 changes: 3 additions & 3 deletions tests/subscriptions/test_scheduler_processing_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from arroyo.backends.local.backend import LocalBroker as Broker
from arroyo.backends.local.storages.memory import MemoryMessageStorage
from arroyo.types import BrokerValue
from arroyo.utils.clock import TestingClock
from arroyo.utils.clock import MockedClock

from snuba.datasets.entities.entity_key import EntityKey
from snuba.datasets.entities.factory import get_entity
Expand Down Expand Up @@ -544,7 +544,7 @@ def test_produce_scheduled_subscription_message() -> None:
topic = Topic("scheduled-subscriptions-events")
partition = Partition(topic, partition_index)

clock = TestingClock()
clock = MockedClock()
broker_storage: MemoryMessageStorage[KafkaPayload] = MemoryMessageStorage()
broker: Broker[KafkaPayload] = Broker(broker_storage, clock)
broker.create_topic(topic, partitions=1)
Expand Down Expand Up @@ -663,7 +663,7 @@ def test_produce_stale_message() -> None:
topic = Topic("scheduled-subscriptions-events")
partition = Partition(topic, partition_index)

clock = TestingClock()
clock = MockedClock()
broker_storage: MemoryMessageStorage[KafkaPayload] = MemoryMessageStorage()
broker: Broker[KafkaPayload] = Broker(broker_storage, clock)
broker.create_topic(topic, partitions=1)
Expand Down
Loading