Skip to content

Commit

Permalink
fix(inc-1013): Update arroyo verion (#6824)
Browse files Browse the repository at this point in the history
  • Loading branch information
volokluev authored Jan 27, 2025
1 parent 7424e9e commit 246b40a
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 12 deletions.
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ black==22.6.0
blinker==1.5
click==8.1.7
clickhouse-driver==0.2.6
confluent-kafka==2.3.0
confluent-kafka==2.7.0
datadog==0.21.0
devservices==1.0.10
flake8==7.0.0
Expand All @@ -27,7 +27,7 @@ pytest-watch==4.2.0
python-dateutil==2.8.2
python-rapidjson==1.8
redis==4.5.4
sentry-arroyo==2.19.4
sentry-arroyo==2.19.12
sentry-kafka-schemas==0.1.129
sentry-protos==0.1.53
sentry-redis-tools==0.3.0
Expand Down
6 changes: 3 additions & 3 deletions tests/subscriptions/test_executor_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from arroyo.processing.strategies.commit import CommitOffsets
from arroyo.processing.strategies.produce import Produce
from arroyo.types import BrokerValue, Message, Partition, Topic
from arroyo.utils.clock import TestingClock
from arroyo.utils.clock import MockedClock
from confluent_kafka.admin import AdminClient

from snuba import state
Expand Down Expand Up @@ -316,7 +316,7 @@ def test_skip_execution_for_entity() -> None:
def test_execute_and_produce_result() -> None:
scheduled_topic = Topic("scheduled-subscriptions-events")
result_topic = Topic("events-subscriptions-results")
clock = TestingClock()
clock = MockedClock()
broker_storage: MemoryMessageStorage[KafkaPayload] = MemoryMessageStorage()
broker: Broker[KafkaPayload] = Broker(broker_storage, clock)
broker.create_topic(scheduled_topic, partitions=1)
Expand Down Expand Up @@ -356,7 +356,7 @@ def test_execute_and_produce_result() -> None:
def test_skip_stale_message() -> None:
scheduled_topic = Topic("scheduled-subscriptions-events")
result_topic = Topic("events-subscriptions-results")
clock = TestingClock()
clock = MockedClock()
broker_storage: MemoryMessageStorage[KafkaPayload] = MemoryMessageStorage()
broker: Broker[KafkaPayload] = Broker(broker_storage, clock)
broker.create_topic(scheduled_topic, partitions=1)
Expand Down
8 changes: 4 additions & 4 deletions tests/subscriptions/test_scheduler_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from arroyo.commit import Commit
from arroyo.errors import ConsumerError
from arroyo.types import BrokerValue, Partition, Topic
from arroyo.utils.clock import TestingClock
from arroyo.utils.clock import MockedClock
from confluent_kafka.admin import AdminClient
from py._path.local import LocalPath
from sentry_protos.snuba.v1.endpoint_create_subscription_pb2 import (
Expand Down Expand Up @@ -297,7 +297,7 @@ def test_tick_time_shift() -> None:
],
)
def test_tick_consumer(time_shift: Optional[timedelta]) -> None:
clock = TestingClock()
clock = MockedClock()
broker: Broker[KafkaPayload] = Broker(MemoryMessageStorage(), clock)

epoch = datetime.fromtimestamp(clock.time())
Expand Down Expand Up @@ -432,7 +432,7 @@ def _assignment_callback(offsets: Mapping[Partition, int]) -> None:


def test_tick_consumer_non_monotonic() -> None:
clock = TestingClock()
clock = MockedClock()
broker: Broker[KafkaPayload] = Broker(MemoryMessageStorage(), clock)

epoch = datetime.fromtimestamp(clock.time())
Expand Down Expand Up @@ -554,7 +554,7 @@ def _assignment_callback(offsets: Mapping[Partition, int]) -> None:


def test_invalid_commit_log_message(caplog: Any) -> None:
clock = TestingClock()
clock = MockedClock()
broker: Broker[KafkaPayload] = Broker(MemoryMessageStorage(), clock)

topic = Topic("messages")
Expand Down
6 changes: 3 additions & 3 deletions tests/subscriptions/test_scheduler_processing_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from arroyo.backends.local.backend import LocalBroker as Broker
from arroyo.backends.local.storages.memory import MemoryMessageStorage
from arroyo.types import BrokerValue
from arroyo.utils.clock import TestingClock
from arroyo.utils.clock import MockedClock

from snuba.datasets.entities.entity_key import EntityKey
from snuba.datasets.entities.factory import get_entity
Expand Down Expand Up @@ -544,7 +544,7 @@ def test_produce_scheduled_subscription_message() -> None:
topic = Topic("scheduled-subscriptions-events")
partition = Partition(topic, partition_index)

clock = TestingClock()
clock = MockedClock()
broker_storage: MemoryMessageStorage[KafkaPayload] = MemoryMessageStorage()
broker: Broker[KafkaPayload] = Broker(broker_storage, clock)
broker.create_topic(topic, partitions=1)
Expand Down Expand Up @@ -663,7 +663,7 @@ def test_produce_stale_message() -> None:
topic = Topic("scheduled-subscriptions-events")
partition = Partition(topic, partition_index)

clock = TestingClock()
clock = MockedClock()
broker_storage: MemoryMessageStorage[KafkaPayload] = MemoryMessageStorage()
broker: Broker[KafkaPayload] = Broker(broker_storage, clock)
broker.create_topic(topic, partitions=1)
Expand Down

0 comments on commit 246b40a

Please sign in to comment.