Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions packages/core/src/driver/database.driver-persister.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
export interface DatabaseDriverPersister {

persist<T>(entity: T): void;

remove<T>(entity: T): void;

flush(): Promise<void>;
}
11 changes: 2 additions & 9 deletions packages/core/src/driver/database.driver.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,7 @@
import { InboxOutboxTransportEvent } from '../model/inbox-outbox-transport-event.interface';
import { DatabaseDriverPersister } from './database.driver-persister';


export interface DatabaseDriver {
export interface DatabaseDriver extends DatabaseDriverPersister {
createInboxOutboxTransportEvent(eventName: string, eventPayload: any, expireAt: number, readyToRetryAfter: number | null): InboxOutboxTransportEvent;

findAndExtendReadyToRetryEvents(limit: number): Promise<InboxOutboxTransportEvent[]>;

persist<T>(entity: T): Promise<void>;

remove<T>(entity: T): Promise<void>;

flush(): Promise<void>;
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
export abstract class InboxOutboxEvent {
/**
* @description Should be unique static name of the event
* @description Should be unique name of the event
*/
name: string;
public abstract readonly name: string;
}
26 changes: 13 additions & 13 deletions packages/core/src/emitter/transactional-event-emitter.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import { Inject, Injectable } from '@nestjs/common';
import { DATABASE_DRIVER_FACTORY_TOKEN, DatabaseDriverFactory } from '../driver/database-driver.factory';
import { DatabaseDriver } from '../driver/database.driver';
import { InboxOutboxModuleEventOptions, InboxOutboxModuleOptions, MODULE_OPTIONS_TOKEN } from '../inbox-outbox.module-definition';
import { IListener } from '../listener/contract/listener.interface';
import { ListenerDuplicateNameException } from '../listener/exception/listener-duplicate-name.exception';
import { INBOX_OUTBOX_EVENT_PROCESSOR_TOKEN, InboxOutboxEventProcessorContract } from '../processor/inbox-outbox-event-processor.contract';
import { EVENT_CONFIGURATION_RESOLVER_TOKEN, EventConfigurationResolverContract } from '../resolver/event-configuration-resolver.contract';
import { InboxOutboxEvent } from './contract/inbox-outbox-event.interface';
import { DatabaseDriverPersister } from '../driver/database.driver-persister';

export enum TransactionalEventEmitterOperations {
persist = 'persist',
Expand All @@ -21,14 +23,15 @@ export class TransactionalEventEmitter {
@Inject(DATABASE_DRIVER_FACTORY_TOKEN) private databaseDriverFactory: DatabaseDriverFactory,
@Inject(INBOX_OUTBOX_EVENT_PROCESSOR_TOKEN) private inboxOutboxEventProcessor: InboxOutboxEventProcessorContract,
@Inject(EVENT_CONFIGURATION_RESOLVER_TOKEN) private eventConfigurationResolver: EventConfigurationResolverContract,
) {}
) {}

async emit(
event: InboxOutboxEvent,
entities: {
operation: TransactionalEventEmitterOperations;
entity: any;
}[],
customDatabaseDriverPersister?: DatabaseDriverPersister,
): Promise<void> {
const eventOptions: InboxOutboxModuleEventOptions = this.options.events.find((optionEvent) => optionEvent.name === event.name);

Expand All @@ -39,24 +42,21 @@ export class TransactionalEventEmitter {
const databaseDriver = this.databaseDriverFactory.create(this.eventConfigurationResolver);
const currentTimestamp = new Date().getTime();

const inboxOutboxTransportEvent = databaseDriver.createInboxOutboxTransportEvent(
event.name,
event,
currentTimestamp + eventOptions.listeners.expiresAtTTL,
currentTimestamp + eventOptions.listeners.readyToRetryAfterTTL,
);
const inboxOutboxTransportEvent = databaseDriver.createInboxOutboxTransportEvent(event.name, event, currentTimestamp + eventOptions.listeners.expiresAtTTL, currentTimestamp + eventOptions.listeners.readyToRetryAfterTTL);

const persister = customDatabaseDriverPersister || databaseDriver;

entities.forEach((entity) => {
if (entity.operation === 'persist') {
databaseDriver.persist(entity.entity);
if (entity.operation === TransactionalEventEmitterOperations.persist) {
persister.persist(entity.entity);
}
if (entity.operation === 'remove') {
databaseDriver.remove(entity.entity);
if (entity.operation === TransactionalEventEmitterOperations.remove) {
persister.remove(entity.entity);
}
});

databaseDriver.persist(inboxOutboxTransportEvent);
await databaseDriver.flush();
persister.persist(inboxOutboxTransportEvent);
await persister.flush();

this.inboxOutboxEventProcessor.process(eventOptions, inboxOutboxTransportEvent, this.getListeners(event.name));
}
Expand Down