Skip to content

Commit

Permalink
finishing
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachel Chen authored and Rachel Chen committed Jan 31, 2025
1 parent 7965770 commit eb45416
Show file tree
Hide file tree
Showing 9 changed files with 142 additions and 46 deletions.
4 changes: 0 additions & 4 deletions snuba/clickhouse/native.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,6 @@ def query_execute() -> Any:
else:
result_data = query_execute()

print("result_dataaaa", result_data, with_column_types)

profile_data = ClickhouseProfile(
bytes=conn.last_query.profile_info.bytes or 0,
progress_bytes=conn.last_query.progress.bytes or 0,
Expand Down Expand Up @@ -519,8 +517,6 @@ def execute(
if "query_id" in settings:
query_id = settings.pop("query_id")

print("isrobust???", self.__client.execute)

execute_func = (
self.__client.execute_robust if robust is True else self.__client.execute
)
Expand Down
5 changes: 0 additions & 5 deletions snuba/web/db_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,6 @@ def execute_query(
robust=robust,
)


print("readerrrr", reader)

print("resultttt", result)

timer.mark("execute")
stats.update(
{
Expand Down
4 changes: 0 additions & 4 deletions snuba/web/rpc/common/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ class BadSnubaRPCRequestException(RPCRequestException):
def __init__(self, message: str):
super().__init__(400, message)

class OOMException(RPCRequestException):
def __init__(self, message: str):
super().__init__(241, message)


def convert_rpc_exception_to_proto(
exc: Union[RPCRequestException, QueryException]
Expand Down
13 changes: 12 additions & 1 deletion snuba/web/rpc/v1/endpoint_time_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@
import uuid
from typing import Type

import sentry_sdk
from sentry_protos.snuba.v1.endpoint_time_series_pb2 import (
TimeSeriesRequest,
TimeSeriesResponse,
)
from sentry_protos.snuba.v1.request_common_pb2 import TraceItemType

from snuba import environment
from snuba.utils.metrics.wrapper import MetricsWrapper
from snuba.web.rpc import RPCEndpoint, TraceItemDataResolver
from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException
from snuba.web.rpc.v1.resolvers import ResolverTimeSeries
Expand All @@ -32,6 +35,8 @@
# MAX 5 minute granularity over 7 days
_MAX_BUCKETS_IN_REQUEST = 2016

metrics = MetricsWrapper(environment.metrics, "endpoint_time_series")


def _enforce_no_duplicate_labels(request: TimeSeriesRequest) -> None:
labels = set()
Expand Down Expand Up @@ -105,4 +110,10 @@ def _execute(self, in_msg: TimeSeriesRequest) -> TimeSeriesResponse:
"This endpoint requires meta.trace_item_type to be set (are you requesting spans? logs?)"
)
resolver = self.get_resolver(in_msg.meta.trace_item_type)
return resolver.resolve(in_msg)
try:
return resolver.resolve(in_msg)
except Exception as e:
if "DB::Exception: Memory limit (for query) exceeded" in str(e):
metrics.increment("endpoint_trace_item_table_OOM")
sentry_sdk.capture_exception(e)
raise BadSnubaRPCRequestException(str(e))
17 changes: 16 additions & 1 deletion snuba/web/rpc/v1/endpoint_trace_item_table.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
import uuid
from typing import Type

import sentry_sdk
from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import (
Column,
TraceItemTableRequest,
TraceItemTableResponse,
)
from sentry_protos.snuba.v1.request_common_pb2 import TraceItemType

from snuba import environment
from snuba.utils.metrics.wrapper import MetricsWrapper
from snuba.web.rpc import RPCEndpoint, TraceItemDataResolver
from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException
from snuba.web.rpc.v1.resolvers import ResolverTraceItemTable

_GROUP_BY_DISALLOWED_COLUMNS = ["timestamp"]

metrics = MetricsWrapper(environment.metrics, "endpoint_trace_item_table")


def _apply_labels_to_columns(in_msg: TraceItemTableRequest) -> TraceItemTableRequest:
def _apply_label_to_column(column: Column) -> None:
Expand Down Expand Up @@ -104,4 +109,14 @@ def _execute(self, in_msg: TraceItemTableRequest) -> TraceItemTableResponse:
"This endpoint requires meta.trace_item_type to be set (are you requesting spans? logs?)"
)
resolver = self.get_resolver(in_msg.meta.trace_item_type)
return resolver.resolve(in_msg)
try:
return resolver.resolve(in_msg)
except Exception as e:
print(e.args)
print(str(e))
print(e)
print("didthisraise??????")
if "DB::Exception: Memory limit (for query) exceeded" in str(e):
metrics.increment("endpoint_trace_item_table_OOM")
sentry_sdk.capture_exception(e)
raise BadSnubaRPCRequestException(str(e))
21 changes: 5 additions & 16 deletions snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_time_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
extract_response_meta,
setup_trace_query_settings,
)
from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException
from snuba.web.rpc.v1.resolvers import ResolverTimeSeries
from snuba.web.rpc.v1.resolvers.R_eap_spans.common.aggregation import (
ExtrapolationContext,
Expand All @@ -48,14 +47,10 @@
get_confidence_interval_column,
get_count_column,
)
from clickhouse_driver.errors import Error
import sentry_sdk


metrics = MetricsWrapper(environment.metrics, "endpoint_trace_item_table")



def _convert_result_timeseries(
request: TimeSeriesRequest, data: list[Dict[str, Any]]
) -> Iterable[TimeSeries]:
Expand Down Expand Up @@ -306,17 +301,11 @@ def trace_item_type(cls) -> TraceItemType.ValueType:

def resolve(self, in_msg: TimeSeriesRequest) -> TimeSeriesResponse:
snuba_request = _build_snuba_request(in_msg)
try:
res = run_query(
dataset=PluggableDataset(name="eap", all_entities=[]),
request=snuba_request,
timer=self._timer,
)
except Error as e:
if e.code == 241 or "DB::Exception: Memory limit (for query) exceeded" in e.message:
metrics.increment("endpoint_trace_item_table_OOM")
sentry_sdk.capture_exception(e)
raise BadSnubaRPCRequestException(e.message)
res = run_query(
dataset=PluggableDataset(name="eap", all_entities=[]),
request=snuba_request,
timer=self._timer,
)

response_meta = extract_response_meta(
in_msg.meta.request_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
from collections import defaultdict
from dataclasses import replace
from typing import Any, Callable, Dict, Iterable, Sequence
import sentry_sdk

from clickhouse_driver.errors import Error
from google.protobuf.json_format import MessageToDict
from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import (
AggregationComparisonFilter,
Expand Down Expand Up @@ -47,7 +45,7 @@
extract_response_meta,
setup_trace_query_settings,
)
from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException, OOMException
from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException
from snuba.web.rpc.v1.resolvers import ResolverTraceItemTable
from snuba.web.rpc.v1.resolvers.R_eap_spans.common.aggregation import (
ExtrapolationContext,
Expand Down Expand Up @@ -302,17 +300,11 @@ def trace_item_type(cls) -> TraceItemType.ValueType:

def resolve(self, in_msg: TraceItemTableRequest) -> TraceItemTableResponse:
snuba_request = _build_snuba_request(in_msg)
try:
res = run_query(
dataset=PluggableDataset(name="eap", all_entities=[]),
request=snuba_request,
timer=self._timer,
)
except Error as e:
if e.code == 241 or "DB::Exception: Memory limit (for query) exceeded" in e.message:
metrics.increment("endpoint_trace_item_table_OOM")
sentry_sdk.capture_exception(e)
raise BadSnubaRPCRequestException(e.message)
res = run_query(
dataset=PluggableDataset(name="eap", all_entities=[]),
request=snuba_request,
timer=self._timer,
)

column_values = _convert_results(in_msg, res.result.get("data", []))
response_meta = extract_response_meta(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from typing import Any, Callable, MutableMapping
from unittest.mock import patch

import pytest
from clickhouse_driver.errors import ServerException
from google.protobuf.timestamp_pb2 import Timestamp
from sentry_protos.snuba.v1.endpoint_time_series_pb2 import (
DataPoint,
Expand All @@ -29,6 +31,7 @@
from snuba.datasets.storages.factory import get_storage
from snuba.datasets.storages.storage_key import StorageKey
from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException
from snuba.web.rpc.v1 import endpoint_time_series
from snuba.web.rpc.v1.endpoint_time_series import (
EndpointTimeSeries,
_validate_time_buckets,
Expand Down Expand Up @@ -748,6 +751,56 @@ def test_with_non_existent_attribute(self) -> None:
)
]

def test_OOM(self) -> None:
ts = Timestamp()
ts.GetCurrentTime()
tstart = Timestamp(seconds=ts.seconds - 3600)
message = TimeSeriesRequest(
meta=RequestMeta(
project_ids=[1, 2, 3],
organization_id=1,
cogs_category="something",
referrer="something",
start_timestamp=tstart,
end_timestamp=ts,
trace_item_type=TraceItemType.TRACE_ITEM_TYPE_SPAN,
),
aggregations=[
AttributeAggregation(
aggregate=Function.FUNCTION_AVG,
key=AttributeKey(
type=AttributeKey.TYPE_FLOAT, name="sentry.duration"
),
label="p50",
),
AttributeAggregation(
aggregate=Function.FUNCTION_P95,
key=AttributeKey(
type=AttributeKey.TYPE_FLOAT, name="sentry.duration"
),
label="p90",
),
],
granularity_secs=60,
)

with patch(
"clickhouse_driver.client.Client.execute",
side_effect=ServerException(
"DB::Exception: Received from snuba-events-analytics-platform-1-1:1111. DB::Exception: Memory limit (for query) exceeded: would use 1.11GiB (attempt to allocate chunk of 111111 bytes), maximum: 1.11 GiB. Blahblahblahblahblahblahblah",
code=241,
),
), patch.object(
endpoint_time_series.metrics, "increment"
) as metrics_mock, patch(
"snuba.web.rpc.v1.endpoint_trace_item_table.sentry_sdk.capture_exception"
) as sentry_sdk_mock:
with pytest.raises(BadSnubaRPCRequestException) as e:
EndpointTimeSeries().execute(message)
assert "DB::Exception: Memory limit (for query) exceeded" in str(e.value)
metrics_mock.assert_called_once_with("endpoint_trace_item_table_OOM")
sentry_sdk_mock.assert_called_once()


class TestUtils:
def test_no_duplicate_labels(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
import uuid
from datetime import datetime, timedelta
from typing import Any, Mapping
from unittest.mock import patch

import pytest
from clickhouse_driver.errors import ServerException
from google.protobuf.json_format import MessageToDict, ParseDict
from google.protobuf.timestamp_pb2 import Timestamp
from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import (
Expand Down Expand Up @@ -41,6 +43,7 @@
from snuba.datasets.storages.factory import get_storage
from snuba.datasets.storages.storage_key import StorageKey
from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException
from snuba.web.rpc.v1 import endpoint_trace_item_table
from snuba.web.rpc.v1.endpoint_trace_item_table import (
EndpointTraceItemTable,
_apply_labels_to_columns,
Expand Down Expand Up @@ -185,6 +188,53 @@ def test_basic(self) -> None:
error_proto.ParseFromString(response.data)
assert response.status_code == 200, error_proto

def test_OOM(self) -> None:
ts = Timestamp()
ts.GetCurrentTime()
message = TraceItemTableRequest(
meta=RequestMeta(
project_ids=[1, 2, 3],
organization_id=1,
cogs_category="something",
referrer="something",
start_timestamp=ts,
end_timestamp=ts,
trace_item_type=TraceItemType.TRACE_ITEM_TYPE_SPAN,
),
filter=TraceItemFilter(
exists_filter=ExistsFilter(
key=AttributeKey(type=AttributeKey.TYPE_STRING, name="color")
)
),
columns=[
Column(key=AttributeKey(type=AttributeKey.TYPE_STRING, name="location"))
],
order_by=[
TraceItemTableRequest.OrderBy(
column=Column(
key=AttributeKey(type=AttributeKey.TYPE_STRING, name="location")
)
)
],
limit=10,
)
with patch(
"clickhouse_driver.client.Client.execute",
side_effect=ServerException(
"DB::Exception: Received from snuba-events-analytics-platform-1-1:1111. DB::Exception: Memory limit (for query) exceeded: would use 1.11GiB (attempt to allocate chunk of 111111 bytes), maximum: 1.11 GiB. Blahblahblahblahblahblahblah",
code=241,
),
), patch.object(
endpoint_trace_item_table.metrics, "increment"
) as metrics_mock, patch(
"snuba.web.rpc.v1.endpoint_trace_item_table.sentry_sdk.capture_exception"
) as sentry_sdk_mock:
with pytest.raises(BadSnubaRPCRequestException) as e:
EndpointTraceItemTable().execute(message)
assert "DB::Exception: Memory limit (for query) exceeded" in str(e.value)
metrics_mock.assert_called_once_with("endpoint_trace_item_table_OOM")
sentry_sdk_mock.assert_called_once()

def test_errors_without_type(self) -> None:
ts = Timestamp()
ts.GetCurrentTime()
Expand Down Expand Up @@ -269,7 +319,6 @@ def test_with_data(self, setup_teardown: Any) -> None:
meta=ResponseMeta(request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480"),
)
assert response == expected_response
assert False

def test_booleans_and_number_compares_backward_compat(
self, setup_teardown: Any
Expand Down

0 comments on commit eb45416

Please sign in to comment.