Skip to content

Commit fc015af

Browse files
committed
feat(obs): basic agent instrumentation
Adds a new `observability` property to Agents that receives events during the Agent's exeuction that can be used for whatever. The default observability implementation prints the events to console, but in the future this could be hooked up to a UI or some other observability tooling.
1 parent fe9e8d3 commit fc015af

File tree

5 files changed

+277
-3
lines changed

5 files changed

+277
-3
lines changed

packages/agents/package.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,11 @@
6666
"types": "./dist/mcp/do-oauth-client-provider.d.ts",
6767
"require": "./dist/mcp/do-oauth-client-provider.js",
6868
"import": "./dist/mcp/do-oauth-client-provider.js"
69+
},
70+
"./observability": {
71+
"types": "./dist/observability/index.d.ts",
72+
"require": "./dist/observability/index.js",
73+
"import": "./dist/observability/index.js"
6974
}
7075
},
7176
"keywords": [],

packages/agents/scripts/build.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ async function main() {
99
"src/mcp/index.ts",
1010
"src/mcp/client.ts",
1111
"src/mcp/do-oauth-client-provider.ts",
12+
"src/observability/index.ts",
1213
],
1314
splitting: true,
1415
sourcemap: true,

packages/agents/src/ai-chat-agent.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ export class AIChatAgent<Env = unknown, State = unknown> extends Agent<
7373
// dispatcher,
7474
// duplex
7575
} = data.init;
76+
7677
const { messages } = JSON.parse(body as string);
7778
this.#broadcastChatMessage(
7879
{
@@ -81,8 +82,23 @@ export class AIChatAgent<Env = unknown, State = unknown> extends Agent<
8182
},
8283
[connection.id]
8384
);
85+
86+
const incomingMessages = this.#messagesNotAlreadyInAgent(messages);
8487
await this.persistMessages(messages, [connection.id]);
8588

89+
this.observability?.emit(
90+
{
91+
type: "message:request",
92+
id: data.id,
93+
displayMessage: `Incoming chat message:\n${incomingMessages.map((m) => `${m.role}: ${m.content}`).join("\n")}`,
94+
timestamp: Date.now(),
95+
payload: {
96+
message: incomingMessages,
97+
},
98+
},
99+
this.ctx
100+
);
101+
86102
const chatMessageId = data.id;
87103
const abortSignal = this.#getAbortSignal(chatMessageId);
88104

@@ -94,8 +110,23 @@ export class AIChatAgent<Env = unknown, State = unknown> extends Agent<
94110
responseMessages: response.messages,
95111
});
96112

113+
const outgoingMessages =
114+
this.#messagesNotAlreadyInAgent(finalMessages);
97115
await this.persistMessages(finalMessages, [connection.id]);
98116
this.#removeAbortController(chatMessageId);
117+
118+
this.observability?.emit(
119+
{
120+
type: "message:response",
121+
id: data.id,
122+
displayMessage: `Outgoing chat message:\n${outgoingMessages.map((m) => `${m.role}: ${m.content}`).join("\n")}`,
123+
timestamp: Date.now(),
124+
payload: {
125+
message: outgoingMessages,
126+
},
127+
},
128+
this.ctx
129+
);
99130
},
100131
abortSignal ? { abortSignal } : undefined
101132
);
@@ -207,6 +238,11 @@ export class AIChatAgent<Env = unknown, State = unknown> extends Agent<
207238
);
208239
}
209240

241+
#messagesNotAlreadyInAgent(messages: ChatMessage[]) {
242+
const existingIds = new Set(this.messages.map((message) => message.id));
243+
return messages.filter((message) => !existingIds.has(message.id));
244+
}
245+
210246
async #reply(id: string, response: Response) {
211247
// now take chunks out from dataStreamResponse and send them to the client
212248
return this.#tryCatch(async () => {

packages/agents/src/index.ts

Lines changed: 124 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import { nanoid } from "nanoid";
1313

1414
import { AsyncLocalStorage } from "node:async_hooks";
1515
import { MCPClientManager } from "./mcp/client";
16+
import { genericObservability, type Observability } from "./observability";
1617

1718
export type { Connection, WSMessage, ConnectionContext } from "partyserver";
1819

@@ -264,6 +265,11 @@ export class Agent<Env, State = unknown> extends Server<Env> {
264265
hibernate: true, // default to hibernate
265266
};
266267

