Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,56 +69,53 @@ export class Handshaker {

private async selectParallelTargetsAndHandshake(excludedIds: DhtAddress[]): Promise<DhtAddress[]> {
const exclude = excludedIds.concat(this.options.neighbors.getIds())
const neighbors = this.selectParallelTargets(exclude)
neighbors.forEach((contact) => this.options.ongoingHandshakes.add(toNodeId(contact.getPeerDescriptor())))
return this.doParallelHandshakes(neighbors, exclude)
const targets = this.selectParallelTargets(exclude)
targets.forEach((contact) => this.options.ongoingHandshakes.add(toNodeId(contact.getPeerDescriptor())))
return this.doParallelHandshakes(targets, exclude)
}

private selectParallelTargets(excludedIds: DhtAddress[]): HandshakeRpcRemote[] {
const neighbors: Map<DhtAddress, ContentDeliveryRpcRemote> = new Map()
// If the node has 0 neighbors find a node in the stream with a WS server to connect to for faster time to data.
const targets = new Map<DhtAddress, ContentDeliveryRpcRemote>()
const getExcludedIds = () => [...excludedIds, ...Array.from(targets.keys())]

// Step 1: If no neighbors, try to find a WebSocket node first
if (this.options.neighbors.size() === 0) {
const wsNode = this.options.nearbyNodeView.getFirst(
[...excludedIds, ...Array.from(neighbors.keys())] as DhtAddress[],
true
)
const wsNode = this.options.nearbyNodeView.getFirst(getExcludedIds(), true)
if (wsNode) {
const wsNodeId = toNodeId(wsNode.getPeerDescriptor())
excludedIds.push(wsNodeId)
neighbors.set(wsNodeId, wsNode)
}
targets.set(wsNodeId, wsNode)
}
}
// Add the closest left and then right contacts from the ring if possible.
const left = this.options.leftNodeView.getFirst([...excludedIds, ...Array.from(neighbors.keys())] as DhtAddress[])
const right = this.options.rightNodeView.getFirst([...excludedIds, ...Array.from(neighbors.keys())] as DhtAddress[])

// Step 2: Add left and right contacts from the ring
const left = this.options.leftNodeView.getFirst(getExcludedIds())
const right = this.options.rightNodeView.getFirst(getExcludedIds())
if (left) {
neighbors.set(toNodeId(left.getPeerDescriptor()), left)
targets.set(toNodeId(left.getPeerDescriptor()), left)
}
if (right) {
neighbors.set(toNodeId(right.getPeerDescriptor()), right)
targets.set(toNodeId(right.getPeerDescriptor()), right)
}
// If there is still room add the closest contact based on the kademlia metric
if (neighbors.size < PARALLEL_HANDSHAKE_COUNT) {
const first = this.options.nearbyNodeView.getFirst([...excludedIds, ...Array.from(neighbors.keys())] as DhtAddress[])
if (first) {
neighbors.set(toNodeId(first.getPeerDescriptor()), first)
// Step 3: Add closest contact based on Kademlia metric if needed
if (targets.size < PARALLEL_HANDSHAKE_COUNT) {
const closest = this.options.nearbyNodeView.getFirst(getExcludedIds())
if (closest) {
targets.set(toNodeId(closest.getPeerDescriptor()), closest)
}
}
const getExcludedFromRandomView = () => [
...excludedIds,
...Array.from(neighbors.values()).map((neighbor) => toNodeId(neighbor.getPeerDescriptor()))
]
// If there is still room add a random contact until PARALLEL_HANDSHAKE_COUNT is reached
while (
neighbors.size < PARALLEL_HANDSHAKE_COUNT
&& this.options.randomNodeView.size(getExcludedFromRandomView()) > 0
) {
const random = this.options.randomNodeView.getRandom([...excludedIds, ...Array.from(neighbors.keys())] as DhtAddress[])
if (random) {
neighbors.set(toNodeId(random.getPeerDescriptor()), random)

// Step 4: Fill remaining slots with random contacts
while (targets.size < PARALLEL_HANDSHAKE_COUNT) {
const random = this.options.randomNodeView.getRandom(getExcludedIds())
if (!random) {
break
}
targets.set(toNodeId(random.getPeerDescriptor()), random)
}
return Array.from(neighbors.values()).map((neighbor) => this.createRpcRemote(neighbor.getPeerDescriptor()))

return Array.from(targets.values()).map((neighbor) =>
this.createRpcRemote(neighbor.getPeerDescriptor())
)
}

private async doParallelHandshakes(targets: HandshakeRpcRemote[], excludedIds: DhtAddress[]): Promise<DhtAddress[]> {
Expand All @@ -140,29 +137,29 @@ export class Handshaker {

private async selectNewTargetAndHandshake(excludedIds: DhtAddress[]): Promise<DhtAddress[]> {
const exclude = excludedIds.concat(this.options.neighbors.getIds())
const neighbor = this.options.leftNodeView.getFirst(exclude)
const target = this.options.leftNodeView.getFirst(exclude)
?? this.options.rightNodeView.getFirst(exclude)
?? this.options.nearbyNodeView.getFirst(exclude)
?? this.options.randomNodeView.getRandom(exclude)
if (neighbor) {
const accepted = await this.handshakeWithTarget(this.createRpcRemote(neighbor.getPeerDescriptor()))
if (target) {
const accepted = await this.handshakeWithTarget(this.createRpcRemote(target.getPeerDescriptor()))
if (!accepted) {
excludedIds.push(toNodeId(neighbor.getPeerDescriptor()))
excludedIds.push(toNodeId(target.getPeerDescriptor()))
}
}
return excludedIds
}

private async handshakeWithTarget(neighbor: HandshakeRpcRemote, concurrentNodeId?: DhtAddress): Promise<boolean> {
const targetNodeId = toNodeId(neighbor.getPeerDescriptor())
private async handshakeWithTarget(target: HandshakeRpcRemote, concurrentNodeId?: DhtAddress): Promise<boolean> {
const targetNodeId = toNodeId(target.getPeerDescriptor())
this.options.ongoingHandshakes.add(targetNodeId)
const result = await neighbor.handshake(
const result = await target.handshake(
this.options.streamPartId,
this.options.neighbors.getIds(),
concurrentNodeId
)
if (result.accepted) {
this.options.neighbors.add(this.createContentDeliveryRpcRemote(neighbor.getPeerDescriptor()))
this.options.neighbors.add(this.createContentDeliveryRpcRemote(target.getPeerDescriptor()))
}
if (result.interleaveTargetDescriptor) {
await this.handshakeWithInterleaving(result.interleaveTargetDescriptor, targetNodeId)
Expand All @@ -172,17 +169,17 @@ export class Handshaker {
}

private async handshakeWithInterleaving(target: PeerDescriptor, remoteNodeId: DhtAddress): Promise<boolean> {
const neighbor = this.createRpcRemote(target)
const targetNodeId = toNodeId(neighbor.getPeerDescriptor())
const remote = this.createRpcRemote(target)
const targetNodeId = toNodeId(remote.getPeerDescriptor())
this.options.ongoingHandshakes.add(targetNodeId)
const result = await neighbor.handshake(
const result = await remote.handshake(
this.options.streamPartId,
this.options.neighbors.getIds(),
undefined,
remoteNodeId
)
if (result.accepted) {
this.options.neighbors.add(this.createContentDeliveryRpcRemote(neighbor.getPeerDescriptor()))
this.options.neighbors.add(this.createContentDeliveryRpcRemote(remote.getPeerDescriptor()))
}
this.options.ongoingHandshakes.delete(targetNodeId)
return result.accepted
Expand Down
Loading