Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[high cardinality queries] track OOM on trace item table and time series endpoints #6814

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion snuba/web/rpc/v1/endpoint_time_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@
import uuid
from typing import Type

import sentry_sdk
from sentry_protos.snuba.v1.endpoint_time_series_pb2 import (
TimeSeriesRequest,
TimeSeriesResponse,
)
from sentry_protos.snuba.v1.request_common_pb2 import TraceItemType

from snuba import environment
from snuba.utils.metrics.wrapper import MetricsWrapper
from snuba.web.rpc import RPCEndpoint, TraceItemDataResolver
from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException
from snuba.web.rpc.v1.resolvers import ResolverTimeSeries
Expand All @@ -32,6 +35,8 @@
# MAX 5 minute granularity over 7 days
_MAX_BUCKETS_IN_REQUEST = 2016

metrics = MetricsWrapper(environment.metrics, "endpoint_time_series")


def _enforce_no_duplicate_labels(request: TimeSeriesRequest) -> None:
labels = set()
Expand Down Expand Up @@ -105,4 +110,10 @@ def _execute(self, in_msg: TimeSeriesRequest) -> TimeSeriesResponse:
"This endpoint requires meta.trace_item_type to be set (are you requesting spans? logs?)"
)
resolver = self.get_resolver(in_msg.meta.trace_item_type)
return resolver.resolve(in_msg)
try:
return resolver.resolve(in_msg)
except Exception as e:
if "DB::Exception: Memory limit (for query) exceeded" in str(e):
metrics.increment("endpoint_trace_item_table_OOM")
xurui-c marked this conversation as resolved.
Show resolved Hide resolved
sentry_sdk.capture_exception(e)
raise BadSnubaRPCRequestException(str(e))
13 changes: 12 additions & 1 deletion snuba/web/rpc/v1/endpoint_trace_item_table.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
import uuid
from typing import Type

import sentry_sdk
from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import (
Column,
TraceItemTableRequest,
TraceItemTableResponse,
)
from sentry_protos.snuba.v1.request_common_pb2 import TraceItemType

from snuba import environment
from snuba.utils.metrics.wrapper import MetricsWrapper
from snuba.web.rpc import RPCEndpoint, TraceItemDataResolver
from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException
from snuba.web.rpc.v1.resolvers import ResolverTraceItemTable

_GROUP_BY_DISALLOWED_COLUMNS = ["timestamp"]

metrics = MetricsWrapper(environment.metrics, "endpoint_trace_item_table")


def _apply_labels_to_columns(in_msg: TraceItemTableRequest) -> TraceItemTableRequest:
def _apply_label_to_column(column: Column) -> None:
Expand Down Expand Up @@ -104,4 +109,10 @@ def _execute(self, in_msg: TraceItemTableRequest) -> TraceItemTableResponse:
"This endpoint requires meta.trace_item_type to be set (are you requesting spans? logs?)"
)
resolver = self.get_resolver(in_msg.meta.trace_item_type)
return resolver.resolve(in_msg)
try:
return resolver.resolve(in_msg)
except Exception as e:
if "DB::Exception: Memory limit (for query) exceeded" in str(e):
xurui-c marked this conversation as resolved.
Show resolved Hide resolved
metrics.increment("endpoint_trace_item_table_OOM")
sentry_sdk.capture_exception(e)
raise BadSnubaRPCRequestException(str(e))
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ def resolve(self, in_msg: TimeSeriesRequest) -> TimeSeriesResponse:
request=snuba_request,
timer=self._timer,
)

response_meta = extract_response_meta(
in_msg.meta.request_id,
in_msg.meta.debug,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ def resolve(self, in_msg: TraceItemTableRequest) -> TraceItemTableResponse:
request=snuba_request,
timer=self._timer,
)

