Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[high cardinality queries] track OOM on trace item table and time series endpoints #6814

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion snuba/web/rpc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,18 @@ def execute(self, in_msg: Tin) -> Tout:
error = None
try:
out = self._execute(in_msg)
except Exception as e:
except QueryException as e:
if (
"error_code" in e.extra["stats"]
and e.extra["stats"]["error_code"] == 241
):
self.metrics.increment("OOM_query")
sentry_sdk.capture_exception(e)
out = self.response_class()()
error = e
except Exception as e:
out = self.response_class()()
error = e # type: ignore
return self.__after_execute(in_msg, out, error)

def __before_execute(self, in_msg: Tin) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from typing import Any, Callable, MutableMapping
from unittest.mock import MagicMock, call, patch

import pytest
from clickhouse_driver.errors import ServerException
from google.protobuf.timestamp_pb2 import Timestamp
from sentry_protos.snuba.v1.endpoint_time_series_pb2 import (
DataPoint,
Expand All @@ -28,6 +30,8 @@

from snuba.datasets.storages.factory import get_storage
from snuba.datasets.storages.storage_key import StorageKey
from snuba.web import QueryException
from snuba.web.rpc import RPCEndpoint
from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException
from snuba.web.rpc.v1.endpoint_time_series import (
EndpointTimeSeries,
Expand Down Expand Up @@ -748,6 +752,55 @@ def test_with_non_existent_attribute(self) -> None:
)
]

def test_OOM(self, monkeypatch: Any) -> None:
    """Check that a ClickHouse out-of-memory failure is tracked by the time series endpoint.

    The ClickHouse client is patched to raise a ``ServerException`` with
    code 241 and a "Memory limit (for query) exceeded" message. The test
    then expects that:

    * ``EndpointTimeSeries().execute`` raises ``QueryException`` whose text
      contains the memory-limit message,
    * ``sentry_sdk.capture_exception`` is called exactly once, and
    * the ``OOM_query`` metric is incremented exactly once.
    """
    now = Timestamp()
    now.GetCurrentTime()
    one_hour_ago = Timestamp(seconds=now.seconds - 3600)

    request_meta = RequestMeta(
        project_ids=[1, 2, 3],
        organization_id=1,
        cogs_category="something",
        referrer="something",
        start_timestamp=one_hour_ago,
        end_timestamp=now,
        trace_item_type=TraceItemType.TRACE_ITEM_TYPE_SPAN,
    )
    # The labels are arbitrary: the query is mocked to fail, so no result
    # rows are ever produced or inspected.
    aggregations = [
        AttributeAggregation(
            aggregate=function,
            key=AttributeKey(type=AttributeKey.TYPE_FLOAT, name="sentry.duration"),
            label=label,
        )
        for function, label in (
            (Function.FUNCTION_AVG, "p50"),
            (Function.FUNCTION_P95, "p90"),
        )
    ]
    message = TimeSeriesRequest(
        meta=request_meta,
        aggregations=aggregations,
        granularity_secs=60,
    )

    # Swap the endpoint's metrics backend for a mock so increments can be counted.
    metrics_mock = MagicMock()
    monkeypatch.setattr(
        RPCEndpoint, "metrics", property(lambda _endpoint: metrics_mock)
    )

    simulated_oom = ServerException(
        "DB::Exception: Received from snuba-events-analytics-platform-1-1:1111. DB::Exception: Memory limit (for query) exceeded: would use 1.11GiB (attempt to allocate chunk of 111111 bytes), maximum: 1.11 GiB. Blahblahblahblahblahblahblah",
        code=241,
    )
    failing_clickhouse = patch(
        "clickhouse_driver.client.Client.execute", side_effect=simulated_oom
    )
    captured_by_sentry = patch("snuba.web.rpc.sentry_sdk.capture_exception")

    with failing_clickhouse, captured_by_sentry as sentry_sdk_mock:
        with pytest.raises(QueryException) as exc_info:
            EndpointTimeSeries().execute(message)
        assert "DB::Exception: Memory limit (for query) exceeded" in str(
            exc_info.value
        )

        sentry_sdk_mock.assert_called_once()
        oom_increments = metrics_mock.increment.call_args_list.count(
            call("OOM_query")
        )
        assert oom_increments == 1


