feature: add pagination to metadata routes #457

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft · wants to merge 1 commit into master
48 changes: 26 additions & 22 deletions services/data/postgres_async_db.py
@@ -201,7 +201,7 @@ async def _init(self, create_triggers: bool):
)

async def get_records(self, filter_dict={}, fetch_single=False,
ordering: List[str] = None, limit: int = 0, expanded=False,
ordering: List[str] = None, limit: int = 0, offset: int = 0, expanded=False,
cur: aiopg.Cursor = None) -> DBResponse:
conditions = []
values = []
@@ -211,7 +211,7 @@ async def get_records(self, filter_dict={}, fetch_single=False,

response, _ = await self.find_records(
conditions=conditions, values=values, fetch_single=fetch_single,
order=ordering, limit=limit, expanded=expanded, cur=cur
order=ordering, limit=limit, offset=offset, expanded=expanded, cur=cur
)
return response

@@ -572,8 +572,8 @@ async def get_flow(self, flow_id: str):
filter_dict = {"flow_id": flow_id}
return await self.get_records(filter_dict=filter_dict, fetch_single=True)

async def get_all_flows(self):
return await self.get_records()
async def get_all_flows(self, limit:int = 0, offset: int = 0):
return await self.get_records(limit=limit, offset=offset)


class AsyncRunTablePostgres(AsyncPostgresTable):
@@ -606,9 +606,9 @@ async def get_run(self, flow_id: str, run_id: str, expanded: bool = False, cur:
return await self.get_records(filter_dict=filter_dict,
fetch_single=True, expanded=expanded, cur=cur)

async def get_all_runs(self, flow_id: str):
async def get_all_runs(self, flow_id: str, limit:int=0, offset:int=0):
filter_dict = {"flow_id": flow_id}
return await self.get_records(filter_dict=filter_dict)
return await self.get_records(filter_dict=filter_dict, limit=limit, offset=offset)

async def update_heartbeat(self, flow_id: str, run_id: str):
run_key, run_value = translate_run_key(run_id)
@@ -661,11 +661,11 @@ async def add_step(self, step_object: StepRow):
}
return await self.create_record(dict)

async def get_steps(self, flow_id: str, run_id: str):
async def get_steps(self, flow_id: str, run_id: str, limit:int=0, offset:int=0):
run_id_key, run_id_value = translate_run_key(run_id)
filter_dict = {"flow_id": flow_id,
run_id_key: run_id_value}
return await self.get_records(filter_dict=filter_dict)
return await self.get_records(filter_dict=filter_dict, limit=limit, offset=offset)

async def get_step(self, flow_id: str, run_id: str, step_name: str):
run_id_key, run_id_value = translate_run_key(run_id)
@@ -705,14 +705,14 @@ async def add_task(self, task: TaskRow, fill_heartbeat=False):
}
return await self.create_record(dict)

async def get_tasks(self, flow_id: str, run_id: str, step_name: str):
async def get_tasks(self, flow_id: str, run_id: str, step_name: str, limit:int=0, offset:int=0):
run_id_key, run_id_value = translate_run_key(run_id)
filter_dict = {
"flow_id": flow_id,
run_id_key: run_id_value,
"step_name": step_name,
}
return await self.get_records(filter_dict=filter_dict)
return await self.get_records(filter_dict=filter_dict, limit=limit, offset=offset)

async def get_task(self, flow_id: str, run_id: str, step_name: str,
task_id: str, expanded: bool = False):
@@ -795,14 +795,14 @@ async def add_metadata(
}
return await self.create_record(dict)

async def get_metadata_in_runs(self, flow_id: str, run_id: str):
async def get_metadata_in_runs(self, flow_id: str, run_id: str, limit:int=0, offset:int=0):
run_id_key, run_id_value = translate_run_key(run_id)
filter_dict = {"flow_id": flow_id,
run_id_key: run_id_value}
return await self.get_records(filter_dict=filter_dict)
return await self.get_records(filter_dict=filter_dict, limit=limit, offset=offset)

async def get_metadata(
self, flow_id: str, run_id: int, step_name: str, task_id: str
self, flow_id: str, run_id: int, step_name: str, task_id: str, limit: int = 0, offset: int = 0
):
run_id_key, run_id_value = translate_run_key(run_id)
task_id_key, task_id_value = translate_task_key(task_id)
@@ -812,9 +812,9 @@ async def get_metadata(
"step_name": step_name,
task_id_key: task_id_value,
}
return await self.get_records(filter_dict=filter_dict)
return await self.get_records(filter_dict=filter_dict, limit=limit, offset=offset)

async def get_filtered_task_pathspecs(self, flow_id: str, run_id: str, step_name: str, field_name: str, pattern: str):
async def get_filtered_task_pathspecs(self, flow_id: str, run_id: str, step_name: str, field_name: str, pattern: str, limit:int = 0, offset: int = 0):
"""
Returns a list of task pathspecs that match the given field_name and regexp pattern for the value
"""
@@ -844,14 +844,18 @@ async def get_filtered_task_pathspecs(self, flow_id: str, run_id: str, step_name
) T
{where}
{order_by}
{limit}
{offset}
"""

select_sql = sql_template.format(
keys=",".join(self.select_columns),
table_name=self.table_name,
where="WHERE {}".format(" AND ".join(conditions)),
order_by="ORDER BY task_id",
select_columns=",".join(["flow_id, run_number, run_id, step_name, task_name, task_id"])
select_columns=",".join(["flow_id, run_number, run_id, step_name, task_name, task_id"]),
limit="LIMIT {}".format(limit) if limit else "",
offset= "OFFSET {}".format(offset) if offset else ""
).strip()

db_response, pagination = await self.execute_sql(select_sql=select_sql, values=values, serialize=False)
@@ -922,27 +926,27 @@ async def add_artifact(
}
return await self.create_record(dict)

async def get_artifacts_in_runs(self, flow_id: str, run_id: int):
async def get_artifacts_in_runs(self, flow_id: str, run_id: int, limit:int=0, offset:int=0):
run_id_key, run_id_value = translate_run_key(run_id)
filter_dict = {
"flow_id": flow_id,
run_id_key: run_id_value,
}
return await self.get_records(filter_dict=filter_dict,
ordering=self.ordering)
ordering=self.ordering, limit=limit,offset=offset)

async def get_artifact_in_steps(self, flow_id: str, run_id: int, step_name: str):
async def get_artifact_in_steps(self, flow_id: str, run_id: int, step_name: str, limit:int=0, offset:int=0):
run_id_key, run_id_value = translate_run_key(run_id)
filter_dict = {
"flow_id": flow_id,
run_id_key: run_id_value,
"step_name": step_name,
}
return await self.get_records(filter_dict=filter_dict,
ordering=self.ordering)
ordering=self.ordering, limit=limit,offset=offset)

async def get_artifact_in_task(
self, flow_id: str, run_id: int, step_name: str, task_id: int
self, flow_id: str, run_id: int, step_name: str, task_id: int, limit:int=0, offset:int=0
):
run_id_key, run_id_value = translate_run_key(run_id)
task_id_key, task_id_value = translate_task_key(task_id)
@@ -953,7 +957,7 @@ async def get_artifact_in_task(
task_id_key: task_id_value,
}
return await self.get_records(filter_dict=filter_dict,
ordering=self.ordering)
ordering=self.ordering, limit=limit,offset=offset)

async def get_artifact(
self, flow_id: str, run_id: int, step_name: str, task_id: int, name: str
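A note on the SQL change in `get_filtered_task_pathspecs` above: the new `{limit}` and `{offset}` placeholders render only when the values are non-zero, so callers that never pass pagination keep the original query. A minimal sketch of that conditional templating, using a hypothetical `build_limit_offset` helper that is not part of this diff:

```python
def build_limit_offset(limit: int = 0, offset: int = 0) -> tuple:
    """Render LIMIT/OFFSET clauses only when pagination was requested.

    limit=0 is the diff's convention for "no limit", so unpaginated callers
    produce the same SQL as before this change.
    """
    limit_clause = "LIMIT {}".format(int(limit)) if limit else ""
    offset_clause = "OFFSET {}".format(int(offset)) if offset else ""
    return limit_clause, offset_clause


# Page 3 with 20 rows per page -> skip the first 40 rows.
assert build_limit_offset(20, 40) == ("LIMIT 20", "OFFSET 40")
# Defaults -> no extra clauses at all.
assert build_limit_offset() == ("", "")
```

Since the handlers cast `_limit`/`_page` to `int` before they reach this layer, formatting the values into the SQL string should be safe, though binding them as query parameters would be the more defensive choice.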
13 changes: 9 additions & 4 deletions services/metadata_service/api/artifact.py
@@ -10,6 +10,7 @@
format_response,
handle_exceptions,
http_500,
parse_pagination_params
)
import json

@@ -216,9 +217,10 @@ async def get_artifacts_by_task(self, request):
run_number = request.match_info.get("run_number")
step_name = request.match_info.get("step_name")
task_id = request.match_info.get("task_id")
limit, offset = parse_pagination_params(request)

db_response = await self._async_table.get_artifact_in_task(
flow_id, run_number, step_name, task_id
flow_id, run_number, step_name, task_id, limit, offset
)
if db_response.response_code == 200:
db_response = await apply_run_tags_to_db_response(flow_id, run_number, self._async_run_table, db_response)
@@ -277,9 +279,10 @@ async def get_artifacts_by_task_attempt(self, request):
step_name = request.match_info.get("step_name")
task_id = request.match_info.get("task_id")
attempt_id = request.match_info.get("attempt_id")
limit, offset = parse_pagination_params(request)

db_response = await self._async_table.get_artifact_in_task(
flow_id, run_number, step_name, task_id
flow_id, run_number, step_name, task_id, limit, offset
)
if db_response.response_code == 200:
db_response = await apply_run_tags_to_db_response(flow_id, run_number, self._async_run_table, db_response)
@@ -333,9 +336,10 @@ async def get_artifacts_by_step(self, request):
flow_id = request.match_info.get("flow_id")
run_number = request.match_info.get("run_number")
step_name = request.match_info.get("step_name")
limit, offset = parse_pagination_params(request)

db_response = await self._async_table.get_artifact_in_steps(
flow_id, run_number, step_name
flow_id, run_number, step_name, limit, offset
)
if db_response.response_code == 200:
db_response = await apply_run_tags_to_db_response(flow_id, run_number, self._async_run_table, db_response)
@@ -376,8 +380,9 @@ async def get_artifacts_by_run(self, request):
"""
flow_id = request.match_info.get("flow_id")
run_number = request.match_info.get("run_number")
limit, offset = parse_pagination_params(request)

db_response = await self._async_table.get_artifacts_in_runs(flow_id, run_number)
db_response = await self._async_table.get_artifacts_in_runs(flow_id, run_number, limit, offset)
if db_response.response_code == 200:
db_response = await apply_run_tags_to_db_response(flow_id, run_number, self._async_run_table, db_response)
filtered_body = filter_artifacts_for_latest_attempt(db_response.body)
16 changes: 14 additions & 2 deletions services/metadata_service/api/flow.py
@@ -2,7 +2,7 @@
from services.data.postgres_async_db import AsyncPostgresDB
from services.utils import read_body
from services.metadata_service.api.utils import format_response, \
handle_exceptions
handle_exceptions, parse_pagination_params
import asyncio


@@ -77,6 +77,16 @@ async def get_flow(self, request):
description: "flow_id"
required: true
type: "string"
- name: "_limit"
in: "query"
description: "Limit for the number of results"
required: false
type: "integer"
- name: "_page"
in: "query"
description: "Page of results to return"
required: false
type: "integer"
produces:
- text/plain
responses:
@@ -107,4 +117,6 @@ async def get_all_flows(self, request):
"405":
description: invalid HTTP Method
"""
return await self._async_table.get_all_flows()
limit, offset = parse_pagination_params(request)

return await self._async_table.get_all_flows(limit=limit, offset=offset)
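For clients, the pagination surface added here is just the `_limit` and `_page` query parameters documented in the swagger block above. A hypothetical request against a local deployment (the host, port, and `/flows` route path are assumptions, not part of this diff):

```python
import json
from urllib.request import urlopen

BASE_URL = "http://localhost:8080"  # assumed local metadata service

# Second page of ten flows: the service translates this to limit=10, offset=10.
with urlopen(BASE_URL + "/flows?_limit=10&_page=2") as resp:
    flows = json.loads(resp.read())

print(len(flows))  # at most 10 flow records
```

Omitting both parameters keeps the old behaviour, since `_limit` defaults to 0 and a zero limit is treated as "return everything".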
10 changes: 7 additions & 3 deletions services/metadata_service/api/metadata.py
@@ -2,7 +2,7 @@
import json
from services.utils import read_body
from services.metadata_service.api.utils import format_response, \
handle_exceptions
handle_exceptions, parse_pagination_params
import asyncio
from services.data.postgres_async_db import AsyncPostgresDB

@@ -73,8 +73,10 @@ async def get_metadata(self, request):
run_number = request.match_info.get("run_number")
step_name = request.match_info.get("step_name")
task_id = request.match_info.get("task_id")
limit, offset = parse_pagination_params(request)

return await self._async_table.get_metadata(
flow_name, run_number, step_name, task_id
flow_name, run_number, step_name, task_id, limit, offset
)

@format_response
@@ -106,8 +108,10 @@ async def get_metadata_by_run(self, request):
"""
flow_name = request.match_info.get("flow_id")
run_number = request.match_info.get("run_number")
limit, offset = parse_pagination_params(request)

return await self._async_table.get_metadata_in_runs(
flow_name, run_number
flow_name, run_number, limit, offset
)

async def create_metadata(self, request):
6 changes: 4 additions & 2 deletions services/metadata_service/api/run.py
@@ -5,7 +5,7 @@
from services.data.models import RunRow
from services.utils import has_heartbeat_capable_version_tag, read_body
from services.metadata_service.api.utils import format_response, \
handle_exceptions
handle_exceptions, parse_pagination_params
from services.data.postgres_async_db import AsyncPostgresDB


@@ -82,7 +82,9 @@ async def get_all_runs(self, request):
description: invalid HTTP Method
"""
flow_name = request.match_info.get("flow_id")
return await self._async_table.get_all_runs(flow_name)
limit, offset = parse_pagination_params(request)

return await self._async_table.get_all_runs(flow_id=flow_name, limit=limit, offset=offset)

@format_response
@handle_exceptions
6 changes: 4 additions & 2 deletions services/metadata_service/api/step.py
@@ -2,7 +2,7 @@
from services.data.tagging_utils import apply_run_tags_to_db_response
from services.utils import read_body
from services.metadata_service.api.utils import format_response, \
handle_exceptions
handle_exceptions, parse_pagination_params
from services.data.postgres_async_db import AsyncPostgresDB


@@ -55,7 +55,9 @@ async def get_steps(self, request):
"""
flow_id = request.match_info.get("flow_id")
run_number = request.match_info.get("run_number")
db_response = await self._async_table.get_steps(flow_id, run_number)
limit, offset = parse_pagination_params(request)

db_response = await self._async_table.get_steps(flow_id=flow_id, run_id=run_number, limit=limit, offset=offset)
db_response = await apply_run_tags_to_db_response(flow_id, run_number, self._async_run_table, db_response)
return db_response

8 changes: 5 additions & 3 deletions services/metadata_service/api/task.py
@@ -3,7 +3,7 @@
from services.data.tagging_utils import apply_run_tags_to_db_response
from services.utils import has_heartbeat_capable_version_tag, read_body
from services.metadata_service.api.utils import format_response, \
handle_exceptions
handle_exceptions, parse_pagination_params
import json
from aiohttp import web
import asyncio
@@ -77,8 +77,9 @@ async def get_tasks(self, request):
flow_id = request.match_info.get("flow_id")
run_number = request.match_info.get("run_number")
step_name = request.match_info.get("step_name")
limit, offset = parse_pagination_params(request)

db_response = await self._async_table.get_tasks(flow_id, run_number, step_name)
db_response = await self._async_table.get_tasks(flow_id, run_number, step_name, limit, offset)
db_response = await apply_run_tags_to_db_response(flow_id, run_number, self._async_run_table, db_response)
return db_response

@@ -125,12 +126,13 @@ async def get_filtered_tasks(self, request):
flow_id = request.match_info.get("flow_id")
run_number = request.match_info.get("run_number")
step_name = request.match_info.get("step_name")
limit, offset = parse_pagination_params(request)

# possible filters
metadata_field = request.query.get("metadata_field_name", None)
pattern = request.query.get("pattern", None)

db_response, _ = await self._async_metadata_table.get_filtered_task_pathspecs(flow_id, run_number, step_name, metadata_field, pattern)
db_response, _ = await self._async_metadata_table.get_filtered_task_pathspecs(flow_id, run_number, step_name, metadata_field, pattern, limit, offset)
return db_response

@format_response
Expand Down
8 changes: 8 additions & 0 deletions services/metadata_service/api/utils.py
@@ -65,3 +65,11 @@ async def wrapper(*args, **kwargs):
return http_500(str(err))

return wrapper


def parse_pagination_params(request):
page = int(request.query.get("_page", 1))
limit = int(request.query.get("_limit", 0))

offset = limit * (page - 1)
return limit, offset
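A quick sanity check of the helper's page-to-offset arithmetic; the `fake_request` stub below is only an illustration and not part of this PR:

```python
from types import SimpleNamespace

from services.metadata_service.api.utils import parse_pagination_params


def fake_request(**query):
    # Minimal stand-in for an aiohttp request: only `.query.get` is used here.
    return SimpleNamespace(query={k: str(v) for k, v in query.items()})


# _limit=20, _page=3 -> limit=20, offset=40 (the first two pages are skipped).
assert parse_pagination_params(fake_request(_limit=20, _page=3)) == (20, 40)

# No pagination parameters -> limit=0 ("no limit" downstream) and offset=0.
assert parse_pagination_params(fake_request()) == (0, 0)
```

One thing worth noting in review: non-numeric `_page` or `_limit` values raise `ValueError` inside the handler, which `handle_exceptions` will surface as a 500 rather than a 400.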