Skip to content
This repository was archived by the owner on Aug 18, 2020. It is now read-only.

Commit 92676e1

Browse files
committed
Implement possibility to get URL / path of current request and write it to message
1 parent 8232e66 commit 92676e1

File tree

5 files changed

+11
-1
lines changed

5 files changed

+11
-1
lines changed

index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import * as merge from "lodash/merge";
22

33
import ConfigInterface from "./lib/interfaces/ConfigInterface";
4+
import ProducerMessageInterface from "./lib/interfaces/ProducerMessageInterface";
45

56
import Connector from "./lib/Connector";
67

@@ -35,6 +36,8 @@ const defaultOptions = {
3536
"retry.backoff.ms": 200,
3637
"socket.keepalive.enable": true,
3738
"workerPerPartition": 1,
39+
40+
"getPath": (message: ProducerMessageInterface): string => message.url,
3841
};
3942

4043
export default (options: ConfigInterface) => {

lib/Consumer.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ export default class Consumer extends EventEmitter {
9393

9494
await this.publish(message.key, {
9595
content,
96+
url: messageContent.url,
9697
});
9798
} catch (err) {
9899
this.handleError(err);

lib/Producer.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,10 @@ export default class Producer extends EventEmitter {
4242
public async produce(key: string, message: ProducerMessageInterface): Promise<void> {
4343
try {
4444
// With version = 1
45-
message.path = "/missing"; // TODO: make this set-able via transform callback from config
45+
if (this.config.getPath) {
46+
message.path = this.config.getPath(message);
47+
}
48+
4649
await this.producer.buffer(this.config.produceTo, key, message, null);
4750

4851
super.emit("info", `Message produced with id ${key}`);

lib/interfaces/ConfigInterface.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import ConsumerContentInterface from "./ConsumerContentInterface";
22
import LoggerInterface from "./LoggerInterface";
3+
import ProducerMessageInterface from "./ProducerMessageInterface";
34

45
export default interface ConfigInterface {
56
consumeFrom: string;
@@ -44,6 +45,7 @@ export default interface ConfigInterface {
4445
noBatchCommits?: boolean;
4546
};
4647
transformer: (message: ConsumerContentInterface) => Promise<string>;
48+
getPath?: (message: ProducerMessageInterface) => string;
4749
"batch.num.messages"?: number;
4850
"compression.codec"?: "snappy";
4951
"dr_cb"?: boolean;
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
export default interface ProducerMessageInterface {
22
content: string;
33
path?: string;
4+
url: string;
45
}

0 commit comments

Comments
 (0)