class TestUtils:
def test_no_duplicate_labels(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
import uuid
from datetime import datetime, timedelta
from typing import Any, Mapping
from unittest.mock import MagicMock, call, patch

import pytest
from clickhouse_driver.errors import ServerException
from google.protobuf.json_format import MessageToDict, ParseDict
from google.protobuf.timestamp_pb2 import Timestamp
from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import (
Expand Down Expand Up @@ -40,6 +42,8 @@

from snuba.datasets.storages.factory import get_storage
from snuba.datasets.storages.storage_key import StorageKey
from snuba.web import QueryException
from snuba.web.rpc import RPCEndpoint
from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException
from snuba.web.rpc.v1.endpoint_trace_item_table import (
EndpointTraceItemTable,
Expand Down Expand Up @@ -185,6 +189,52 @@ def test_basic(self) -> None:
error_proto.ParseFromString(response.data)
assert response.status_code == 200, error_proto

def test_OOM(self, monkeypatch: Any) -> None:
    """Check that a ClickHouse out-of-memory failure is tracked by the trace item table endpoint.

    The ClickHouse client is patched to raise a ``ServerException`` with
    code 241 and a "Memory limit (for query) exceeded" message. The test
    then expects that:

    * ``EndpointTraceItemTable().execute`` raises ``QueryException`` whose
      text contains the memory-limit message,
    * ``sentry_sdk.capture_exception`` is called exactly once, and
    * the ``OOM_query`` metric is incremented exactly once.
    """

    def location_column() -> Column:
        # Build a fresh column each time, as the request uses it twice
        # (once as a selected column, once as the ordering column).
        return Column(
            key=AttributeKey(type=AttributeKey.TYPE_STRING, name="location")
        )

    now = Timestamp()
    now.GetCurrentTime()

    request_meta = RequestMeta(
        project_ids=[1, 2, 3],
        organization_id=1,
        cogs_category="something",
        referrer="something",
        start_timestamp=now,
        end_timestamp=now,
        trace_item_type=TraceItemType.TRACE_ITEM_TYPE_SPAN,
    )
    color_exists = TraceItemFilter(
        exists_filter=ExistsFilter(
            key=AttributeKey(type=AttributeKey.TYPE_STRING, name="color")
        )
    )
    message = TraceItemTableRequest(
        meta=request_meta,
        filter=color_exists,
        columns=[location_column()],
        order_by=[TraceItemTableRequest.OrderBy(column=location_column())],
        limit=10,
    )

    # Swap the endpoint's metrics backend for a mock so increments can be counted.
    metrics_mock = MagicMock()
    monkeypatch.setattr(
        RPCEndpoint, "metrics", property(lambda _endpoint: metrics_mock)
    )

    simulated_oom = ServerException(
        "DB::Exception: Received from snuba-events-analytics-platform-1-1:1111. DB::Exception: Memory limit (for query) exceeded: would use 1.11GiB (attempt to allocate chunk of 111111 bytes), maximum: 1.11 GiB. Blahblahblahblahblahblahblah",
        code=241,
    )
    failing_clickhouse = patch(
        "clickhouse_driver.client.Client.execute", side_effect=simulated_oom
    )
    captured_by_sentry = patch("snuba.web.rpc.sentry_sdk.capture_exception")

    with failing_clickhouse, captured_by_sentry as sentry_sdk_mock:
        with pytest.raises(QueryException) as exc_info:
            EndpointTraceItemTable().execute(message)
        assert "DB::Exception: Memory limit (for query) exceeded" in str(
            exc_info.value
        )

        sentry_sdk_mock.assert_called_once()
        oom_increments = metrics_mock.increment.call_args_list.count(
            call("OOM_query")
        )
        assert oom_increments == 1

def test_errors_without_type(self) -> None:
ts = Timestamp()
ts.GetCurrentTime()
Expand Down
Loading