Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/add nodejs peer #242

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions nodejs-peer/jest.config.cjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
module.exports = {
preset: 'jest-puppeteer',
testTimeout: 60000, // Increase timeout for WebRTC handshakes
};

11,624 changes: 11,624 additions & 0 deletions nodejs-peer/package-lock.json

Large diffs are not rendered by default.

53 changes: 53 additions & 0 deletions nodejs-peer/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
{
"name": "universal-connectivity-nodejs-peer",
"type": "module",
"main": "src/libp2p.js",
"scripts": {
"start": "node src/libp2p.js",
"test": "node --experimental-vm-modules node_modules/jest/bin/jest.js",
"test:webrtc": "node --experimental-vm-modules node_modules/jest/bin/jest.js test/webrtc.test.js"
},
"dependencies": {
"@chainsafe/libp2p-gossipsub": "^14.1.0",
"@chainsafe/libp2p-noise": "^16.0.1",
"@chainsafe/libp2p-yamux": "^7.0.1",
"@helia/delegated-routing-v1-http-api-client": "^4.2.2",
"@libp2p/autonat": "^2.0.20",
"@libp2p/bootstrap": "^11.0.23",
"@libp2p/circuit-relay-v2": "^3.1.13",
"@libp2p/dcutr": "^2.0.19",
"@libp2p/identify": "^3.0.19",
"@libp2p/interface": "^2.5.0",
"@libp2p/kad-dht": "^14.2.8",
"@libp2p/mdns": "^11.0.28",
"@libp2p/mplex": "^11.0.28",
"@libp2p/peer-id": "^5.0.12",
"@libp2p/peer-id-factory": "^4.2.4",
"@libp2p/ping": "^2.0.23",
"@libp2p/pubsub-peer-discovery": "^11.0.1",
"@libp2p/tcp": "^10.0.20",
"@libp2p/webrtc": "^5.1.1",
"@libp2p/webrtc-direct": "^6.0.0",
"@libp2p/websockets": "^9.1.5",
"@libp2p/webtransport": "^5.0.33",
"@multiformats/multiaddr": "^12.4.0",
"@types/node": "^22.13.4",
"express": "^4.21.2",
"it-all": "^3.0.6",
"it-length-prefixed": "^10.0.1",
"it-map": "^3.1.1",
"it-pipe": "^3.0.1",
"libp2p": "^2.6.3",
"multiaddr": "^10.0.1",
"multiformats": "^13.3.2",
"ts-node": "^10.9.2",
"typescript": "^5.7.3",
"uint8arrays": "^5.1.0",
"ws": "^8.18.1"
},
"devDependencies": {
"jest": "^29.7.0",
"jest-puppeteer": "^11.0.0",
"puppeteer": "^24.3.0"
}
}
15 changes: 15 additions & 0 deletions nodejs-peer/src/constants.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
export const CHAT_TOPIC = 'universal-connectivity'
export const CHAT_FILE_TOPIC = 'universal-connectivity-nodejs-file'
export const PUBSUB_PEER_DISCOVERY = 'universal-connectivity-nodejs-peer-discovery'
export const FILE_EXCHANGE_PROTOCOL = '/universal-connectivity-nodejs-file/1'
export const DIRECT_MESSAGE_PROTOCOL = '/universal-connectivity-nodejs/dm/1.0.0'

export const CIRCUIT_RELAY_CODE = 291

export const MIME_TEXT_PLAIN = 'text/plain'

// 👇 App specific dedicated bootstrap PeerIDs
// Their multiaddrs are ephemeral so peer routing is used to resolve multiaddr
export const WEBTRANSPORT_BOOTSTRAP_PEER_ID = '12D3KooWH7MdJvo6L1ZvBmr9mgg5fZPCzQNG7UKKduxRqwiNDX6E'

export const BOOTSTRAP_PEER_IDS = [WEBTRANSPORT_BOOTSTRAP_PEER_ID]
68 changes: 68 additions & 0 deletions nodejs-peer/src/dialer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import { createLibp2p } from "libp2p";
import { noise } from "@chainsafe/libp2p-noise";
import { yamux } from "@chainsafe/libp2p-yamux";
import { tcp } from "@libp2p/tcp";
import { webSockets } from "@libp2p/websockets";
import { webRTC, webRTCDirect } from "@libp2p/webrtc";
import { circuitRelayTransport } from "@libp2p/circuit-relay-v2";
import { identify } from "@libp2p/identify";
import { multiaddr } from "@multiformats/multiaddr";
import { pubsubPeerDiscovery } from "@libp2p/pubsub-peer-discovery";
import { ping } from "@libp2p/ping";
import { gossipsub } from "@chainsafe/libp2p-gossipsub";
import { PUBSUB_PEER_DISCOVERY } from "./constants.js";

