Skip to content

chore(root): Release 2025-07-09 13:34 #8665

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: prod
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/api/src/app/inbox/inbox.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import {
UserSessionData,
} from '@novu/shared';

import { SubscriberDto, SubscriberSessionRequestDto } from './dtos/subscriber-session-request.dto';
import { SubscriberSessionRequestDto } from './dtos/subscriber-session-request.dto';
import { SubscriberSessionResponseDto } from './dtos/subscriber-session-response.dto';
import { SessionCommand } from './usecases/session/session.command';
import { Session } from './usecases/session/session.usecase';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import {
buildFeedKey,
buildMessageCountKey,
InvalidateCacheService,
PinoLogger,
TraceLogRepository,
WebSocketsQueueService,
} from '@novu/application-generic';

Expand Down Expand Up @@ -37,18 +39,24 @@ describe('MarkManyNotificationsAs', () => {
let webSocketsQueueServiceMock: sinon.SinonStubbedInstance<WebSocketsQueueService>;
let getSubscriberMock: sinon.SinonStubbedInstance<GetSubscriber>;
let messageRepositoryMock: sinon.SinonStubbedInstance<MessageRepository>;
let traceLogRepositoryMock: sinon.SinonStubbedInstance<TraceLogRepository>;
let loggerMock: sinon.SinonStubbedInstance<PinoLogger>;

beforeEach(() => {
invalidateCacheMock = sinon.createStubInstance(InvalidateCacheService);
webSocketsQueueServiceMock = sinon.createStubInstance(WebSocketsQueueService);
getSubscriberMock = sinon.createStubInstance(GetSubscriber);
messageRepositoryMock = sinon.createStubInstance(MessageRepository);
traceLogRepositoryMock = sinon.createStubInstance(TraceLogRepository);
loggerMock = sinon.createStubInstance(PinoLogger);

markManyNotificationsAs = new MarkManyNotificationsAs(
invalidateCacheMock as any,
webSocketsQueueServiceMock as any,
getSubscriberMock as any,
messageRepositoryMock as any
messageRepositoryMock as any,
traceLogRepositoryMock as any,
loggerMock as any
);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,14 @@ import {
buildMessageCountKey,
InvalidateCacheService,
WebSocketsQueueService,
TraceLogRepository,
PinoLogger,
mapEventTypeToTitle,
LogRepository,
Trace,
EventType,
} from '@novu/application-generic';
import { MessageRepository } from '@novu/dal';
import { MessageEntity, MessageRepository } from '@novu/dal';
import { WebSocketEventEnum } from '@novu/shared';

import { GetSubscriber } from '../../../subscribers/usecases/get-subscriber';
Expand All @@ -17,8 +23,12 @@ export class MarkManyNotificationsAs {
private invalidateCacheService: InvalidateCacheService,
private webSocketsQueueService: WebSocketsQueueService,
private getSubscriber: GetSubscriber,
private messageRepository: MessageRepository
) {}
private messageRepository: MessageRepository,
private traceLogRepository: TraceLogRepository,
private logger: PinoLogger
) {
this.logger.setContext(this.constructor.name);
}

async execute(command: MarkManyNotificationsAsCommand): Promise<void> {
const subscriber = await this.getSubscriber.execute({
Expand All @@ -39,6 +49,12 @@ export class MarkManyNotificationsAs {
snoozedUntil: command.snoozedUntil,
});

await this.logTraces({
command,
subscriberId: subscriber.subscriberId,
_subscriberId: subscriber._id,
});

await this.invalidateCacheService.invalidateQuery({
key: buildFeedKey().invalidate({
subscriberId: subscriber.subscriberId,
Expand All @@ -63,4 +79,104 @@ export class MarkManyNotificationsAs {
groupId: subscriber._organizationId,
});
}

private async logTraces({
command,
subscriberId,
_subscriberId,
}: {
command: MarkManyNotificationsAsCommand;
subscriberId: string;
_subscriberId: string;
}): Promise<void> {
const messages = await this.messageRepository.find({
_environmentId: command.environmentId,
_subscriberId,
_id: { $in: command.ids },
});

if (!messages || !Array.isArray(messages)) {
return;
}

const allTraceData: Omit<Trace, 'id' | 'expires_at'>[] = [];

for (const message of messages) {
if (!message._jobId) continue;

if (command.read !== undefined) {
allTraceData.push(
createTraceLog({
message,
command,
eventType: command.read ? 'message_read' : 'message_unread',
subscriberId,
_subscriberId,
})
);
}

if (command.snoozedUntil !== undefined) {
allTraceData.push(
createTraceLog({
message,
command,
eventType: 'message_snoozed',
subscriberId,
_subscriberId,
})
);
}

if (command.archived !== undefined) {
allTraceData.push(
createTraceLog({
message,
command,
eventType: command.archived ? 'message_archived' : 'message_unarchived',
subscriberId,
_subscriberId,
})
);
}
}

if (allTraceData.length > 0) {
try {
await this.traceLogRepository.createMany(allTraceData);
} catch (error) {
this.logger.warn({ err: error }, `Failed to create engagement traces for ${allTraceData.length} messages`);
}
}
}
}

function createTraceLog({
message,
command,
eventType,
subscriberId,
_subscriberId,
}: {
message: MessageEntity;
command: MarkManyNotificationsAsCommand;
eventType: EventType;
subscriberId: string;
_subscriberId: string;
}): Omit<Trace, 'id' | 'expires_at'> {
return {
created_at: LogRepository.formatDateTime64(new Date()),
organization_id: message._organizationId,
environment_id: message._environmentId,
user_id: command.subscriberId,
subscriber_id: _subscriberId,
external_subscriber_id: subscriberId,
event_type: eventType,
title: mapEventTypeToTitle(eventType),
message: `Message ${eventType.replace('message_', '')} for subscriber ${message._subscriberId}`,
raw_data: null,
status: 'success',
entity_type: 'step_run',
entity_id: message._jobId,
};
}
2 changes: 1 addition & 1 deletion apps/api/src/app/logs/dtos/get-requests.response.dto.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export type RequestLogResponseDto = {
userId: string;
organizationId: string;
environmentId: string;
schemaType: string;
authType: string;
durationMs: number;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export class GetRequests {
if (command.hoursAgo) {
where.created_at = {
operator: '>=',
value: subHours(new Date(), command.hoursAgo).toISOString().slice(0, 19).replace('T', ' ') as any,
value: subHours(new Date(), command.hoursAgo).toISOString().slice(0, -1) as any,
};
}

Expand Down Expand Up @@ -67,7 +67,7 @@ export class GetRequests {
userId: log.user_id,
organizationId: log.organization_id,
environmentId: log.environment_id,
schemaType: log.schema_type,
authType: log.auth_type,
durationMs: log.duration_ms,
}));

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Injectable, NotFoundException } from '@nestjs/common';
import { NotificationRepository } from '@novu/dal';
import { AnalyticsService } from '@novu/application-generic';
import { NotificationRepository, ExecutionDetailFeedItem } from '@novu/dal';
import { AnalyticsService, TraceLogRepository, PinoLogger, FeatureFlagsService } from '@novu/application-generic';
import { ExecutionDetailsSourceEnum, ExecutionDetailsStatusEnum, FeatureFlagsKeysEnum } from '@novu/shared';

import { ActivityNotificationResponseDto } from '../../dtos/activities-response.dto';
import { GetActivityCommand } from './get-activity.command';
Expand All @@ -10,19 +11,36 @@ import { mapFeedItemToDto } from '../get-activity-feed/map-feed-item-to.dto';
export class GetActivity {
constructor(
private notificationRepository: NotificationRepository,
private analyticsService: AnalyticsService
private analyticsService: AnalyticsService,
private traceLogRepository: TraceLogRepository,
private logger: PinoLogger,
private featureFlagsService: FeatureFlagsService
) {}

async execute(command: GetActivityCommand): Promise<ActivityNotificationResponseDto> {
this.analyticsService.track('Get Activity Feed Item - [Activity Feed]', command.userId, {
_organization: command.organizationId,
});

const feedItem = await this.notificationRepository.getFeedItem(
command.notificationId,
command.environmentId,
command.organizationId
);
const tracesEnabled = await this.featureFlagsService.getFlag({
key: FeatureFlagsKeysEnum.IS_TRACE_LOGS_ENABLED,
defaultValue: false,
organization: { _id: command.organizationId },
user: { _id: command.userId },
environment: { _id: command.environmentId },
});

let feedItem;

if (tracesEnabled) {
feedItem = await this.getFeedItemFromTraceLog(command);
} else {
feedItem = await this.notificationRepository.getFeedItem(
command.notificationId,
command.environmentId,
command.organizationId
);
}

if (!feedItem) {
throw new NotFoundException('Notification not found', {
Expand All @@ -32,4 +50,102 @@ export class GetActivity {

return mapFeedItemToDto(feedItem);
}

private mapTraceStatusToExecutionStatus(traceStatus: string): ExecutionDetailsStatusEnum {
switch (traceStatus.toLowerCase()) {
case 'success':
return ExecutionDetailsStatusEnum.SUCCESS;
case 'error':
case 'failed':
return ExecutionDetailsStatusEnum.FAILED;
case 'warning':
return ExecutionDetailsStatusEnum.WARNING;
case 'pending':
return ExecutionDetailsStatusEnum.PENDING;
case 'queued':
return ExecutionDetailsStatusEnum.QUEUED;
default:
return ExecutionDetailsStatusEnum.PENDING;
}
}

private async getFeedItemFromTraceLog(command: GetActivityCommand) {
try {
const feedItem = await this.notificationRepository.findMetadataForTraces(
command.notificationId,
command.environmentId,
command.organizationId
);

if (!feedItem) {
return null;
}

const jobIds = feedItem.jobs.map((job) => job._id);

if (jobIds.length === 0) {
return feedItem;
}

const traceResult = await this.traceLogRepository.find({
where: {
entity_id: { operator: 'IN', value: jobIds },
entity_type: 'step_run',
environment_id: command.environmentId,
organization_id: command.organizationId,
},
limit: 1000,
orderBy: 'created_at',
orderDirection: 'ASC',
});

const traceLogsByJobId = new Map<string, typeof traceResult.data>();
for (const trace of traceResult.data) {
if (!traceLogsByJobId.has(trace.entity_id)) {
traceLogsByJobId.set(trace.entity_id, []);
}
traceLogsByJobId.get(trace.entity_id)!.push(trace);
}

feedItem.jobs = feedItem.jobs.map((job) => {
const traces = traceLogsByJobId.get(job._id) || [];
const executionDetails: ExecutionDetailFeedItem[] = traces.map((trace) => ({
_id: trace.id,
providerId: undefined,
detail: trace.title,
source: ExecutionDetailsSourceEnum.INTERNAL,
_jobId: job._id,
status: this.mapTraceStatusToExecutionStatus(trace.status),
isTest: false,
isRetry: false,
createdAt: new Date(trace.created_at).toISOString(),
raw: trace.raw_data,
}));

return {
...job,
executionDetails,
};
});

return feedItem;
} catch (error) {
this.logger.error(
{
error: error instanceof Error ? error.message : 'Unknown error',
notificationId: command.notificationId,
environmentId: command.environmentId,
organizationId: command.organizationId,
},
'Failed to get feed item from trace log'
);

// Fall back to the old method if trace log query fails
return await this.notificationRepository.getFeedItem(
command.notificationId,
command.environmentId,
command.organizationId
);
}
}
}
Loading
Loading