Skip to content

Commit

Permalink
Merge pull request #27 from seriousme/improve-memory-backend
Browse files Browse the repository at this point in the history
Improve memory persistence backend
  • Loading branch information
seriousme authored Jan 31, 2025
2 parents 7917fc3 + 6a8e374 commit 79c92c0
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 34 deletions.
68 changes: 62 additions & 6 deletions deno/tcpServer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import { TcpServer } from "./tcpServer.ts";
import { logger, LogLevel } from "../utils/mod.ts";
import type { PublishPacket, QoS } from "../mqttPacket/mod.ts";

logger.level(LogLevel.verbose);
logger.level(LogLevel.info);
export function sleep(ms: number): Promise<unknown> {
return new Promise((r) => setTimeout(r, ms));
}
export async function serverTest() {

test("Deno Test pubSub using client and server", async function () {
const server = new TcpServer({ port: 0 }, {});
server.start();

Expand Down Expand Up @@ -85,9 +86,64 @@ export async function serverTest() {

logger.verbose(`Stop server`);
server.stop();
}
});

test("Deno Test pubSub using client and server", async function () {
await serverTest();
logger.verbose("End of test");
test("Deno Test subscription persistence after reconnect", async function () {
// Start server
const server = new TcpServer({ port: 0 }, {});
server.start();

const params = {
url: new URL(`mqtt://${server.address}:${server.port}`),
numberOfRetries: 0,
};

const client = new TcpClient();
const testTopic = "test/topic";
const received: PublishPacket[] = [];

// First connection and subscription
await client.connect(params);
await client.subscribe({
subscriptions: [{
topicFilter: testTopic,
qos: 0,
}],
});

// Start receiving messages
(async function () {
for await (const item of client.messages()) {
received.push(item);
}
})();

// Disconnect client
await client.disconnect();
await sleep(100);

// Reconnect client
await client.connect(params);
await sleep(100);

// Publish test message
await client.publish({
topic: testTopic,
qos: 0,
payload: new Uint8Array([0x01]),
});

await sleep(100);
logger.verbose(`Disconnect client`);
await client.disconnect();

// Verify message was received
assert.equal(received.length, 1, "Should receive one message");
assert.equal(
received[0].topic,
testTopic,
"Should receive message on subscribed topic",
);

server.stop();
});
68 changes: 62 additions & 6 deletions node/tcpServer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import { TcpServer } from "./tcpServer.ts";
import { logger, LogLevel } from "../utils/mod.ts";
import type { PublishPacket, QoS } from "../mqttPacket/mod.ts";

logger.level(LogLevel.verbose);
logger.level(LogLevel.info);
export function sleep(ms: number): Promise<unknown> {
return new Promise((r) => setTimeout(r, ms));
}
export async function serverTest() {

test("Test pubSub using client and server", async function () {
const server = new TcpServer({ port: 0 }, {});
server.start();

Expand Down Expand Up @@ -85,9 +86,64 @@ export async function serverTest() {

logger.verbose(`Stop server`);
server.stop();
}
});

test("Test pubSub using client and server", async function () {
await serverTest();
logger.verbose("End of test");
test("Test subscription persistence after reconnect", async function () {
// Start server
const server = new TcpServer({ port: 0 }, {});
server.start();

const params = {
url: new URL(`mqtt://${server.address}:${server.port}`),
numberOfRetries: 0,
};

const client = new TcpClient();
const testTopic = "test/topic";
const received: PublishPacket[] = [];

// First connection and subscription
await client.connect(params);
await client.subscribe({
subscriptions: [{
topicFilter: testTopic,
qos: 0,
}],
});

// Start receiving messages
(async function () {
for await (const item of client.messages()) {
received.push(item);
}
})();

// Disconnect client
await client.disconnect();
await sleep(100);

// Reconnect client
await client.connect(params);
await sleep(100);

// Publish test message
await client.publish({
topic: testTopic,
qos: 0,
payload: new Uint8Array([0x01]),
});

await sleep(100);
logger.verbose(`Disconnect client`);
await client.disconnect();

// Verify message was received
assert.equal(received.length, 1, "Should receive one message");
assert.equal(
received[0].topic,
testTopic,
"Should receive message on subscribed topic",
);

server.stop();
});
3 changes: 2 additions & 1 deletion persistence/memory/memoryPersistence.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ test("new should create new Persistence object", () => {
});

test(
"Registring a client should register the client and return a Store Object",
"Registering a client should register the client and return a Store Object",
() => {
const persistence = new Persistence();
const clientId = "myClient";
const client = persistence.registerClient(clientId, () => {}, false);
assert.deepStrictEqual(persistence.clientList.has(clientId), true);
assert.deepStrictEqual(typeof client, "object");
assert.deepStrictEqual(client instanceof Store, true);
assert.deepStrictEqual(client.existingSession, false);
},
);

Expand Down
1 change: 1 addition & 0 deletions persistence/memory/memoryPersistence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type ClientSubscription = {
};

export class MemoryStore implements IStore {
existingSession: boolean = false;
clientId: ClientId;
private packetId: PacketId;
pendingIncoming: PacketStore;
Expand Down
1 change: 1 addition & 0 deletions persistence/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export type PacketStore = Map<PacketId, PublishPacket>;
export type SubscriptionStore = Map<Topic, QoS>;

export interface IStore {
existingSession: boolean;
clientId: ClientId;
pendingIncoming: PacketStore;
pendingOutgoing: PacketStore;
Expand Down
57 changes: 36 additions & 21 deletions server/handlers/handleConnect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,29 +52,42 @@ function validateConnect(
* @param clientId - The client ID
*/
function processValidatedConnect(
returnCode: TAuthenticationResult,
packet: ConnectPacket,
ctx: Context,
clientId: string,
) {
if (packet.will) {
ctx.will = {
type: PacketType.publish,
qos: packet.will.qos,
retain: packet.will.retain,
topic: packet.will.topic,
payload: packet.will.payload,
};
}
): boolean {
if (returnCode === AuthenticationResult.ok) {
if (packet.will) {
ctx.will = {
type: PacketType.publish,
qos: packet.will.qos,
retain: packet.will.retain,
topic: packet.will.topic,
payload: packet.will.payload,
};
}

ctx.connect(clientId, packet.clean || false);
ctx.connect(clientId, packet.clean || false);

const keepAlive = packet.keepAlive || 0;
if (keepAlive > 0) {
logger.debug(`Setting keepalive to ${keepAlive * 1500} ms`);
ctx.timer = new Timer(() => {
ctx.close();
}, Math.floor(keepAlive * 1500));
const keepAlive = packet.keepAlive || 0;
if (keepAlive > 0) {
logger.debug(`Setting keepalive to ${keepAlive * 1500} ms`);
ctx.timer = new Timer(() => {
ctx.close();
}, Math.floor(keepAlive * 1500));
}
// is this a new session?
// either because its the first time for the client
// or it specifically asked for a clean one
const previousSession = ctx.store?.existingSession;
// client now has a history
if (!previousSession && ctx.store) {
ctx.store.existingSession = true;
}
return previousSession || false;
}
return false;
}

/**
Expand All @@ -85,10 +98,12 @@ function processValidatedConnect(
export function handleConnect(ctx: Context, packet: ConnectPacket): void {
const clientId = packet.clientId || `Opifex-${crypto.randomUUID()}`;
const returnCode = validateConnect(ctx, packet);
if (returnCode === AuthenticationResult.ok) {
processValidatedConnect(packet, ctx, clientId);
}
const sessionPresent = false;
const sessionPresent = processValidatedConnect(
returnCode,
packet,
ctx,
clientId,
);
ctx.send({
type: PacketType.connack,
sessionPresent,
Expand Down

0 comments on commit 79c92c0

Please sign in to comment.