const dialPeer = async (peerMultiaddr) => {
const dialer = await createLibp2p({
addresses: {
listen: [
"/ip4/0.0.0.0/tcp/0",
"/ip4/0.0.0.0/tcp/0/ws",
"/webrtc",
"/webrtc-direct",
],
},
transports: [
tcp(),
webSockets(),
webRTC(),
webRTCDirect(),
circuitRelayTransport({ discoverRelays: 1 }),
],
connectionEncrypters: [noise()],
streamMuxers: [yamux()],
services: {
identify: identify(),
ping: ping(),
pubsub: gossipsub(),
},
peerDiscovery: [
pubsubPeerDiscovery({
interval: 60000,
topics: [PUBSUB_PEER_DISCOVERY],
listenOnly: false,
}),
],
});

console.log("Dialer started, listening on:");
dialer.getMultiaddrs().forEach((addr) => console.log(addr.toString()));

try {
console.log(`🔄 Dialing peer: ${peerMultiaddr}`);
const conn = await dialer.dial(multiaddr(peerMultiaddr));
console.log(`✅ Successfully dialed ${conn.remotePeer.toString()}`);
} catch (err) {
console.error(`❌ Dialing failed: ${err.message}`);
}
};

const peerMultiaddr = process.argv[2];
if (!peerMultiaddr) {
console.error("❌ Please provide a peer multiaddr to dial.");
process.exit(1);
}

dialPeer(peerMultiaddr).catch((err) =>
console.log("❌ Unexpected error:", err.message)
);
191 changes: 191 additions & 0 deletions nodejs-peer/src/libp2p.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
import { createLibp2p } from "libp2p";
import { webRTC } from "@libp2p/webrtc";
import { webSockets } from "@libp2p/websockets";
import { tcp } from "@libp2p/tcp";
import { noise } from "@chainsafe/libp2p-noise";
import { circuitRelayTransport } from "@libp2p/circuit-relay-v2";
import { identify, identifyPush } from "@libp2p/identify";
import { dcutr } from "@libp2p/dcutr";
import { autoNAT } from "@libp2p/autonat";
import { yamux } from "@chainsafe/libp2p-yamux";
import { pubsubPeerDiscovery } from "@libp2p/pubsub-peer-discovery";
import { gossipsub } from "@chainsafe/libp2p-gossipsub";
import { CHAT_FILE_TOPIC, CHAT_TOPIC, PUBSUB_PEER_DISCOVERY } from "./constants.js";
import { kadDHT } from "@libp2p/kad-dht";
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string';
import { toString as uint8ArrayToString } from 'uint8arrays/to-string';
import { sha256 } from 'multiformats/hashes/sha2';
import { stdinToStream, streamToConsole } from './stream.js';

// Define the universal connectivity protocol constant
const UNIVERSAL_PROTOCOL = '/universal/1.0.0';

export async function createNode() {
const node = await createLibp2p({
addresses: {
listen: [
"/ip4/0.0.0.0/tcp/0/ws",
"/ip4/0.0.0.0/tcp/0",
'/webrtc',
'/webrtc-direct',
],
},
transports: [
webSockets(),
tcp(),
webRTC(),
circuitRelayTransport({ discoverRelays: 1 })
],
connectionEncrypters: [noise()],
connectionManager: {
maxConnections: 100,
minConnections: 5,
autoDialInterval: 30000,
dialTimeout: 30000,
},
connectionGater: {
denyDialMultiaddr: async ({ multiaddr }) => false,
},
streamMuxers: [yamux()],
services: {
pubsub: gossipsub({
allowPublishToZeroTopicPeers: true,
msgIdFn: msgIdFnStrictNoSign,
ignoreDuplicatePublishError: true,
}),
identify: identify(),
identifyPush: identifyPush(),
dcutr: dcutr(),
kadDHT: kadDHT(),
autoNAT: autoNAT({
protocolPrefix: "libp2p",
startupDelay: 5000,
refreshInterval: 60000,
}),
},
peerDiscovery: [
pubsubPeerDiscovery({
interval: 30000,
topics: [PUBSUB_PEER_DISCOVERY],
listenOnly: false,
}),
],
});
return node;
}

export async function msgIdFnStrictNoSign(msg) {
const enc = new TextEncoder();
const encodedSeqNum = enc.encode(msg.sequenceNumber.toString());
return await sha256.encode(encodedSeqNum);
}

/**
* Helper to dial a target multiaddr using the specified protocol.
* Sets up interactive pipes for stdin and stdout.
*/
async function robustDial(sourceNode, targetMultiaddr, protocol = UNIVERSAL_PROTOCOL) {
try {
console.log(`Attempting to dial ${targetMultiaddr} using protocol ${protocol}`);
const stream = await sourceNode.dialProtocol(targetMultiaddr, protocol);
console.log(`Successfully dialed ${targetMultiaddr} with protocol ${protocol}`);
// Set up interactive communication
stdinToStream(stream);
streamToConsole(stream);
return stream;
} catch (error) {
console.error(`Failed to dial ${targetMultiaddr} using protocol ${protocol}: ${error.message}`);
throw error;
}
}

