Skip to content

Commit

Permalink
fix: getRemoteNodeClosestNodes shouldn't throw connection errors
Browse files Browse the repository at this point in the history
`getRemoteNodeClosestNodes` was throwing an connection error in certain conditions. If it failed to connect to a node it should've just skipped that node.

#418
  • Loading branch information
tegefaulkes committed Sep 14, 2022
1 parent d2b5a3a commit 81209af
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 40 deletions.
79 changes: 39 additions & 40 deletions src/nodes/NodeConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ import * as nodesErrors from './errors';
import GRPCClientAgent from '../agent/GRPCClientAgent';
import * as validationUtils from '../validation/utils';
import * as networkUtils from '../network/utils';
import * as agentErrors from '../agent/errors';
import * as grpcErrors from '../grpc/errors';
import * as nodesPB from '../proto/js/polykey/v1/nodes/nodes_pb';
import { timerStart } from '../utils';

Expand Down Expand Up @@ -190,11 +188,7 @@ class NodeConnectionManager {
return [
async (e) => {
await release();
if (
e instanceof nodesErrors.ErrorNodeConnectionDestroyed ||
e instanceof grpcErrors.ErrorGRPC ||
e instanceof agentErrors.ErrorAgentClientDestroyed
) {
if (nodesUtils.isConnectionError(e)) {
// Error with connection, shutting connection down
await this.destroyConnection(targetNodeId);
}
Expand Down Expand Up @@ -467,9 +461,6 @@ class NodeConnectionManager {
// Let foundTarget: boolean = false;
let foundAddress: NodeAddress | undefined = undefined;
// Get the closest alpha nodes to the target node (set as shortlist)
// FIXME? this is an array. Shouldn't it be a set?
// It's possible for this to grow faster than we can consume it,
// doubly so if we allow duplicates
const shortlist = await this.nodeGraph.getClosestNodes(
targetNodeId,
this.initialClosestNodes,
Expand All @@ -487,7 +478,7 @@ class NodeConnectionManager {
const contacted: { [nodeId: string]: boolean } = {};
// Iterate until we've found and contacted k nodes
while (Object.keys(contacted).length <= this.nodeGraph.nodeBucketLimit) {
if (signal?.aborted) throw new nodesErrors.ErrorNodeAborted();
if (signal?.aborted) throw signal.reason;
// While (!foundTarget) {
// Remove the node from the front of the array
const nextNode = shortlist.shift();
Expand Down Expand Up @@ -522,6 +513,7 @@ class NodeConnectionManager {
targetNodeId,
timer,
);
if (foundClosest.length === 0) continue;
// Check to see if any of these are the target node. At the same time, add
// them to the shortlist
for (const [nodeId, nodeData] of foundClosest) {
Expand Down Expand Up @@ -585,36 +577,43 @@ class NodeConnectionManager {
// Construct the message
const nodeIdMessage = new nodesPB.Node();
nodeIdMessage.setNodeId(nodesUtils.encodeNodeId(targetNodeId));
// Send through client
return this.withConnF(
nodeId,
async (connection) => {
const client = connection.getClient();
const response = await client.nodesClosestLocalNodesGet(nodeIdMessage);
const nodes: Array<[NodeId, NodeData]> = [];
// Loop over each map element (from the returned response) and populate nodes
response.getNodeTableMap().forEach((address, nodeIdString: string) => {
const nodeId = nodesUtils.decodeNodeId(nodeIdString);
// If the nodeId is not valid we don't add it to the list of nodes
if (nodeId != null) {
nodes.push([
nodeId,
{
address: {
host: address.getHost() as Host | Hostname,
port: address.getPort() as Port,
},
// Not really needed
// But if it's needed then we need to add the information to the proto definition
lastUpdated: 0,
try {
// Send through client
const response = await this.withConnF(
nodeId,
async (connection) => {
const client = connection.getClient();
return await client.nodesClosestLocalNodesGet(nodeIdMessage);
},
timer,
);
const nodes: Array<[NodeId, NodeData]> = [];
// Loop over each map element (from the returned response) and populate nodes
response.getNodeTableMap().forEach((address, nodeIdString: string) => {
const nodeId = nodesUtils.decodeNodeId(nodeIdString);
// If the nodeId is not valid we don't add it to the list of nodes
if (nodeId != null) {
nodes.push([
nodeId,
{
address: {
host: address.getHost() as Host | Hostname,
port: address.getPort() as Port,
},
]);
}
});
return nodes;
},
timer,
);
// Not really needed
// But if it's needed then we need to add the information to the proto definition
lastUpdated: 0,
},
]);
}
});
return nodes;
} catch (e) {
if (nodesUtils.isConnectionError(e)) {
return [];
}
throw e;
}
}

/**
Expand Down
12 changes: 12 additions & 0 deletions src/nodes/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ import type { KeyPath } from '@matrixai/db';
import { IdInternal } from '@matrixai/id';
import lexi from 'lexicographic-integer';
import { utils as dbUtils } from '@matrixai/db';
import * as nodesErrors from './errors';
import { bytes2BigInt } from '../utils';
import * as keysUtils from '../keys/utils';
import * as grpcErrors from '../grpc/errors';
import * as agentErrors from '../agent/errors';

const sepBuffer = dbUtils.sep;

Expand Down Expand Up @@ -310,6 +313,14 @@ function generateRandomNodeIdForBucket(
return xOrNodeId(nodeId, randomDistanceForBucket);
}

function isConnectionError(e): boolean {
return (
e instanceof nodesErrors.ErrorNodeConnectionDestroyed ||
e instanceof grpcErrors.ErrorGRPC ||
e instanceof agentErrors.ErrorAgentClientDestroyed
);
}

export {
sepBuffer,
encodeNodeId,
Expand All @@ -330,4 +341,5 @@ export {
generateRandomDistanceForBucket,
xOrNodeId,
generateRandomNodeIdForBucket,
isConnectionError,
};

0 comments on commit 81209af

Please sign in to comment.