Skip to content

Commit

Permalink
code review
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachel Chen authored and Rachel Chen committed Feb 3, 2025
1 parent d18c096 commit 291b242
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 55 deletions.
9 changes: 8 additions & 1 deletion snuba/web/rpc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,14 @@ def run_rpc_handler(

try:
return cast(ProtobufMessage, endpoint.execute(deserialized_protobuf))
except (RPCRequestException, QueryException) as e:
except RPCRequestException as e:
return convert_rpc_exception_to_proto(e)
except QueryException as e:
if (
"code" in e.extra_data and e.extra["code"] == 241
) or "DB::Exception: Memory limit (for query) exceeded" in str(e):
endpoint.metrics.increment("OOM_query")
sentry_sdk.capture_exception(e)
return convert_rpc_exception_to_proto(e)
except Exception as e:
sentry_sdk.capture_exception(e)
Expand Down
16 changes: 1 addition & 15 deletions snuba/web/rpc/v1/endpoint_time_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,12 @@
import uuid
from typing import Type

import sentry_sdk
from sentry_protos.snuba.v1.endpoint_time_series_pb2 import (
TimeSeriesRequest,
TimeSeriesResponse,
)
from sentry_protos.snuba.v1.request_common_pb2 import TraceItemType

from snuba import environment
from snuba.utils.metrics.wrapper import MetricsWrapper
from snuba.web import QueryException
from snuba.web.rpc import RPCEndpoint, TraceItemDataResolver
from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException
from snuba.web.rpc.v1.resolvers import ResolverTimeSeries
Expand All @@ -36,8 +32,6 @@
# MAX 5 minute granularity over 7 days
_MAX_BUCKETS_IN_REQUEST = 2016

metrics = MetricsWrapper(environment.metrics, "endpoint_time_series")


def _enforce_no_duplicate_labels(request: TimeSeriesRequest) -> None:
labels = set()
Expand Down Expand Up @@ -111,12 +105,4 @@ def _execute(self, in_msg: TimeSeriesRequest) -> TimeSeriesResponse:
"This endpoint requires meta.trace_item_type to be set (are you requesting spans? logs?)"
)
resolver = self.get_resolver(in_msg.meta.trace_item_type)
try:
return resolver.resolve(in_msg)
except QueryException as e:
if e.extra[
"code"
] == 241 or "DB::Exception: Memory limit (for query) exceeded" in str(e):
metrics.increment("endpoint_trace_item_table_OOM")
sentry_sdk.capture_exception(e)
raise BadSnubaRPCRequestException(str(e))
return resolver.resolve(in_msg)
16 changes: 1 addition & 15 deletions snuba/web/rpc/v1/endpoint_trace_item_table.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,19 @@
import uuid
from typing import Type

import sentry_sdk
from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import (
Column,
TraceItemTableRequest,
TraceItemTableResponse,
)
from sentry_protos.snuba.v1.request_common_pb2 import TraceItemType

from snuba import environment
from snuba.utils.metrics.wrapper import MetricsWrapper
from snuba.web import QueryException
from snuba.web.rpc import RPCEndpoint, TraceItemDataResolver
from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException
from snuba.web.rpc.v1.resolvers import ResolverTraceItemTable

_GROUP_BY_DISALLOWED_COLUMNS = ["timestamp"]

metrics = MetricsWrapper(environment.metrics, "endpoint_trace_item_table")


def _apply_labels_to_columns(in_msg: TraceItemTableRequest) -> TraceItemTableRequest:
def _apply_label_to_column(column: Column) -> None:
Expand Down Expand Up @@ -110,12 +104,4 @@ def _execute(self, in_msg: TraceItemTableRequest) -> TraceItemTableResponse:
"This endpoint requires meta.trace_item_type to be set (are you requesting spans? logs?)"
)
resolver = self.get_resolver(in_msg.meta.trace_item_type)
try:
return resolver.resolve(in_msg)
except QueryException as e:
if e.extra[
"code"
] == 241 or "DB::Exception: Memory limit (for query) exceeded" in str(e):
metrics.increment("endpoint_trace_item_table_OOM")
sentry_sdk.capture_exception(e)
raise BadSnubaRPCRequestException(str(e))
return resolver.resolve(in_msg)
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from typing import Any, Callable, MutableMapping
from unittest.mock import patch
from unittest.mock import MagicMock, call, patch

import pytest
from clickhouse_driver.errors import ServerException
Expand All @@ -13,6 +13,7 @@
TimeSeriesRequest,
)
from sentry_protos.snuba.v1.error_pb2 import Error
from sentry_protos.snuba.v1.error_pb2 import Error as ErrorProto
from sentry_protos.snuba.v1.request_common_pb2 import RequestMeta, TraceItemType
from sentry_protos.snuba.v1.trace_item_attribute_pb2 import (
AttributeAggregation,
Expand All @@ -30,8 +31,8 @@

from snuba.datasets.storages.factory import get_storage
from snuba.datasets.storages.storage_key import StorageKey
from snuba.web.rpc import RPCEndpoint, run_rpc_handler
from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException
from snuba.web.rpc.v1 import endpoint_time_series
from snuba.web.rpc.v1.endpoint_time_series import (
EndpointTimeSeries,
_validate_time_buckets,
Expand Down Expand Up @@ -751,7 +752,7 @@ def test_with_non_existent_attribute(self) -> None:
)
]