async function main() {
// Create two nodes concurrently for testing purposes.
const [node1, node2] = await Promise.all([createNode(), createNode()]);

console.log(`Node1 ID: ${node1.peerId.toString()}`);
node1.getMultiaddrs().forEach(addr => console.log(`Node1 listening on: ${addr.toString()}`));

console.log(`Node2 ID: ${node2.peerId.toString()}`);
node2.getMultiaddrs().forEach(addr => console.log(`Node2 listening on: ${addr.toString()}`));

// // Setup universal protocol handler on node2.
node2.handle(UNIVERSAL_PROTOCOL, async ({ stream, connection }) => {
console.log(`Node2 received connection on ${UNIVERSAL_PROTOCOL} from ${connection.remotePeer.toString()}`);
try {
// Establish interactive communication.
stdinToStream(stream);
streamToConsole(stream);
} catch (err) {
console.log('Error in universal protocol handler on node2:', err.message);
}
});

// Directly dial node2 from node1 using one of node2's multiaddrs.
const targetAddr = node2.getMultiaddrs()[0];
if (targetAddr) {
await robustDial(node1, targetAddr, UNIVERSAL_PROTOCOL);
} else {
console.warn('No multiaddr found for node2');
}

// Log new connections on node1.
node1.addEventListener('connection:open', (evt) => {
try {
const conn = evt.detail;
console.log(`Node1: New connection opened from peer ${conn.remotePeer}`);
console.log('Connection details:', conn);
} catch (err) {
console.log('Error in connection:open listener on node1:', err.message);
}
});

// When a peer is discovered, attempt to dial using the universal protocol.
node2.addEventListener('peer:discovery', async (evt) => {
console.info('Node1 discovered peer:', evt.detail);
const discoveredMultiaddrs = evt.detail.id;
if (discoveredMultiaddrs && discoveredMultiaddrs.length > 0) {
try {
await robustDial(node1, discoveredMultiaddrs, UNIVERSAL_PROTOCOL);
} catch (error) {
console.log('Error dialing discovered peer:', error.message);
}
}
});

// Setup pubsub subscriptions and logging.
node1.services.pubsub.subscribe(CHAT_TOPIC);
node1.services.pubsub.addEventListener('message', (evt) => {
console.log(`Node1 received on topic ${evt.detail.topic}: ${uint8ArrayToString(evt.detail.data)}`);
});

node2.services.pubsub.subscribe(CHAT_TOPIC);
node2.services.pubsub.addEventListener('message', (evt) => {
console.log(`Node2 received on topic ${evt.detail.topic}: ${uint8ArrayToString(evt.detail.data)}`);
});

// For testing: Node2 periodically publishes messages on several topics.
setInterval(() => {
node2.services.pubsub.publish(CHAT_TOPIC, uint8ArrayFromString('Hello Go & Rust!'))
.catch(err => console.log(`Error publishing to ${CHAT_TOPIC}:`, err.message));
}, 3000);

console.log('Nodes are running and ready for robust dialing and interactions.');
}

main().catch(err => {
console.log('Main execution error:', err.message);
process.exit(1);
});

export class NATManager {
constructor(node) {
node.addEventListener('self:nat:status', (evt) => {
console.log('NAT Status:', evt.detail)
if(evt.detail === 'UNSUPPORTED') {
console.log('Enabling circuit relay as fallback')
node.configure(circuitRelayTransport({ discoverRelays: 2 }))
}
})
}
}
35 changes: 35 additions & 0 deletions nodejs-peer/src/stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/* eslint-disable no-console */
import * as lp from 'it-length-prefixed'
import map from 'it-map'
import { pipe } from 'it-pipe'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'

// Helper: Reads data from stdin and writes to a stream with length-prefixed encoding.
export function stdinToStream(stream) {
process.stdin.setEncoding('utf8');
pipe(
process.stdin,
(source) => map(source, (string) => uint8ArrayFromString(string + '\n')),
(source) => lp.encode(source),
stream.sink
).catch(err => {
console.log('Error in stdinToStream:', err.message);
});
}

// Helper: Reads length-prefixed data from a stream and outputs it to the console.
export function streamToConsole(stream) {
pipe(
stream.source,
(source) => lp.decode(source),
(source) => map(source, (buf) => uint8ArrayToString(buf.subarray())),
async function (source) {
for await (const msg of source) {
console.log('> ' + msg.toString().trim());
}
}
).catch(err => {
console.log('Error in streamToConsole:', err.message);
});
}
Loading