-
Notifications
You must be signed in to change notification settings - Fork 0
Vanish requests #29
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Vanish requests #29
Changes from 2 commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,12 +11,12 @@ export default class NameRecordRepository { | |
const luaScript = ` | ||
local pubkey = redis.call('GET', 'pubkey:' .. KEYS[1]) | ||
if not pubkey then return nil end | ||
|
||
local relays = redis.call('SMEMBERS', 'relays:' .. pubkey) | ||
local userAgent = redis.call('GET', 'user_agent:' .. pubkey) | ||
local clientIp = redis.call('GET', 'ip:' .. pubkey) | ||
local updatedAt = redis.call('GET', 'updated_at:' .. pubkey) | ||
|
||
return {pubkey, relays, userAgent, clientIp, updatedAt} | ||
`; | ||
|
||
|
@@ -87,6 +87,72 @@ export default class NameRecordRepository { | |
return true; | ||
} | ||
|
||
async deleteByPubkey(pubkey) { | ||
const namesToDelete = []; | ||
|
||
// Use SCAN, avoid KEYS | ||
const stream = this.redis.scanStream({ | ||
match: "pubkey:*", | ||
count: 1000, | ||
}); | ||
|
||
let processingPromises = []; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this unused? |
||
|
||
return new Promise((resolve, reject) => { | ||
stream.on("data", (resultKeys) => { | ||
stream.pause(); | ||
|
||
const pipeline = this.redis.pipeline(); | ||
|
||
resultKeys.forEach((key) => { | ||
pipeline.get(key); | ||
}); | ||
|
||
pipeline | ||
.exec() | ||
.then((results) => { | ||
const processing = []; | ||
|
||
for (let i = 0; i < resultKeys.length; i++) { | ||
const key = resultKeys[i]; | ||
const [err, associatedPubkey] = results[i]; | ||
|
||
if (err) { | ||
console.error(`Error getting value for key ${key}:`, err); | ||
continue; | ||
} | ||
|
||
if (associatedPubkey === pubkey) { | ||
const name = key.split(":")[1]; | ||
namesToDelete.push(name); | ||
} | ||
} | ||
|
||
stream.resume(); | ||
}) | ||
.catch((err) => { | ||
stream.destroy(); | ||
reject(err); | ||
}); | ||
}); | ||
|
||
stream.on("end", async () => { | ||
try { | ||
for (const name of namesToDelete) { | ||
await this.deleteByName(name); | ||
} | ||
resolve(true); | ||
} catch (err) { | ||
reject(err); | ||
} | ||
}); | ||
|
||
stream.on("error", (err) => { | ||
reject(err); | ||
}); | ||
}); | ||
} | ||
|
||
async fetchAndClearPendingNotifications() { | ||
const luaScript = ` | ||
local entries = redis.call('ZRANGE', 'pending_notifications', 0, -1) | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,23 +1,45 @@ | ||
import app from "./app.js"; | ||
import logger from "./logger.js"; | ||
import config from "../config/index.js"; | ||
import { getRemoteRedisClient, getRedisClient } from "./getRedisClient.js"; | ||
import VanishSubscriber from "./vanishSubscriber.js"; // Import the VanishSubscriber class | ||
|
||
app.listen(config.port, () => { | ||
const vanishRequestsRedisClient = await getRemoteRedisClient(); | ||
const nip05RedisClient = await getRedisClient(); | ||
|
||
const server = app.listen(config.port, () => { | ||
logger.info(`Server is running on port ${config.port}`); | ||
}); | ||
|
||
process.on("uncaughtException", (err) => { | ||
logger.fatal(err, "Uncaught exception detected"); | ||
const vanishSubscriber = new VanishSubscriber( | ||
vanishRequestsRedisClient, | ||
nip05RedisClient | ||
); | ||
vanishSubscriber.run(); | ||
|
||
async function gracefulShutdown() { | ||
logger.info("Graceful shutdown initiated..."); | ||
|
||
vanishSubscriber.stop(); | ||
|
||
while (vanishSubscriber.isRunning) { | ||
await new Promise((resolve) => setTimeout(resolve, 100)); | ||
} | ||
|
||
server.close(() => { | ||
process.exit(1); | ||
logger.info("Express server closed."); | ||
process.exit(0); | ||
}); | ||
} | ||
|
||
setTimeout(() => { | ||
process.abort(); | ||
}, 1000).unref(); | ||
process.exit(1); | ||
process.on("uncaughtException", (err) => { | ||
logger.fatal(err, "Uncaught exception detected"); | ||
gracefulShutdown(); | ||
}); | ||
|
||
process.on("unhandledRejection", (reason, promise) => { | ||
logger.error(reason, "An unhandled promise rejection was detected"); | ||
}); | ||
|
||
process.on("SIGINT", gracefulShutdown); | ||
process.on("SIGTERM", gracefulShutdown); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
import NameRecordRepository from "./nameRecordRepository.js"; | ||
|
||
const VANISH_STREAM_KEY = "vanish_requests"; | ||
const LAST_PROCESSED_ID_KEY = "vanish_requests:nip05_service:last_id"; | ||
const BLOCK_TIME_MS = 5000; // 5 seconds | ||
|
||
class VanishSubscriber { | ||
constructor(vanishRequestsRedis, nip05Redis) { | ||
// Right now we have a local redis instance for nip05 data and a remote one | ||
// used by all our services. For the momen, the remote one is only used for | ||
// the vanish stream. | ||
// TODO: Refactor to migrate and use only one redis instance. | ||
|
||
const nameRecordRepository = new NameRecordRepository(nip05Redis); | ||
|
||
this.vanishRequestsRedis = vanishRequestsRedis; | ||
this.nameRecordRepository = nameRecordRepository; | ||
this.abortController = new AbortController(); | ||
this.isRunning = false; | ||
} | ||
|
||
async processPubkey(pubkey) { | ||
console.log(`Deleting pubkey: ${pubkey}`); | ||
await this.nameRecordRepository.deleteByPubkey(pubkey); | ||
} | ||
|
||
async run() { | ||
if (this.isRunning) return; // Prevent multiple runs | ||
this.isRunning = true; | ||
|
||
let lastProcessedID; | ||
|
||
try { | ||
lastProcessedID = | ||
(await this.vanishRequestsRedis.get(LAST_PROCESSED_ID_KEY)) || "0-0"; | ||
console.log(`Starting from last processed ID: ${lastProcessedID}`); | ||
} catch (err) { | ||
console.error("Error fetching last processed ID from Redis", err); | ||
this.isRunning = false; | ||
return; | ||
} | ||
|
||
const abortSignal = this.abortController.signal; | ||
|
||
while (!abortSignal.aborted) { | ||
try { | ||
const streamEntries = await this.vanishRequestsRedis.xread( | ||
"BLOCK", | ||
BLOCK_TIME_MS, | ||
"STREAMS", | ||
VANISH_STREAM_KEY, | ||
lastProcessedID | ||
); | ||
|
||
if (!streamEntries) { | ||
continue; | ||
} | ||
|
||
for (const [stream, messages] of streamEntries) { | ||
for (const [messageID, messageData] of messages) { | ||
const event = createObjectFromPairs(messageData); | ||
|
||
console.log(`Vanish requests event: ${JSON.stringify(event)} `); | ||
const pubkey = event.pubkey; | ||
|
||
console.log( | ||
`Processing message ID: ${messageID} with pubkey: ${pubkey}` | ||
); | ||
|
||
try { | ||
await this.processPubkey(pubkey); | ||
} catch (err) { | ||
console.error(`Error processing pubkey: ${pubkey}`, err); | ||
} | ||
|
||
try { | ||
await this.vanishRequestsRedis.set( | ||
LAST_PROCESSED_ID_KEY, | ||
messageID | ||
); | ||
lastProcessedID = messageID; | ||
console.log(`Updated last processed ID to: ${lastProcessedID}`); | ||
} catch (err) { | ||
console.error( | ||
`Error updating last processed ID: ${messageID}`, | ||
err | ||
); | ||
} | ||
} | ||
} | ||
} catch (err) { | ||
if (abortSignal.aborted) { | ||
break; | ||
} | ||
console.error("Error reading from Redis stream", err); | ||
await new Promise((resolve) => setTimeout(resolve, 1000)); | ||
} | ||
} | ||
|
||
console.log("Cancellation signal received. Exiting gracefully..."); | ||
await this.vanishRequestsRedis.set(LAST_PROCESSED_ID_KEY, lastProcessedID); | ||
console.log(`Final last processed ID saved: ${lastProcessedID}`); | ||
|
||
this.isRunning = false; | ||
} | ||
|
||
stop() { | ||
if (!this.isRunning) return; | ||
this.abortController.abort(); | ||
console.log( | ||
"Abort signal sent. Waiting for current processing to finish..." | ||
); | ||
} | ||
} | ||
|
||
function createObjectFromPairs(messageData) { | ||
return messageData.reduce((acc, value, index, arr) => { | ||
if (index % 2 === 0) { | ||
acc[value] = arr[index + 1]; | ||
} | ||
return acc; | ||
}, {}); | ||
} | ||
|
||
export default VanishSubscriber; |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I find "local" and "remote" redis a little confusing. Is the local redis for storing name/pubkey pairs, and remote is for the vanish requests? I think this could use some more documentation at least. Maybe even a rename to "nameRedis" and "vanishRequestRedis" or something like that would be good.
And ideally someday we can refactor down to one Redis instance?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh yeah, down in
src/server.js
they have nicer names.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I renamed them to getNip05RedisClient and getVanishRequestsRedisClient. We can always get back to it if we ever add more use cases and we don't migrate to the DO db.