def test_OOM(self) -> None:
def test_OOM(self, monkeypatch) -> None:
ts = Timestamp()
ts.GetCurrentTime()
tstart = Timestamp(seconds=ts.seconds - 3600)
Expand Down Expand Up @@ -784,22 +785,25 @@ def test_OOM(self) -> None:
granularity_secs=60,
)

metrics_mock = MagicMock()
monkeypatch.setattr(RPCEndpoint, "metrics", property(lambda x: metrics_mock))
with patch(
"clickhouse_driver.client.Client.execute",
side_effect=ServerException(
"DB::Exception: Received from snuba-events-analytics-platform-1-1:1111. DB::Exception: Memory limit (for query) exceeded: would use 1.11GiB (attempt to allocate chunk of 111111 bytes), maximum: 1.11 GiB. Blahblahblahblahblahblahblah",
code=241,
),
), patch.object(
endpoint_time_series.metrics, "increment"
) as metrics_mock, patch(
"snuba.web.rpc.v1.endpoint_trace_item_table.sentry_sdk.capture_exception"
) as sentry_sdk_mock:
with pytest.raises(BadSnubaRPCRequestException) as e:
EndpointTimeSeries().execute(message)
assert "DB::Exception: Memory limit (for query) exceeded" in str(e.value)
metrics_mock.assert_called_once_with("endpoint_trace_item_table_OOM")
), patch("snuba.web.rpc.sentry_sdk.capture_exception") as sentry_sdk_mock:
resp = run_rpc_handler(
"EndpointTimeSeries", "v1", message.SerializeToString()
)
assert isinstance(resp, ErrorProto)
assert "DB::Exception: Memory limit (for query) exceeded" in str(
resp.message
)

sentry_sdk_mock.assert_called_once()
assert metrics_mock.increment.call_args_list.count(call("OOM_query")) == 1


class TestUtils:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import uuid
from datetime import datetime, timedelta
from typing import Any, Mapping
from unittest.mock import patch
from unittest.mock import MagicMock, call, patch

import pytest
from clickhouse_driver.errors import ServerException
Expand Down Expand Up @@ -42,8 +42,8 @@

from snuba.datasets.storages.factory import get_storage
from snuba.datasets.storages.storage_key import StorageKey
from snuba.web.rpc import RPCEndpoint, run_rpc_handler
from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException
from snuba.web.rpc.v1 import endpoint_trace_item_table
from snuba.web.rpc.v1.endpoint_trace_item_table import (
EndpointTraceItemTable,
_apply_labels_to_columns,
Expand Down Expand Up @@ -188,7 +188,7 @@ def test_basic(self) -> None:
error_proto.ParseFromString(response.data)
assert response.status_code == 200, error_proto

def test_OOM(self) -> None:
def test_OOM(self, monkeypatch) -> None:
ts = Timestamp()
ts.GetCurrentTime()
message = TraceItemTableRequest(
Expand Down Expand Up @@ -218,22 +218,25 @@ def test_OOM(self) -> None:
],
limit=10,
)
metrics_mock = MagicMock()
monkeypatch.setattr(RPCEndpoint, "metrics", property(lambda x: metrics_mock))
with patch(
"clickhouse_driver.client.Client.execute",
side_effect=ServerException(
"DB::Exception: Received from snuba-events-analytics-platform-1-1:1111. DB::Exception: Memory limit (for query) exceeded: would use 1.11GiB (attempt to allocate chunk of 111111 bytes), maximum: 1.11 GiB. Blahblahblahblahblahblahblah",
code=241,
),
), patch.object(
endpoint_trace_item_table.metrics, "increment"
) as metrics_mock, patch(
"snuba.web.rpc.v1.endpoint_trace_item_table.sentry_sdk.capture_exception"
) as sentry_sdk_mock:
with pytest.raises(BadSnubaRPCRequestException) as e:
EndpointTraceItemTable().execute(message)
assert "DB::Exception: Memory limit (for query) exceeded" in str(e.value)
metrics_mock.assert_called_once_with("endpoint_trace_item_table_OOM")
), patch("snuba.web.rpc.sentry_sdk.capture_exception") as sentry_sdk_mock:
resp = run_rpc_handler(
"EndpointTraceItemTable", "v1", message.SerializeToString()
)
assert isinstance(resp, ErrorProto)
assert "DB::Exception: Memory limit (for query) exceeded" in str(
resp.message
)

sentry_sdk_mock.assert_called_once()
assert metrics_mock.increment.call_args_list.count(call("OOM_query")) == 1

def test_errors_without_type(self) -> None:
ts = Timestamp()
Expand Down

0 comments on commit 291b242

Please sign in to comment.