Skip to content

Commit

Permalink
chore: centralize callbackPromise resolve
Browse files Browse the repository at this point in the history
  • Loading branch information
wemeetagain committed Jan 23, 2024
1 parent 8b1a202 commit 259a576
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 23 deletions.
39 changes: 17 additions & 22 deletions packages/discv5/src/service/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import {
PongResponse,
ResponseType,
SignableENRInput,
toResponseType,
} from "./types.js";
import { RateLimiter, RateLimiterOpts } from "../rateLimit/index.js";
import {
Expand Down Expand Up @@ -878,23 +879,21 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
try {
switch (response.type) {
case MessageType.PONG:
return this.handlePong(nodeAddr, activeRequest, response as IPongMessage);
this.handlePong(nodeAddr, activeRequest, response as IPongMessage);
break;
case MessageType.NODES:
return this.handleNodes(
nodeAddr,
activeRequest as IActiveRequest<IFindNodeMessage>,
response as INodesMessage
);
if (!this.handleNodes(nodeAddr, activeRequest as IActiveRequest<IFindNodeMessage>, response as INodesMessage))
return;
break;
case MessageType.TALKRESP:
return this.handleTalkResp(
nodeAddr,
activeRequest as IActiveRequest<ITalkReqMessage>,
response as ITalkRespMessage
);
this.handleTalkResp(nodeAddr, activeRequest as IActiveRequest<ITalkReqMessage>, response as ITalkRespMessage);
break;
default:
// TODO Implement all RPC methods
return;
}

activeRequest.callbackPromise?.resolve(toResponseType(response));
} catch (e) {
log("Error handling rpc response: node: %o response: %s", nodeAddr, responseType);
activeRequest.callbackPromise?.reject(new CodeError(ResponseErrorType.InternalError, (e as Error).message));
Expand Down Expand Up @@ -934,17 +933,17 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
}
this.connectionUpdated(nodeAddr.nodeId, { type: ConnectionStatusType.PongReceived, enr });
}

// If this is initiated by the user, return the error on the callback.
activeRequest.callbackPromise?.resolve(message);
}

/**
* Return true if this nodes message is the final in the response
*/
private handleNodes(
nodeAddr: INodeAddress,
activeRequest: IActiveRequest<IFindNodeMessage>,
message: INodesMessage
): void {
const { request, lookupId, callbackPromise } = activeRequest;
): boolean {
const { request, lookupId } = activeRequest;
// Currently a maximum of 16 peers can be returned.
// Datagrams have a max size of 1280 and ENRs have a max size of 300 bytes.
// There should be no more than 5 responses to return 16 peers
Expand All @@ -969,7 +968,7 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
currentResponse.enrs.push(...message.enrs);
this.activeRequests.set(message.id, activeRequest as IActiveRequest<RequestMessage>);
this.activeNodesResponses.set(message.id, currentResponse);
return;
return false;
}

// Have received all the Nodes responses we are willing to accept
Expand All @@ -986,8 +985,7 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {

this.discovered(nodeAddr.nodeId, message.enrs, lookupId);

// If this is initiated by the user, return the response on the callback.
callbackPromise?.resolve(message.enrs);
return true;
}

private handleTalkResp = (
Expand All @@ -997,9 +995,6 @@ export class Discv5 extends (EventEmitter as { new (): Discv5EventEmitter }) {
): void => {
log("Received TALKRESP message from Node: %o", nodeAddr);
this.emit("talkRespReceived", nodeAddr, this.findEnr(nodeAddr.nodeId) ?? null, message);

// If this is initiated by the user, return the response on the callback.
activeRequest.callbackPromise?.resolve(message.response);
};

/**
Expand Down
20 changes: 19 additions & 1 deletion packages/discv5/src/service/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,14 @@ import StrictEventEmitter from "strict-event-emitter-types";
import { Multiaddr } from "@multiformats/multiaddr";
import { ENR, SequenceNumber, SignableENR } from "@chainsafe/enr";

import { ITalkReqMessage, ITalkRespMessage, RequestMessage } from "../message/index.js";
import {
INodesMessage,
IPongMessage,
ITalkReqMessage,
ITalkRespMessage,
MessageType,
RequestMessage,
} from "../message/index.js";
import { INodeAddress, NodeContact } from "../session/nodeInfo.js";
import { ConnectionDirection, RequestErrorType, ResponseErrorType } from "../session/index.js";
import { SocketAddress } from "../util/ip.js";
Expand Down Expand Up @@ -96,6 +103,17 @@ export type PongResponse = {

export type ResponseType = Buffer | ENR[] | PongResponse;

export function toResponseType(response: IPongMessage | INodesMessage | ITalkRespMessage): ResponseType {
switch (response.type) {
case MessageType.PONG:
return { enrSeq: response.enrSeq, addr: response.addr };
case MessageType.NODES:
return response.enrs;
case MessageType.TALKRESP:
return response.response;
}
}

export enum ConnectionStatusType {
Connected,
PongReceived,
Expand Down

0 comments on commit 259a576

Please sign in to comment.