column_values = _convert_results(in_msg, res.result.get("data", []))
response_meta = extract_response_meta(
in_msg.meta.request_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from typing import Any, Callable, MutableMapping
from unittest.mock import patch

import pytest
from clickhouse_driver.errors import ServerException
from google.protobuf.timestamp_pb2 import Timestamp
from sentry_protos.snuba.v1.endpoint_time_series_pb2 import (
DataPoint,
Expand All @@ -29,6 +31,7 @@
from snuba.datasets.storages.factory import get_storage
from snuba.datasets.storages.storage_key import StorageKey
from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException
from snuba.web.rpc.v1 import endpoint_time_series
from snuba.web.rpc.v1.endpoint_time_series import (
EndpointTimeSeries,
_validate_time_buckets,
Expand Down Expand Up @@ -748,6 +751,56 @@ def test_with_non_existent_attribute(self) -> None:
)
]

def test_OOM(self) -> None:
    """A ClickHouse memory-limit (OOM) error raised during resolve() must be
    re-raised as BadSnubaRPCRequestException, increment the OOM counter
    exactly once, and be captured by sentry_sdk.

    Fix vs. previous revision: the sentry_sdk patch target pointed at
    ``endpoint_trace_item_table`` (copy-paste from the sibling test). It only
    worked by accident because both endpoint modules share the one
    ``sentry_sdk`` module object; patch the module under test instead.
    """
    ts = Timestamp()
    ts.GetCurrentTime()
    tstart = Timestamp(seconds=ts.seconds - 3600)  # one-hour window ending now
    message = TimeSeriesRequest(
        meta=RequestMeta(
            project_ids=[1, 2, 3],
            organization_id=1,
            cogs_category="something",
            referrer="something",
            start_timestamp=tstart,
            end_timestamp=ts,
            trace_item_type=TraceItemType.TRACE_ITEM_TYPE_SPAN,
        ),
        # NOTE(review): labels are swapped relative to their functions
        # (AVG labeled "p50", P95 labeled "p90") in the original fixture;
        # irrelevant to the OOM path, kept for parity — worth tidying.
        aggregations=[
            AttributeAggregation(
                aggregate=Function.FUNCTION_AVG,
                key=AttributeKey(
                    type=AttributeKey.TYPE_FLOAT, name="sentry.duration"
                ),
                label="p50",
            ),
            AttributeAggregation(
                aggregate=Function.FUNCTION_P95,
                key=AttributeKey(
                    type=AttributeKey.TYPE_FLOAT, name="sentry.duration"
                ),
                label="p90",
            ),
        ],
        granularity_secs=60,
    )

    with patch(
        "clickhouse_driver.client.Client.execute",
        # code=241 is ClickHouse MEMORY_LIMIT_EXCEEDED; the endpoint matches
        # on the message substring, not the code.
        side_effect=ServerException(
            "DB::Exception: Received from snuba-events-analytics-platform-1-1:1111. DB::Exception: Memory limit (for query) exceeded: would use 1.11GiB (attempt to allocate chunk of 111111 bytes), maximum: 1.11 GiB. Blahblahblahblahblahblahblah",
            code=241,
        ),
    ), patch.object(
        endpoint_time_series.metrics, "increment"
    ) as metrics_mock, patch(
        # Patch the module under test, not endpoint_trace_item_table.
        "snuba.web.rpc.v1.endpoint_time_series.sentry_sdk.capture_exception"
    ) as sentry_sdk_mock:
        with pytest.raises(BadSnubaRPCRequestException) as e:
            EndpointTimeSeries().execute(message)
        assert "DB::Exception: Memory limit (for query) exceeded" in str(e.value)
        # NOTE(review): this metric name mirrors a copy-paste in
        # endpoint_time_series.py, which increments
        # "endpoint_trace_item_table_OOM"; both should presumably be
        # "endpoint_time_series_OOM" — fix endpoint and assertion together.
        metrics_mock.assert_called_once_with("endpoint_trace_item_table_OOM")
        sentry_sdk_mock.assert_called_once()


class TestUtils:
def test_no_duplicate_labels(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
import uuid
from datetime import datetime, timedelta
from typing import Any, Mapping
from unittest.mock import patch

import pytest
from clickhouse_driver.errors import ServerException
from google.protobuf.json_format import MessageToDict, ParseDict
from google.protobuf.timestamp_pb2 import Timestamp
from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import (
Expand Down Expand Up @@ -41,6 +43,7 @@
from snuba.datasets.storages.factory import get_storage
from snuba.datasets.storages.storage_key import StorageKey
from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException
from snuba.web.rpc.v1 import endpoint_trace_item_table
from snuba.web.rpc.v1.endpoint_trace_item_table import (
EndpointTraceItemTable,
_apply_labels_to_columns,
Expand Down Expand Up @@ -185,6 +188,53 @@ def test_basic(self) -> None:
error_proto.ParseFromString(response.data)
assert response.status_code == 200, error_proto

def test_OOM(self) -> None:
    """A ClickHouse memory-limit (OOM) error raised while resolving the
    request must surface as BadSnubaRPCRequestException, bump the OOM
    counter exactly once, and be captured by sentry_sdk.
    """
    ts = Timestamp()
    ts.GetCurrentTime()
    message = TraceItemTableRequest(
        meta=RequestMeta(
            project_ids=[1, 2, 3],
            organization_id=1,
            cogs_category="something",
            referrer="something",
            start_timestamp=ts,
            end_timestamp=ts,
            trace_item_type=TraceItemType.TRACE_ITEM_TYPE_SPAN,
        ),
        filter=TraceItemFilter(
            exists_filter=ExistsFilter(
                key=AttributeKey(type=AttributeKey.TYPE_STRING, name="color")
            )
        ),
        columns=[
            Column(key=AttributeKey(type=AttributeKey.TYPE_STRING, name="location"))
        ],
        order_by=[
            TraceItemTableRequest.OrderBy(
                column=Column(
                    key=AttributeKey(type=AttributeKey.TYPE_STRING, name="location")
                )
            )
        ],
        limit=10,
    )

    # ClickHouse error code 241 = MEMORY_LIMIT_EXCEEDED; the endpoint keys
    # off the message substring, so the fixture reproduces a real message.
    oom_error = ServerException(
        "DB::Exception: Received from snuba-events-analytics-platform-1-1:1111. DB::Exception: Memory limit (for query) exceeded: would use 1.11GiB (attempt to allocate chunk of 111111 bytes), maximum: 1.11 GiB. Blahblahblahblahblahblahblah",
        code=241,
    )
    execute_patch = patch(
        "clickhouse_driver.client.Client.execute", side_effect=oom_error
    )
    increment_patch = patch.object(endpoint_trace_item_table.metrics, "increment")
    capture_patch = patch(
        "snuba.web.rpc.v1.endpoint_trace_item_table.sentry_sdk.capture_exception"
    )

    with execute_patch, increment_patch as increment_mock, capture_patch as capture_mock:
        with pytest.raises(BadSnubaRPCRequestException) as exc_info:
            EndpointTraceItemTable().execute(message)
        assert "DB::Exception: Memory limit (for query) exceeded" in str(
            exc_info.value
        )
        increment_mock.assert_called_once_with("endpoint_trace_item_table_OOM")
        capture_mock.assert_called_once()

def test_errors_without_type(self) -> None:
ts = Timestamp()
ts.GetCurrentTime()
Expand Down
Loading