feat: Support custom endpoint for BigQuery Storage API client #1377

Draft · wants to merge 12 commits into base: develop
7 changes: 7 additions & 0 deletions data_validation/cli_tools.py
@@ -69,6 +69,10 @@
"api_endpoint",
'(Optional) GCP BigQuery API endpoint (e.g. "https://mybq.p.googleapis.com")',
],
[
"storage_api_endpoint",
'(Optional) GCP BigQuery Storage API endpoint (e.g. "https://mybqstorage.p.googleapis.com")',
],
],
"Teradata": [
["host", "Desired Teradata host"],
@@ -1251,6 +1255,9 @@ def get_result_handler(rc_value: str, sa_file=None) -> dict:
        consts.PROJECT_ID: conn_from_file["project_id"],
        consts.TABLE_ID: config[1],
        consts.API_ENDPOINT: conn_from_file.get("api_endpoint", None),
        consts.STORAGE_API_ENDPOINT: conn_from_file.get(
            "storage_api_endpoint", None
        ),
    }
else:
    # We received project_name.bq_results_table
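For context, `get_result_handler()` reads the new key straight out of a saved connection file. A file exercising it might look like the sketch below (hypothetical values; `storage_api_endpoint` is the only new field, and the surrounding field names are assumed from DVT's existing BigQuery connection format):

```python
# Hypothetical saved BigQuery connection, shown as the dict that
# get_result_handler() receives as conn_from_file (all values are examples).
conn_from_file = {
    "source_type": "BigQuery",
    "project_id": "my-project",
    "api_endpoint": "https://mybq.p.googleapis.com",
    "storage_api_endpoint": "https://mybqstorage.p.googleapis.com",
}
```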
35 changes: 28 additions & 7 deletions data_validation/clients.py
@@ -27,6 +27,9 @@

from data_validation import client_info, consts, exceptions
from data_validation.secret_manager import SecretManagerBuilder

# TODO Rename this directory!
from third_party.ibis.ibis_biquery.api import bigquery_connect
from third_party.ibis.ibis_cloud_spanner.api import spanner_connect
from third_party.ibis.ibis_impala.api import impala_connect
from third_party.ibis.ibis_mssql.api import mssql_connect
@@ -111,24 +114,42 @@ def get_google_bigquery_client(
)


def _get_google_bqstorage_client(credentials=None, api_endpoint: str = None):
    # Deferred import so the Storage API package is only loaded when needed.
    from google.cloud import bigquery_storage_v1 as bigquery_storage

    options = None
    if api_endpoint:
        options = client_options.ClientOptions(api_endpoint=api_endpoint)

    return bigquery_storage.BigQueryReadClient(
        credentials=credentials,
        client_options=options,
    )


def get_bigquery_client(
    project_id: str, dataset_id: str = "", credentials=None, api_endpoint: str = None
    project_id: str,
    dataset_id: str = "",
    credentials=None,
    api_endpoint: str = None,
    storage_api_endpoint: str = None,
):
    google_client = get_google_bigquery_client(
        project_id, credentials=credentials, api_endpoint=api_endpoint
    )
    bqstorage_client = None
    if storage_api_endpoint:
        bqstorage_client = _get_google_bqstorage_client(
            credentials=credentials, api_endpoint=storage_api_endpoint
        )

    ibis_client = ibis.bigquery.connect(
    return bigquery_connect(
        project_id=project_id,
        dataset_id=dataset_id,
        credentials=credentials,
        bigquery_client=google_client,
        bqstorage_client=bqstorage_client,
    )

    # Override the BigQuery client object to ensure the correct user agent is
    # included and any api_endpoint is used.
    ibis_client.client = google_client
    return ibis_client


def get_pandas_client(table_name, file_path, file_type):
    """Return pandas client and env with file loaded into DataFrame
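Taken together, the clients.py changes let a caller route BigQuery Storage API reads through a private endpoint. A minimal usage sketch, using only the parameters added in this diff (endpoint URLs are hypothetical):

```python
# Minimal sketch: the new storage_api_endpoint parameter flows into a
# dedicated BigQueryReadClient that the Ibis backend uses for reads.
from data_validation import clients

ibis_client = clients.get_bigquery_client(
    project_id="my-project",
    dataset_id="my_dataset",
    api_endpoint="https://mybq.p.googleapis.com",
    storage_api_endpoint="https://mybqstorage.p.googleapis.com",
)
```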
1 change: 1 addition & 0 deletions data_validation/consts.py
@@ -125,6 +125,7 @@
TABLE_ID = "table_id"
GOOGLE_SERVICE_ACCOUNT_KEY_PATH = "google_service_account_key_path"
API_ENDPOINT = "api_endpoint"
STORAGE_API_ENDPOINT = "storage_api_endpoint"

# BigQuery Output Table Fields
VALIDATION_TYPE = "validation_type"
2 changes: 2 additions & 0 deletions docs/connections.md
@@ -125,6 +125,8 @@ data-validation connections add
  [--google-service-account-key-path PATH_TO_SA_KEY]  Path to SA key
  [--api-endpoint ENDPOINT_URI]                       BigQuery API endpoint (e.g.
                                                      "https://mybq.p.googleapis.com")
  [--storage-api-endpoint STORAGE_API_ENDPOINT]       BigQuery Storage API endpoint (e.g.
                                                      "https://mybqstorage.p.googleapis.com")
```
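For example, adding a connection that pins both endpoints might look like this (hypothetical endpoint URLs; assumes the standard `--connection-name` flag alongside the flags documented above):

```
data-validation connections add --connection-name MY_BQ_CONN BigQuery \
  --project-id my-project \
  --api-endpoint "https://mybq.p.googleapis.com" \
  --storage-api-endpoint "https://mybqstorage.p.googleapis.com"
```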

### User/Service account needs the following BigQuery permissions to run DVT
5 changes: 3 additions & 2 deletions tests/unit/test__main.py
@@ -152,7 +152,8 @@
    consts.PROJECT_ID: "dummy-gcp-project",
    consts.GOOGLE_SERVICE_ACCOUNT_KEY_PATH: None,
    "connection_name": "dummy-bq-connection",
    "api_endpoint": None,
    consts.API_ENDPOINT: None,
    consts.STORAGE_API_ENDPOINT: None,
}
CONNECTION_DESCRIBE_ARGS = {
    "verbose": False,
@@ -180,7 +181,7 @@
    consts.PROJECT_ID: "dummy-gcp-project",
    consts.GOOGLE_SERVICE_ACCOUNT_KEY_PATH: None,
    "connection_name": "dummy-bq-connection",
    "api_endpoint": None,
    consts.API_ENDPOINT: None,
}  # same as CONNECTION_ADD_ARGS but with the command item replaced
FIND_TABLES_ARGS = {
    "verbose": False,
3 changes: 3 additions & 0 deletions tests/unit/test_cli_tools.py
@@ -84,6 +84,8 @@
"example-project",
"--api-endpoint",
"https://mybq.p.googleapis.com",
"--storage-api-endpoint",
"https://mybqs.p.googleapis.com",
]

CLI_ADD_ORACLE_STD_CONNECTION_ARGS = [
@@ -553,6 +555,7 @@ def test_get_result_handler_by_conn_file(fs):
        consts.PROJECT_ID: args.project_id,
        consts.TABLE_ID: "dataset.table",
        consts.API_ENDPOINT: args.api_endpoint,
        consts.STORAGE_API_ENDPOINT: args.storage_api_endpoint,
    }

    # Plus check standard format still works.
137 changes: 137 additions & 0 deletions third_party/ibis/ibis_biquery/__init__.py
@@ -0,0 +1,137 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import google.auth.credentials
import google.cloud.bigquery as bq
import pydata_google_auth
from pydata_google_auth import cache

from ibis.backends.bigquery import (
    Backend as BigQueryBackend,
    _create_client_info,
    parse_project_and_dataset,
    CLIENT_ID,
    CLIENT_SECRET,
    EXTERNAL_DATA_SCOPES,
    SCOPES,
)


class Backend(BigQueryBackend):
    def __init__(self):
        super().__init__()
        self.storage_client = None

    def do_connect(
        self,
        project_id: str = None,
        dataset_id: str = "",
        credentials: google.auth.credentials.Credentials = None,
        application_name: str = None,
        auth_local_webserver: bool = True,
        auth_external_data: bool = False,
        auth_cache: str = "default",
        partition_column: str = "PARTITIONTIME",
        # Custom DVT arguments:
        bigquery_client=None,
        bqstorage_client=None,
    ):
        """Copy of Ibis v5 BigQuery do_connect() customized for DVT; see the original method for docs."""
        default_project_id = ""

        if credentials is None:
            scopes = SCOPES
            if auth_external_data:
                scopes = EXTERNAL_DATA_SCOPES

            if auth_cache == "default":
                credentials_cache = cache.ReadWriteCredentialsCache(
                    filename="ibis.json"
                )
            elif auth_cache == "reauth":
                credentials_cache = cache.WriteOnlyCredentialsCache(
                    filename="ibis.json"
                )
            elif auth_cache == "none":
                credentials_cache = cache.NOOP
            else:
                raise ValueError(
                    f"Got unexpected value for auth_cache = '{auth_cache}'. "
                    "Expected one of 'default', 'reauth', or 'none'."
                )

            credentials, default_project_id = pydata_google_auth.default(
                scopes,
                client_id=CLIENT_ID,
                client_secret=CLIENT_SECRET,
                credentials_cache=credentials_cache,
                use_local_webserver=auth_local_webserver,
            )

        project_id = project_id or default_project_id

        (
            self.data_project,
            self.billing_project,
            self.dataset,
        ) = parse_project_and_dataset(project_id, dataset_id)

        if bigquery_client is None:
            self.client = bq.Client(
                project=self.billing_project,
                credentials=credentials,
                client_info=_create_client_info(application_name),
            )
        else:
            self.client = bigquery_client
        self.partition_column = partition_column
        self.storage_client = bqstorage_client

    def _cursor_to_arrow(
        self,
        cursor,
        *,
        method=None,
        chunk_size: int = None,
    ):
        """Copy of Ibis v5 BigQuery _cursor_to_arrow() except that it can use the custom DVT storage client."""
        if method is None:

            def method(result, storage_client=self.storage_client):
                return result.to_arrow(
                    progress_bar_type=None,
                    # Include DVT specific storage client.
                    bqstorage_client=storage_client,
                    create_bqstorage_client=not self.storage_client,
                )

        query = cursor.query
        query_result = query.result(page_size=chunk_size)
        # Workaround for potentially not having the ability to create read
        # sessions in the dataset project.
        orig_project = query_result._project
        query_result._project = self.billing_project
        try:
            arrow_obj = method(query_result)
        finally:
            query_result._project = orig_project
        return arrow_obj

    def list_primary_key_columns(self, database: str, table: str) -> list:
        """Return a list of primary key column names."""
        # TODO: Related to issue-1253, it's not clear if this is possible, we should revisit if it becomes a requirement.
        return None

    def dvt_list_tables(self, like=None, database=None):
        return self.list_tables(like=like, database=database)
42 changes: 31 additions & 11 deletions third_party/ibis/ibis_biquery/api.py
@@ -1,15 +1,35 @@
from ibis.backends.bigquery import Backend as BigQueryBackend
# Copyright 2024 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


def _list_primary_key_columns(self, database: str, table: str) -> list:
    """Return a list of primary key column names."""
    # TODO: Related to issue-1253, it's not clear if this is possible, we should revisit if it becomes a requirement.
    return None

from third_party.ibis.ibis_biquery import Backend as BigQueryBackend


def _dvt_list_tables(self, like=None, database=None):
    return self.list_tables(like=like, database=database)


BigQueryBackend.list_primary_key_columns = _list_primary_key_columns
BigQueryBackend.dvt_list_tables = _dvt_list_tables
def bigquery_connect(
    project_id: str = None,
    dataset_id: str = "",
    credentials=None,
    bigquery_client=None,
    bqstorage_client=None,
):
    """Create a BigQuery Backend for use with Ibis."""
    backend = BigQueryBackend()
    backend.do_connect(
        project_id=project_id,
        dataset_id=dataset_id,
        credentials=credentials,
        bigquery_client=bigquery_client,
        bqstorage_client=bqstorage_client,
    )
    return backend
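Putting the two new modules together, wiring a custom Storage API endpoint end to end looks roughly like the sketch below (hypothetical endpoint URL; this mirrors what clients.py now does via `get_bigquery_client()` and `_get_google_bqstorage_client()`):

```python
# Sketch: build a BigQuery Storage read client against a custom endpoint
# and inject it into the DVT backend (endpoint URL is an example).
from google.api_core import client_options
from google.cloud import bigquery_storage_v1

from third_party.ibis.ibis_biquery.api import bigquery_connect

bqstorage_client = bigquery_storage_v1.BigQueryReadClient(
    client_options=client_options.ClientOptions(
        api_endpoint="https://mybqstorage.p.googleapis.com"
    )
)
backend = bigquery_connect(
    project_id="my-project",
    dataset_id="my_dataset",
    bqstorage_client=bqstorage_client,
)
# _cursor_to_arrow() now reads query results through this client instead
# of letting the BigQuery client create a default one.
```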