Skip to content

Commit 25db268

Browse files
committed
feat(obs): basic agent instrumentation
Adds a new `observability` property to Agents that receives events during the Agent's exeuction that can be used for whatever. The default observability implementation prints the events to console, but in the future this could be hooked up to a UI or some other observability tooling.
1 parent fe9e8d3 commit 25db268

File tree

5 files changed

+279
-3
lines changed

5 files changed

+279
-3
lines changed

.changeset/grumpy-clouds-cross.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"agents": patch
3+
---
4+
5+
Instrumentation for Agents for improved observability

packages/agents/scripts/build.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ async function main() {
99
"src/mcp/index.ts",
1010
"src/mcp/client.ts",
1111
"src/mcp/do-oauth-client-provider.ts",
12+
"src/observability/index.ts",
1213
],
1314
splitting: true,
1415
sourcemap: true,

packages/agents/src/ai-chat-agent.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ export class AIChatAgent<Env = unknown, State = unknown> extends Agent<
7373
// dispatcher,
7474
// duplex
7575
} = data.init;
76+
7677
const { messages } = JSON.parse(body as string);
7778
this.#broadcastChatMessage(
7879
{
@@ -81,8 +82,23 @@ export class AIChatAgent<Env = unknown, State = unknown> extends Agent<
8182
},
8283
[connection.id]
8384
);
85+
86+
const incomingMessages = this.#messagesNotAlreadyInAgent(messages);
8487
await this.persistMessages(messages, [connection.id]);
8588

89+
this.observability?.emit(
90+
{
91+
type: "message:request",
92+
id: data.id,
93+
displayMessage: `Incoming chat message:\n${incomingMessages.map((m) => `${m.role}: ${m.content}`).join("\n")}`,
94+
timestamp: Date.now(),
95+
payload: {
96+
message: incomingMessages,
97+
},
98+
},
99+
this.ctx
100+
);
101+
86102
const chatMessageId = data.id;
87103
const abortSignal = this.#getAbortSignal(chatMessageId);
88104

@@ -94,8 +110,23 @@ export class AIChatAgent<Env = unknown, State = unknown> extends Agent<
94110
responseMessages: response.messages,
95111
});
96112

113+
const outgoingMessages =
114+
this.#messagesNotAlreadyInAgent(finalMessages);
97115
await this.persistMessages(finalMessages, [connection.id]);
98116
this.#removeAbortController(chatMessageId);
117+
118+
this.observability?.emit(
119+
{
120+
type: "message:response",
121+
id: data.id,
122+
displayMessage: `Outgoing chat message:\n${outgoingMessages.map((m) => `${m.role}: ${m.content}`).join("\n")}`,
123+
timestamp: Date.now(),
124+
payload: {
125+
message: outgoingMessages,
126+
},
127+
},
128+
this.ctx
129+
);
99130
},
100131
abortSignal ? { abortSignal } : undefined
101132
);
@@ -207,6 +238,11 @@ export class AIChatAgent<Env = unknown, State = unknown> extends Agent<
207238
);
208239
}
209240

241+
#messagesNotAlreadyInAgent(messages: ChatMessage[]) {
242+
const existingIds = new Set(this.messages.map((message) => message.id));
243+
return messages.filter((message) => !existingIds.has(message.id));
244+
}
245+
210246
async #reply(id: string, response: Response) {
211247
// now take chunks out from dataStreamResponse and send them to the client
212248
return this.#tryCatch(async () => {

packages/agents/src/index.ts

Lines changed: 126 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,12 @@ import { nanoid } from "nanoid";
1313

1414
import { AsyncLocalStorage } from "node:async_hooks";
1515
import { MCPClientManager } from "./mcp/client";
16+
import { genericObservability, type Observability } from "./observability";
1617

1718
export type { Connection, WSMessage, ConnectionContext } from "partyserver";
1819

20+
export type { ObservabilityEvent, Observability } from "./observability";
21+
1922
/**
2023
* RPC request message from client
2124
*/
@@ -264,6 +267,11 @@ export class Agent<Env, State = unknown> extends Server<Env> {
264267
hibernate: true, // default to hibernate
265268
};
266269