268+
/**
269+
* The observability implementation to use for the Agent
270+
*/
271+
observability?: Observability = genericObservability;
272+
267273
/**
268274
* Execute SQL queries against the Agent's database
269275
* @template T Type of the returned rows
@@ -369,6 +375,35 @@ export class Agent<Env, State = unknown> extends Server<Env> {
369375

370376
// For regular methods, execute and send response
371377
const result = await methodFn.apply(this, args);
378+
379+
const displayArgs = args.map(a =>{
380+
if (typeof a === "object") {
381+
if (Array.isArray(a)) {
382+
return "[...]"
383+
}
384+
385+
return "{...}";
386+
}
387+
388+
return String(a);
389+
});
390+
391+
this.observability?.emit(
392+
{
393+
id: nanoid(),
394+
type: "rpc",
395+
displayMessage: `RPC call to ${method} args: ${displayArgs.join(", ")}`,
396+
timestamp: Date.now(),
397+
payload: {
398+
method,
399+
args,
400+
success: true,
401+
streaming: metadata?.streaming,
402+
},
403+
},
404+
this.ctx
405+
);
406+
372407
const response: RPCResponse = {
373408
type: "rpc",
374409
id,
@@ -413,6 +448,19 @@ export class Agent<Env, State = unknown> extends Server<Env> {
413448
})
414449
);
415450
}
451+
452+
this.observability?.emit(
453+
{
454+
id: nanoid(),
455+
type: "connect",
456+
displayMessage: `Connection ${connection.id} established`,
457+
timestamp: Date.now(),
458+
payload: {
459+
connectionId: connection.id,
460+
},
461+
},
462+
this.ctx
463+
);
416464
return this.#tryCatch(() => _onConnect(connection, ctx));
417465
}, 20);
418466
}
@@ -421,6 +469,7 @@ export class Agent<Env, State = unknown> extends Server<Env> {
421469
}
422470

423471
#setStateInternal(state: State, source: Connection | "server" = "server") {
472+
const previousState = this.#state;
424473
this.#state = state;
425474
this.sql`
426475
INSERT OR REPLACE INTO cf_agents_state (id, state)
@@ -442,6 +491,19 @@ export class Agent<Env, State = unknown> extends Server<Env> {
442491
return agentContext.run(
443492
{ agent: this, connection, request },
444493
async () => {
494+
this.observability?.emit(
495+
{
496+
id: nanoid(),
497+
type: "state:update",
498+
displayMessage: "State updated",
499+
timestamp: Date.now(),
500+
payload: {
501+
state,
502+
previousState,
503+
},
504+
},
505+
this.ctx
506+
);
445507
return this.onStateUpdate(state, source);
446508
}
447509
);
@@ -535,6 +597,18 @@ export class Agent<Env, State = unknown> extends Server<Env> {
535597
): Promise<Schedule<T>> {
536598
const id = nanoid(9);
537599

600+
const emitScheduleCreate = (schedule: Schedule<T>) =>
601+
this.observability?.emit(
602+
{
603+
id: nanoid(),
604+
type: "schedule:create",
605+
displayMessage: `Schedule ${schedule.id} created`,
606+
timestamp: Date.now(),
607+
payload: schedule,
608+
},
609+
this.ctx
610+
);
611+
538612
if (typeof callback !== "string") {
539613
throw new Error("Callback must be a string");
540614
}
@@ -554,13 +628,17 @@ export class Agent<Env, State = unknown> extends Server<Env> {
554628

555629
await this.#scheduleNextAlarm();
556630

557-
return {
631+
const schedule: Schedule<T> = {
558632
id,
559633
callback: callback,
560634
payload: payload as T,
561635
time: timestamp,
562636
type: "scheduled",
563637
};
638+
639+
emitScheduleCreate(schedule);
640+
641+
return schedule;
564642
}
565643
if (typeof when === "number") {
566644
const time = new Date(Date.now() + when * 1000);
@@ -575,14 +653,18 @@ export class Agent<Env, State = unknown> extends Server<Env> {
575653

576654
await this.#scheduleNextAlarm();
577655

578-
return {
656+
const schedule: Schedule<T> = {
579657
id,
580658
callback: callback,
581659
payload: payload as T,
582660
delayInSeconds: when,
583661
time: timestamp,
584662
type: "delayed",
585663
};
664+
665+
emitScheduleCreate(schedule);
666+
667+
return schedule;
586668
}
587669
if (typeof when === "string") {
588670
const nextExecutionTime = getNextCronTime(when);
@@ -597,14 +679,18 @@ export class Agent<Env, State = unknown> extends Server<Env> {
597679

598680
await this.#scheduleNextAlarm();
599681

600-
return {
682+
const schedule: Schedule<T> = {
601683
id,
602684
callback: callback,
603685
payload: payload as T,
604686
cron: when,
605687
time: timestamp,
606688
type: "cron",
607689
};
690+
691+
emitScheduleCreate(schedule);
692+
693+
return schedule;
608694
}
609695
throw new Error("Invalid schedule type");
610696
}
@@ -680,6 +766,19 @@ export class Agent<Env, State = unknown> extends Server<Env> {
680766
* @returns true if the task was cancelled, false otherwise
681767
*/
682768
async cancelSchedule(id: string): Promise<boolean> {
769+
const schedule = await this.getSchedule(id);
770+
if (schedule) {
771+
this.observability?.emit(
772+
{
773+
id: nanoid(),
774+
type: "schedule:delete",
775+
displayMessage: `Schedule ${id} deleted`,
776+
timestamp: Date.now(),
777+
payload: schedule,
778+
},
779+
this.ctx
780+
);
781+
}
683782
this.sql`DELETE FROM cf_agents_schedules WHERE id = ${id}`;
684783

685784
await this.#scheduleNextAlarm();
@@ -724,6 +823,17 @@ export class Agent<Env, State = unknown> extends Server<Env> {
724823
{ agent: this, connection: undefined, request: undefined },
725824
async () => {
726825
try {
826+
this.observability?.emit(
827+
{
828+
id: nanoid(),
829+
type: "schedule:execute",
830+
displayMessage: `Schedule ${row.id} executed`,
831+
timestamp: Date.now(),
832+
payload: row,
833+
},
834+
this.ctx
835+
);
836+
727837
await (
728838
callback as (
729839
payload: unknown,
@@ -766,6 +876,17 @@ export class Agent<Env, State = unknown> extends Server<Env> {
766876
// delete all alarms
767877
await this.ctx.storage.deleteAlarm();
768878
await this.ctx.storage.deleteAll();
879+
880+
this.observability?.emit(
881+
{
882+
id: nanoid(),
883+
type: "destroy",
884+
displayMessage: "Agent destroyed",
885+
timestamp: Date.now(),
886+
payload: {},
887+
},
888+
this.ctx
889+
);
769890
}
770891

771892
/**

0 commit comments

Comments
 (0)