Skip to content

Commit

Permalink
Vanish requests
Browse files Browse the repository at this point in the history
  • Loading branch information
dcadenas committed Oct 14, 2024
1 parent 4f53fe6 commit c183eaf
Show file tree
Hide file tree
Showing 10 changed files with 321 additions and 29 deletions.
1 change: 1 addition & 0 deletions config/default.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ export default {
slackCron: process.env.SLACK_CRON || "*/10 * * * *",
redis: {
host: process.env.REDIS_HOST || "localhost",
remote_host: process.env.REDIS_REMOTE_HOST || "redis://redis:6379",
},
logLevel: "info",
rootDomain: process.env.ROOT_DOMAIN || "nos.social",
Expand Down
4 changes: 1 addition & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
---
version: "3.8"

services:
server:
build: .
Expand All @@ -10,6 +7,7 @@ services:
- NODE_ENV=development
- REDIS_HOST=redis
- ROOT_DOMAIN=localhost

redis:
image: redis:7.2.4
ports:
Expand Down
16 changes: 9 additions & 7 deletions scripts/add_name
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ usage() {
echo " NPUB - The public key."
echo " RELAY_URLS - One or more relay URLs, each as a separate argument."
echo " Note: This script requires the 'pubhex' secret to be set in the NIP05_SEC environment variable."
echo " The base URL can be changed by setting the BASE_URL environment variable. Default is 'https://nos.social'."
echo "Dependencies:"
echo " nostrkeytool - A tool for NOSTR keys, installable via 'cargo install nostrkeytool' (https://crates.io/crates/nostrkeytool)."
echo " nak - A tool required for authentication, installable via 'go install github.com/fiatjaf/nak@latest' (https://github.com/fiatjaf/nak)."
Expand All @@ -23,26 +24,27 @@ fi
NAME="$1"
NPUB="$2"
RELAYS="${@:3}"

BASE_URL="${BASE_URL:-https://nos.social}"
RELAYS_JSON_ARRAY=$(printf "%s\n" $RELAYS | jq -R . | jq -s .)
BASE64_DELETE_AUTH_EVENT=$(nak event --content='' --kind 27235 -t method='DELETE' -t u="https://nos.social/api/names/$NAME" --sec $NIP05_SEC | base64)
BASE64_DELETE_AUTH_EVENT=$(nak event --content='' --kind 27235 -t method='DELETE' -t u="$BASE_URL/api/names/$NAME" --sec "$NIP05_SEC" | base64)

HTTP_STATUS=$(curl -s -o /dev/null -w "%{http_code}" -X DELETE "https://nos.social/api/names/$NAME" \
HTTP_STATUS=$(curl -s -o /dev/null -w "%{http_code}" -X DELETE "$BASE_URL/api/names/$NAME" \
-H "Content-Type: application/json" \
-H "Authorization: Nostr $BASE64_DELETE_AUTH_EVENT")

echo "HTTP Status from delete: $HTTP_STATUS"

PUBKEY=$(nostrkeytool --npub2pubkey $NPUB)
PUBKEY=$(nostrkeytool --npub2pubkey "$NPUB")

JSON_PAYLOAD=$(jq -n \
--arg name "$NAME" \
--arg pubkey "$PUBKEY" \
--argjson relays "$RELAYS_JSON_ARRAY" \
'{name: $name, data: {pubkey: $pubkey, relays: $relays}}')

BASE64_AUTH_EVENT=$(nak event --content='' --kind 27235 -t method='POST' -t u='https://nos.social/api/names' --sec $NIP05_SEC | base64)
curl -s https://nos.social/api/names \
BASE64_AUTH_EVENT=$(nak event --content='' --kind 27235 -t method='POST' -t u="$BASE_URL/api/names" --sec "$NIP05_SEC" | base64)

curl -s "$BASE_URL/api/names" \
-H "Content-Type: application/json" \
-H "Authorization: Nostr $BASE64_AUTH_EVENT" \
-d "$JSON_PAYLOAD"
-d "$JSON_PAYLOAD"
2 changes: 1 addition & 1 deletion src/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import pinoHTTP from "pino-http";
import promClient from "prom-client";
import promBundle from "express-prom-bundle";
import cors from "cors";
import getRedisClient from "./getRedisClient.js";
import { getRedisClient } from "./getRedisClient.js";
import routes from "./routes.js";
import logger from "./logger.js";
import NameRecordRepository from "./nameRecordRepository.js";
Expand Down
32 changes: 27 additions & 5 deletions src/getRedisClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ import config from "../config/index.js";
import logger from "./logger.js";

// istanbul ignore next
const redisImportPromise = process.env.NODE_ENV === "test"
? import("ioredis-mock")
: import("ioredis");
const redisImportPromise =
process.env.NODE_ENV === "test" ? import("ioredis-mock") : import("ioredis");

let redisClient;
let remoteRedisClient;

async function initializeRedis() {
try {
Expand All @@ -25,11 +25,33 @@ async function initializeRedis() {
}
}

async function getRedisClient() {
async function initializeRemoteRedis() {
  try {
    // Same dynamically-imported Redis constructor as the local client
    // (ioredis, or ioredis-mock when NODE_ENV is "test").
    const { default: Redis } = await redisImportPromise;
    remoteRedisClient = new Redis(config.redis.remote_host);

    remoteRedisClient.on("connect", () => {
      logger.info("Connected to Remote Redis");
    });
    remoteRedisClient.on("error", (err) => {
      logger.error(err, "Remote Redis error");
    });
  } catch (error) {
    // istanbul ignore next
    logger.error(error, "Error initializing Remote Redis client");
  }
}

// Lazily create and return the shared local Redis client.
export async function getRedisClient() {
  if (redisClient) {
    return redisClient;
  }
  await initializeRedis();
  return redisClient;
}

export default getRedisClient;
// Lazily create and return the shared remote Redis client.
export async function getRemoteRedisClient() {
  if (remoteRedisClient) {
    return remoteRedisClient;
  }
  await initializeRemoteRedis();
  return remoteRedisClient;
}
70 changes: 68 additions & 2 deletions src/nameRecordRepository.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ export default class NameRecordRepository {
const luaScript = `
local pubkey = redis.call('GET', 'pubkey:' .. KEYS[1])
if not pubkey then return nil end
local relays = redis.call('SMEMBERS', 'relays:' .. pubkey)
local userAgent = redis.call('GET', 'user_agent:' .. pubkey)
local clientIp = redis.call('GET', 'ip:' .. pubkey)
local updatedAt = redis.call('GET', 'updated_at:' .. pubkey)
return {pubkey, relays, userAgent, clientIp, updatedAt}
`;

Expand Down Expand Up @@ -87,6 +87,72 @@ export default class NameRecordRepository {
return true;
}

async deleteByPubkey(pubkey) {
const namesToDelete = [];

// Use SCAN, avoid KEYS
const stream = this.redis.scanStream({
match: "pubkey:*",
count: 1000,
});

let processingPromises = [];

return new Promise((resolve, reject) => {
stream.on("data", (resultKeys) => {
stream.pause();

const pipeline = this.redis.pipeline();

resultKeys.forEach((key) => {
pipeline.get(key);
});

pipeline
.exec()
.then((results) => {
const processing = [];

for (let i = 0; i < resultKeys.length; i++) {
const key = resultKeys[i];
const [err, associatedPubkey] = results[i];

if (err) {
console.error(`Error getting value for key ${key}:`, err);
continue;
}

if (associatedPubkey === pubkey) {
const name = key.split(":")[1];
namesToDelete.push(name);
}
}

stream.resume();
})
.catch((err) => {
stream.destroy();
reject(err);
});
});

stream.on("end", async () => {
try {
for (const name of namesToDelete) {
await this.deleteByName(name);
}
resolve(true);
} catch (err) {
reject(err);
}
});

stream.on("error", (err) => {
reject(err);
});
});
}

async fetchAndClearPendingNotifications() {
const luaScript = `
local entries = redis.call('ZRANGE', 'pending_notifications', 0, -1)
Expand Down
38 changes: 30 additions & 8 deletions src/server.js
Original file line number Diff line number Diff line change
@@ -1,23 +1,45 @@
import app from "./app.js";
import logger from "./logger.js";
import config from "../config/index.js";
import { getRemoteRedisClient, getRedisClient } from "./getRedisClient.js";
import VanishSubscriber from "./vanishSubscriber.js"; // Import the VanishSubscriber class

app.listen(config.port, () => {
const vanishRequestsRedisClient = await getRemoteRedisClient();
const nip05RedisClient = await getRedisClient();

const server = app.listen(config.port, () => {
logger.info(`Server is running on port ${config.port}`);
});

process.on("uncaughtException", (err) => {
logger.fatal(err, "Uncaught exception detected");
const vanishSubscriber = new VanishSubscriber(
vanishRequestsRedisClient,
nip05RedisClient
);
vanishSubscriber.run();

async function gracefulShutdown() {
logger.info("Graceful shutdown initiated...");

vanishSubscriber.stop();

while (vanishSubscriber.isRunning) {
await new Promise((resolve) => setTimeout(resolve, 100));
}

server.close(() => {
process.exit(1);
logger.info("Express server closed.");
process.exit(0);
});
}

setTimeout(() => {
process.abort();
}, 1000).unref();
process.exit(1);
process.on("uncaughtException", (err) => {
logger.fatal(err, "Uncaught exception detected");
gracefulShutdown();
});

process.on("unhandledRejection", (reason, promise) => {
logger.error(reason, "An unhandled promise rejection was detected");
});

process.on("SIGINT", gracefulShutdown);
process.on("SIGTERM", gracefulShutdown);
125 changes: 125 additions & 0 deletions src/vanishSubscriber.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import NameRecordRepository from "./nameRecordRepository.js";

const VANISH_STREAM_KEY = "vanish_requests";
const LAST_PROCESSED_ID_KEY = "vanish_requests:nip05_service:last_id";
const BLOCK_TIME_MS = 5000; // 5 seconds

class VanishSubscriber {
  /**
   * Subscribes to the remote vanish-requests Redis stream and deletes every
   * NIP-05 name record associated with each vanished pubkey.
   *
   * @param {object} vanishRequestsRedis - Client for the remote Redis instance
   *   that carries the vanish-requests stream.
   * @param {object} nip05Redis - Client for the local NIP-05 data store.
   */
  constructor(vanishRequestsRedis, nip05Redis) {
    // Right now we have a local redis instance for nip05 data and a remote one
    // used by all our services. For the moment, the remote one is only used
    // for the vanish stream.
    // TODO: Refactor to migrate and use only one redis instance.

    const nameRecordRepository = new NameRecordRepository(nip05Redis);

    this.vanishRequestsRedis = vanishRequestsRedis;
    this.nameRecordRepository = nameRecordRepository;
    this.abortController = new AbortController();
    this.isRunning = false;
  }

  // Delete every stored name record matching this pubkey.
  async processPubkey(pubkey) {
    console.log(`Deleting pubkey: ${pubkey}`);
    await this.nameRecordRepository.deleteByPubkey(pubkey);
  }

  /**
   * Main loop: resumes from the last persisted stream ID, blocks on XREAD for
   * new vanish requests, processes each one, and persists progress after every
   * message. Exits when stop() aborts the controller.
   */
  async run() {
    if (this.isRunning) return; // Prevent multiple runs
    this.isRunning = true;

    let lastProcessedID;

    try {
      lastProcessedID =
        (await this.vanishRequestsRedis.get(LAST_PROCESSED_ID_KEY)) || "0-0";
      console.log(`Starting from last processed ID: ${lastProcessedID}`);
    } catch (err) {
      console.error("Error fetching last processed ID from Redis", err);
      this.isRunning = false;
      return;
    }

    const abortSignal = this.abortController.signal;

    while (!abortSignal.aborted) {
      try {
        const streamEntries = await this.vanishRequestsRedis.xread(
          "BLOCK",
          BLOCK_TIME_MS,
          "STREAMS",
          VANISH_STREAM_KEY,
          lastProcessedID
        );

        // null means the blocking read timed out with no new entries.
        if (!streamEntries) {
          continue;
        }

        for (const [, messages] of streamEntries) {
          for (const [messageID, messageData] of messages) {
            const event = createObjectFromPairs(messageData);

            console.log(`Vanish requests event: ${JSON.stringify(event)} `);
            const pubkey = event.pubkey;

            console.log(
              `Processing message ID: ${messageID} with pubkey: ${pubkey}`
            );

            if (pubkey) {
              try {
                await this.processPubkey(pubkey);
              } catch (err) {
                console.error(`Error processing pubkey: ${pubkey}`, err);
              }
            } else {
              // Malformed entry: skip the delete but still advance the
              // cursor below so it is not re-read forever.
              console.error(`Message ID ${messageID} has no pubkey; skipping`);
            }

            try {
              await this.vanishRequestsRedis.set(
                LAST_PROCESSED_ID_KEY,
                messageID
              );
              lastProcessedID = messageID;
              console.log(`Updated last processed ID to: ${lastProcessedID}`);
            } catch (err) {
              console.error(
                `Error updating last processed ID: ${messageID}`,
                err
              );
            }
          }
        }
      } catch (err) {
        if (abortSignal.aborted) {
          break;
        }
        console.error("Error reading from Redis stream", err);
        // Back off briefly so a persistent failure doesn't spin the loop.
        await new Promise((resolve) => setTimeout(resolve, 1000));
      }
    }

    console.log("Cancellation signal received. Exiting gracefully...");
    try {
      // Persist progress one final time; guard it because the client may
      // already be shutting down when we get here.
      await this.vanishRequestsRedis.set(LAST_PROCESSED_ID_KEY, lastProcessedID);
      console.log(`Final last processed ID saved: ${lastProcessedID}`);
    } catch (err) {
      console.error("Error saving final last processed ID", err);
    }

    this.isRunning = false;
  }

  // Signal the run() loop to exit; safe to call when not running.
  stop() {
    if (!this.isRunning) return;
    this.abortController.abort();
    console.log(
      "Abort signal sent. Waiting for current processing to finish..."
    );
  }
}

// Converts a flat [field, value, field, value, ...] array — the shape Redis
// stream entries arrive in — into a plain object.
function createObjectFromPairs(messageData) {
  const result = {};
  for (let i = 0; i < messageData.length; i += 2) {
    result[messageData[i]] = messageData[i + 1];
  }
  return result;
}

export default VanishSubscriber;
Loading

0 comments on commit c183eaf

Please sign in to comment.