270+
/**
271+
* The observability implementation to use for the Agent
272+
*/
273+
observability?: Observability = genericObservability;
274+
267275
/**
268276
* Execute SQL queries against the Agent's database
269277
* @template T Type of the returned rows
@@ -369,6 +377,35 @@ export class Agent<Env, State = unknown> extends Server<Env> {
369377

370378
// For regular methods, execute and send response
371379
const result = await methodFn.apply(this, args);
380+
381+
const displayArgs = args.map((a) => {
382+
if (typeof a === "object") {
383+
if (Array.isArray(a)) {
384+
return "[...]";
385+
}
386+
387+
return "{...}";
388+
}
389+
390+
return String(a);
391+
});
392+
393+
this.observability?.emit(
394+
{
395+
id: nanoid(),
396+
type: "rpc",
397+
displayMessage: `RPC call to ${method} args: ${displayArgs.join(", ")}`,
398+
timestamp: Date.now(),
399+
payload: {
400+
method,
401+
args,
402+
success: true,
403+
streaming: metadata?.streaming,
404+
},
405+
},
406+
this.ctx
407+
);
408+
372409
const response: RPCResponse = {
373410
type: "rpc",
374411
id,
@@ -413,6 +450,19 @@ export class Agent<Env, State = unknown> extends Server<Env> {
413450
})
414451
);
415452
}
453+
454+
this.observability?.emit(
455+
{
456+
id: nanoid(),
457+
type: "connect",
458+
displayMessage: `Connection ${connection.id} established`,
459+
timestamp: Date.now(),
460+
payload: {
461+
connectionId: connection.id,
462+
},
463+
},
464+
this.ctx
465+
);
416466
return this.#tryCatch(() => _onConnect(connection, ctx));
417467
}, 20);
418468
}
@@ -421,6 +471,7 @@ export class Agent<Env, State = unknown> extends Server<Env> {
421471
}
422472

423473
#setStateInternal(state: State, source: Connection | "server" = "server") {
474+
const previousState = this.#state;
424475
this.#state = state;
425476
this.sql`
426477
INSERT OR REPLACE INTO cf_agents_state (id, state)
@@ -442,6 +493,19 @@ export class Agent<Env, State = unknown> extends Server<Env> {
442493
return agentContext.run(
443494
{ agent: this, connection, request },
444495
async () => {
496+
this.observability?.emit(
497+
{
498+
id: nanoid(),
499+
type: "state:update",
500+
displayMessage: "State updated",
501+
timestamp: Date.now(),
502+
payload: {
503+
state,
504+
previousState,
505+
},
506+
},
507+
this.ctx
508+
);
445509
return this.onStateUpdate(state, source);
446510
}
447511
);
@@ -535,6 +599,18 @@ export class Agent<Env, State = unknown> extends Server<Env> {
535599
): Promise<Schedule<T>> {
536600
const id = nanoid(9);
537601

602+
const emitScheduleCreate = (schedule: Schedule<T>) =>
603+
this.observability?.emit(
604+
{
605+
id: nanoid(),
606+
type: "schedule:create",
607+
displayMessage: `Schedule ${schedule.id} created`,
608+
timestamp: Date.now(),
609+
payload: schedule,
610+
},
611+
this.ctx
612+
);
613+
538614
if (typeof callback !== "string") {
539615
throw new Error("Callback must be a string");
540616
}
@@ -554,13 +630,17 @@ export class Agent<Env, State = unknown> extends Server<Env> {
554630

555631
await this.#scheduleNextAlarm();
556632

557-
return {
633+
const schedule: Schedule<T> = {
558634
id,
559635
callback: callback,
560636
payload: payload as T,
561637
time: timestamp,
562638
type: "scheduled",
563639
};
640+
641+
emitScheduleCreate(schedule);
642+
643+
return schedule;
564644
}
565645
if (typeof when === "number") {
566646
const time = new Date(Date.now() + when * 1000);
@@ -575,14 +655,18 @@ export class Agent<Env, State = unknown> extends Server<Env> {
575655

576656
await this.#scheduleNextAlarm();
577657

578-
return {
658+
const schedule: Schedule<T> = {
579659
id,
580660
callback: callback,
581661
payload: payload as T,
582662
delayInSeconds: when,
583663
time: timestamp,
584664
type: "delayed",
585665
};
666+
667+
emitScheduleCreate(schedule);
668+
669+
return schedule;
586670
}
587671
if (typeof when === "string") {
588672
const nextExecutionTime = getNextCronTime(when);
@@ -597,14 +681,18 @@ export class Agent<Env, State = unknown> extends Server<Env> {
597681

598682
await this.#scheduleNextAlarm();
599683

600-
return {
684+
const schedule: Schedule<T> = {
601685
id,
602686
callback: callback,
603687
payload: payload as T,
604688
cron: when,
605689
time: timestamp,
606690
type: "cron",
607691
};
692+
693+
emitScheduleCreate(schedule);
694+
695+
return schedule;
608696
}
609697
throw new Error("Invalid schedule type");
610698
}
@@ -680,6 +768,19 @@ export class Agent<Env, State = unknown> extends Server<Env> {
680768
* @returns true if the task was cancelled, false otherwise
681769
*/
682770
async cancelSchedule(id: string): Promise<boolean> {
771+
const schedule = await this.getSchedule(id);
772+
if (schedule) {
773+
this.observability?.emit(
774+
{
775+
id: nanoid(),
776+
type: "schedule:delete",
777+
displayMessage: `Schedule ${id} deleted`,
778+
timestamp: Date.now(),
779+
payload: schedule,
780+
},
781+
this.ctx
782+
);
783+
}
683784
this.sql`DELETE FROM cf_agents_schedules WHERE id = ${id}`;
684785

685786
await this.#scheduleNextAlarm();
@@ -724,6 +825,17 @@ export class Agent<Env, State = unknown> extends Server<Env> {
724825
{ agent: this, connection: undefined, request: undefined },
725826
async () => {
726827
try {
828+
this.observability?.emit(
829+
{
830+
id: nanoid(),
831+
type: "schedule:execute",
832+
displayMessage: `Schedule ${row.id} executed`,
833+
timestamp: Date.now(),
834+
payload: row,
835+
},
836+
this.ctx
837+
);
838+
727839
await (
728840
callback as (
729841
payload: unknown,
@@ -766,6 +878,17 @@ export class Agent<Env, State = unknown> extends Server<Env> {
766878
// delete all alarms
767879
await this.ctx.storage.deleteAlarm();
768880
await this.ctx.storage.deleteAll();
881+
882+
this.observability?.emit(
883+
{
884+
id: nanoid(),
885+
type: "destroy",
886+
displayMessage: "Agent destroyed",
887+
timestamp: Date.now(),
888+
payload: {},
889+
},
890+
this.ctx
891+
);
769892
}
770893

771894
/**

0 commit comments

Comments
 (0)