Skip to content

Commit 1ffb3d6

Browse files
authored
feat(server): add kubernetes events to provider logs (#748)
Signed-off-by: Radek Ježek <[email protected]>
1 parent 9420186 commit 1ffb3d6

File tree

5 files changed

+74
-19
lines changed

5 files changed

+74
-19
lines changed

apps/beeai-server/src/beeai_server/api/routes/provider.py

Lines changed: 15 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -20,7 +20,7 @@
2020
from uuid import UUID
2121

2222
from beeai_server.domain.models.provider import ProviderWithState
23-
from beeai_server.api.routes.dependencies import ProviderServiceDependency
23+
from beeai_server.api.routes.dependencies import ProviderServiceDependency, AdminAuthDependency
2424
from beeai_server.api.schema.common import PaginatedResponse
2525
from starlette.responses import StreamingResponse
2626

@@ -31,6 +31,7 @@
3131

3232
@router.post("")
3333
async def create_provider(
34+
_: AdminAuthDependency,
3435
request: CreateProviderRequest,
3536
provider_service: ProviderServiceDependency,
3637
auto_remove: bool = Query(default=False),
@@ -42,7 +43,9 @@ async def create_provider(
4243

4344
@router.post("/register/unmanaged", deprecated=True)
4445
async def deprecated_create_unmanaged_provider(
45-
request: CreateProviderRequest, provider_service: ProviderServiceDependency
46+
_: AdminAuthDependency,
47+
request: CreateProviderRequest,
48+
provider_service: ProviderServiceDependency,
4649
) -> ProviderWithState:
4750
"""Backward compatibility for ACP sdk."""
4851
return await provider_service.create_provider(location=request.location, auto_remove=True)
@@ -67,11 +70,19 @@ async def get_provider(id: UUID, provider_service: ProviderServiceDependency) ->
6770

6871

6972
@router.delete("/{id}", status_code=fastapi.status.HTTP_204_NO_CONTENT)
70-
async def delete_provider(id: UUID, provider_service: ProviderServiceDependency) -> None:
73+
async def delete_provider(
74+
_: AdminAuthDependency,
75+
id: UUID,
76+
provider_service: ProviderServiceDependency,
77+
) -> None:
7178
await provider_service.delete_provider(provider_id=id)
7279

7380

7481
@router.get("/{id}/logs", status_code=fastapi.status.HTTP_204_NO_CONTENT)
75-
async def stream_logs(id: UUID, provider_service: ProviderServiceDependency) -> StreamingResponse:
82+
async def stream_logs(
83+
_: AdminAuthDependency,
84+
id: UUID,
85+
provider_service: ProviderServiceDependency,
86+
) -> StreamingResponse:
7687
logs_iterator = await provider_service.stream_logs(provider_id=id)
7788
return streaming_response(logs_iterator())

apps/beeai-server/src/beeai_server/infrastructure/kubernetes/provider_deployment_manager.py

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from asyncio import TaskGroup
2222
from contextlib import asynccontextmanager
2323
from datetime import timedelta
24-
from typing import Callable, Awaitable
24+
from typing import Callable, Awaitable, AsyncIterator
2525
from uuid import UUID
2626

2727
import kr8s
@@ -40,11 +40,11 @@
4040

4141

4242
class KubernetesProviderDeploymentManager(IProviderDeploymentManager):
43-
def __init__(self, api_factory: Callable[[], Awaitable[kr8s.Api]]):
43+
def __init__(self, api_factory: Callable[[], Awaitable[kr8s.asyncio.Api]]):
4444
self._api_factory = api_factory
4545

4646
@asynccontextmanager
47-
async def api(self):
47+
async def api(self) -> AsyncIterator[kr8s.asyncio.Api]:
4848
client = await self._api_factory()
4949
yield client
5050

@@ -240,23 +240,57 @@ async def get_provider_url(self, *, provider_id: UUID) -> HttpUrl:
240240
async def stream_logs(self, *, provider_id: UUID, logs_container: LogsContainer):
241241
try:
242242
async with self.api() as api:
243+
missing_logged = False
243244
while True:
244-
[state] = await self.state(provider_ids=[provider_id])
245-
if state == ProviderDeploymentState.running:
246-
break
247-
logs_container.add_stdout("Waiting for provider startup...")
245+
try:
246+
deploy = await Deployment.get(name=self._get_k8s_name(provider_id, kind="deploy"), api=api)
247+
if pods := await deploy.pods():
248+
break
249+
except kr8s.NotFoundError:
250+
...
251+
if not missing_logged:
252+
logs_container.add_stdout("Provider is not running, run a query to start it up...")
253+
missing_logged = True
248254
await asyncio.sleep(1)
249255

256+
if deploy.status.get("availableReplicas", 0) == 0:
257+
async for event_stream_type, event in api.watch(
258+
kind="event",
259+
# TODO: we select for only one pod, for multi-pod agents this might hold up the logs for a while
260+
field_selector=f"involvedObject.name=={pods[0].name},involvedObject.kind==Pod",
261+
):
262+
message = event.raw.get("message", "")
263+
logs_container.add_stdout(f"{event.raw.reason}: {message}")
264+
if event.raw.reason == "Started":
265+
break
266+
267+
for attempt in range(10):
268+
try:
269+
_ = [log async for log in pods[0].logs(tail_lines=1)]
270+
break
271+
except kr8s.ServerError:
272+
await asyncio.sleep(1)
273+
else:
274+
logs_container.add_stdout("Container crashed or not starting up, attempting to get previous logs:")
275+
with suppress(kr8s.ServerError):
276+
previous_logs = [log async for log in pods[0].logs(previous=True)]
277+
if previous_logs:
278+
logs_container.add_stdout("Previous container logs:")
279+
for log in previous_logs:
280+
logs_container.add_stdout(f"Previous: {log}")
281+
return
282+
283+
# Stream logs from pods
250284
async def stream_logs(pod: Pod):
251285
async for line in pod.logs(follow=True):
252286
logs_container.add_stdout(
253287
f"{pod.name.replace(self._get_k8s_name(provider_id, 'deploy'), '')}: {line}"
254288
)
255289

256290
async with TaskGroup() as tg:
257-
deploy = await Deployment.get(name=self._get_k8s_name(provider_id, kind="deploy"), api=api)
258291
for pod in await deploy.pods():
259292
tg.create_task(stream_logs(pod))
293+
260294
except Exception as ex:
261295
logs_container.add(
262296
ProcessLogMessage(stream=ProcessLogType.stderr, message=extract_messages(ex), error=True)

apps/beeai-server/src/beeai_server/utils/logs_container.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ def _handle_message(log: ProcessLogMessage):
9191
try:
9292
stream_send.send_nowait(log)
9393
except WouldBlock:
94-
logger.error("Unable to stream logs to client due to a full buffer")
94+
logger.debug("Unable to stream logs to client due to a full buffer")
9595

9696
if include_old:
9797
for message in self.logs:

apps/beeai-server/tasks.toml

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -102,18 +102,22 @@ depends = ["beeai-server:image:build"]
102102
run = """
103103
#!/bin/bash
104104
105-
NAMESPACE=beeai
105+
NAMESPACE=default
106106
VM_NAME='{{option(name="vm-name", default="beeai-local-dev")}}'
107107
KUBECONFIG="${HOME}/.beeai/lima/${VM_NAME}/copied-from-guest/kubeconfig.yaml"
108+
LIMA_HOME=~/.beeai/lima
109+
110+
export KUBECONFIG
111+
export LIMA_HOME
108112
109113
[[ ! -f .env ]] && cp template.env .env
110114
111115
tele="telepresence --use .*${NAMESPACE}.*"
112-
($tele list -n beeai --replacements 2>/dev/null | grep -q platform) && echo "Dev environment already running" && exit
116+
($tele list --replacements 2>/dev/null | grep -q platform) && echo "Dev environment already running" && exit
113117
118+
# Stopping all platform deployments
114119
mise run beeai-server:dev:stop
115120
116-
# Stopping all platform deployments
117121
if [ '{{flag(name='no-registry')}}' = 'true' ]; then
118122
extra_flags="--set externalRegistries=null"
119123
fi
@@ -122,7 +126,6 @@ mise run beeai-cli:run -- platform start \
122126
--vm-name=${VM_NAME} \
123127
--import ghcr.io/i-am-bee/beeai-platform/beeai-server:local \
124128
--set image.tag=local $extra_flags
125-
export KUBECONFIG
126129
127130
$tele helm install
128131
$tele connect --namespace "$NAMESPACE"
@@ -171,7 +174,9 @@ deactivate () {
171174
dir = "{{config_root}}/apps/beeai-server"
172175
run = """
173176
#!/bin/bash
174-
NAMESPACE=beeai
177+
NAMESPACE=default
178+
179+
telepresence --use ".*${NAMESPACE}.*" uninstall --all-agents || true
175180
176181
# Stop all lima VMs
177182
{% raw %}

helm/beeai-platform/templates/serviceaccount/role.yaml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,9 @@ rules:
2828
# Permissions for pod logs
2929
- apiGroups: [ "" ]
3030
resources: [ "pods/log" ]
31-
verbs: [ "get" ]
31+
verbs: [ "get" ]
32+
33+
# Permissions for events (read-only)
34+
- apiGroups: [ "" ]
35+
resources: [ "events" ]
36+
verbs: [ "get", "list", "watch" ]

0 commit comments

Comments
 (0)