Skip to content

Commit

Permalink
feat(iroha-connector): implement validator interface
Browse files Browse the repository at this point in the history
Add monitoring and sending async/sync requests to ledger via SocketIO.

Closes: hyperledger-cacti#1941
Signed-off-by: Michal Bajer <[email protected]>
  • Loading branch information
stepniowskip authored and outSH committed Jul 15, 2022
1 parent 5834efe commit 15e87ac
Show file tree
Hide file tree
Showing 16 changed files with 1,759 additions and 470 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,17 @@ import type { Observable } from "rxjs";
*/
export interface ISocketApiClient<BlockType> {
sendAsyncRequest?(
contract: Record<string, unknown>,
method: Record<string, unknown>,
args: any,
contract?: Record<string, unknown>,
method?: Record<string, unknown>,
args?: any,
baseConfig?: any,
): void;

sendSyncRequest?(
contract: Record<string, unknown>,
method: Record<string, unknown>,
args: any,
contract?: Record<string, unknown>,
method?: Record<string, unknown>,
args?: any,
baseConfig?: any,
): Promise<any>;

watchBlocksV1?(
Expand Down
1 change: 1 addition & 0 deletions packages/cactus-plugin-ledger-connector-iroha/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
"iroha-helpers-ts": "0.9.25-ss",
"openapi-types": "7.0.1",
"prom-client": "13.1.0",
"rxjs": "7.3.0",
"typescript-optional": "2.0.1"
},
"devDependencies": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,11 @@
"type": "boolean",
"nullable": false,
"description": "Can only be set to false for an insecure grpc connection."
},
"monitorMode": {
"type": "boolean",
"nullable": true,
"description": "Flag used for monitoring. It changes default beahviour of transaction wrapper so it return error to caller instead of throwing RuntimeError straight away."
}
}
},
Expand All @@ -231,6 +236,91 @@
"PrometheusExporterMetricsResponse": {
"type": "string",
"nullable": false
},
"WatchBlocksV1": {
"type": "string",
"enum": [
"org.hyperledger.cactus.api.async.iroha.WatchBlocksV1.Subscribe",
"org.hyperledger.cactus.api.async.iroha.WatchBlocksV1.Next",
"org.hyperledger.cactus.api.async.iroha.WatchBlocksV1.Unsubscribe",
"org.hyperledger.cactus.api.async.iroha.WatchBlocksV1.Error",
"org.hyperledger.cactus.api.async.iroha.WatchBlocksV1.Complete"
],
"x-enum-varnames": [
"Subscribe",
"Next",
"Unsubscribe",
"Error",
"Complete",
"SendAsyncRequest",
"SendSyncRequest"
]
},
"IrohaSocketIOTransactV1": {
"type": "string",
"enum": [
"org.hyperledger.cactus.api.async.iroha.IrohaSocketIOTransactV1.SendAsyncRequest",
"org.hyperledger.cactus.api.async.iroha.IrohaSocketIOTransactV1.SendSyncRequest"
],
"x-enum-varnames": [
"SendAsyncRequest",
"SendSyncRequest"
]
},
"IrohaBlockResponse": {
"type": "object",
"required": [
"payload",
"signaturesList"
],
"properties": {
"payload": {
"type": "object",
"required": [
"txNumber",
"transactionsList",
"height",
"prevBlockHash",
"createdTime",
"rejectedTransactionsHashesList"
],
"properties": {
"transactionsList": {
"type": "array",
"items": {}
},
"txNumber": {
"type": "number"
},
"height": {
"type": "number"
},
"prevBlockHash": {
"type": "string"
},
"createdTime": {
"type": "number"
},
"rejectedTransactionsHashesList": {
"type": "array",
"items": {}
}
}
},
"signaturesList":{
"type": "array",
"items": {}
}
}
},
"IrohaBlockProgress": {
"type": "object",
"required": ["transactionReceipt"],
"properties": {
"transactionReceipt": {
"$ref": "#/components/schemas/IrohaBlockResponse"
}
}
}
}
},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
import { Observable, ReplaySubject, share } from "rxjs";
import { finalize } from "rxjs/operators";
import { Socket, io } from "socket.io-client";
import { Logger, Checks } from "@hyperledger/cactus-common";
import { LogLevelDesc, LoggerProvider } from "@hyperledger/cactus-common";
import { Constants, ISocketApiClient } from "@hyperledger/cactus-core-api";
import {
DefaultApi,
WatchBlocksV1,
IrohaSocketIOTransactV1,
IrohaBlockProgress,
IrohaBaseConfig,
} from "../generated/openapi/typescript-axios";
import { Configuration } from "../generated/openapi/typescript-axios/configuration";
import { RuntimeError } from "run-time-error";

export class IrohaApiClientOptions extends Configuration {
readonly logLevel?: LogLevelDesc;
readonly wsApiHost?: string;
readonly wsApiPath?: string;
}

export class IrohaApiClient
extends DefaultApi
implements ISocketApiClient<IrohaBlockProgress> {
public static readonly CLASS_NAME = "IrohaApiClient";
private readonly log: Logger;
private readonly wsApiHost: string;
private readonly wsApiPath: string;
public get className(): string {
return IrohaApiClient.CLASS_NAME;
}

constructor(public readonly options: IrohaApiClientOptions) {
super(options);
Checks.truthy(options, "privKey in baseConfig");

const level = this.options.logLevel || "INFO";
const label = this.className;
this.log = LoggerProvider.getOrCreate({ level, label });

this.wsApiHost = options.wsApiHost || options.basePath || location.host;
this.wsApiPath = options.wsApiPath || Constants.SocketIoConnectionPathV1;
this.log.debug(`Created ${this.className} OK.`);
this.log.debug(`wsApiHost=${this.wsApiHost}`);
this.log.debug(`wsApiPath=${this.wsApiPath}`);
this.log.debug(`basePath=${this.options.basePath}`);

Checks.nonBlankString(
this.wsApiHost,
`${this.className}::constructor() wsApiHost`,
);
Checks.nonBlankString(
this.wsApiPath,
`${this.className}::constructor() wsApiPath`,
);
}

/**
* Start monitoring for new blocks on the Iroha ledger.
* @param monitorOptions - Options to be passed to validator `startMonitoring` procedure.
* @returns RxJs Observable, `next` - new block, `error` - any error from the validator.
*/
public watchBlocksV1(
monitorOptions?: Record<string, unknown>,
): Observable<IrohaBlockProgress> {
const socket: Socket = io(this.wsApiHost, { path: this.wsApiPath });
const subject = new ReplaySubject<IrohaBlockProgress>(0);
this.log.debug(monitorOptions);
socket.on(WatchBlocksV1.Next, (data: IrohaBlockProgress) => {
subject.next(data);
});

socket.on("connect", () => {
this.log.debug("connected OK...");
socket.emit(WatchBlocksV1.Subscribe, monitorOptions);
});

socket.connect();

socket.on("connect_error", (err: Error) => {
this.log.error("Error (connect_error): ", err);
socket.disconnect();
subject.error(err);
});

socket.on("connect_timeout", (err: Record<string, unknown>) => {
this.log.error("Error (connect_timeout): ", err);
socket.disconnect();
subject.error(err);
});

return subject.pipe(
finalize(() => {
this.log.info("FINALIZE - unsubscribing from the stream...");
socket.emit(WatchBlocksV1.Unsubscribe);
socket.disconnect();
}),
share(),
);
}

/**
* Immediately sends request to the validator, doesn't report any error or responses.
* @param args - arguments.
* @param method - function / method to be executed by validator.
* @param baseConfig - baseConfig needed to properly connect to ledger
*/
public sendAsyncRequest(
method: Record<string, unknown>,
args: any,
baseConfig?: IrohaBaseConfig,
): void {
this.log.debug(`inside: sendAsyncRequest()`);
this.log.debug(`baseConfig=${baseConfig}`);
this.log.debug(`methodName=${method.methodName}`);
this.log.debug(`args=${args}`);

if (!baseConfig) {
throw new RuntimeError("baseConfig object must exist and not be empty");
}

Checks.truthy(baseConfig.privKey, "privKey in baseConfig");
Checks.truthy(
baseConfig.creatorAccountId,
"creatorAccountId in baseConfig",
);
Checks.truthy(baseConfig.irohaHost, "irohaHost in baseConfig");
Checks.truthy(baseConfig.irohaPort, "irohaPort in baseConfig");
Checks.truthy(baseConfig.quorum, "quorum in baseConfig");
Checks.truthy(baseConfig.timeoutLimit, "timeoutLimit in baseConfig");
Checks.nonBlankString(method.methodName, "methodName");

const socket: Socket = io(this.wsApiHost, { path: this.wsApiPath });
const asyncRequestData = {
baseConfig: baseConfig,
methodName: method.methodName,
args: args,
};

this.log.debug("requestData:", asyncRequestData);

try {
socket.emit(IrohaSocketIOTransactV1.SendAsyncRequest, asyncRequestData);

// Connector should disconnect us after receiving this request.
// If he doesn't, disconnect after specified amount of time.
setTimeout(() => {
if (socket.connected) {
socket.disconnect();
}
}, baseConfig.timeoutLimit ?? 10 * 1000);
} catch (err) {
this.log.error("Exception in: sendAsyncRequest(): ", err);
throw err;
}
}

/**
* Sends request to be executed on the ledger, watches and reports any error and the response from a ledger.
* @param args - arguments.
* @param method - function / method to be executed by validator.
* @param baseConfig - baseConfig needed to properly connect to ledger
* @returns Promise that will resolve with response from the ledger, or reject when error occurred.
*/
public sendSyncRequest(
method: Record<string, unknown>,
args: any,
baseConfig?: IrohaBaseConfig,
): Promise<any> {
this.log.debug(`inside: sendSyncRequest()`);
this.log.debug(`baseConfig=${baseConfig}`);
this.log.debug(`method=${method}`);
this.log.debug(`args=${args}`);

if (!baseConfig) {
throw new RuntimeError("baseConfig object must exist and not be empty");
}

Checks.truthy(baseConfig.privKey, "privKey in baseConfig");
Checks.truthy(
baseConfig.creatorAccountId,
"creatorAccountId in baseConfig",
);
Checks.truthy(baseConfig.irohaHost, "irohaHost in baseConfig");
Checks.truthy(baseConfig.irohaPort, "irohaPort in baseConfig");
Checks.truthy(baseConfig.quorum, "quorum in baseConfig");
Checks.truthy(baseConfig.timeoutLimit, "timeoutLimit in baseConfig");
Checks.nonBlankString(method.methodName, "methodName");

const socket: Socket = io(this.wsApiHost, { path: this.wsApiPath });

let responseFlag = false;

return new Promise((resolve, reject) => {
try {
socket.on("connect_error", (err: Error) => {
this.log.error("Error (connect_error): ", err);
socket.disconnect();
reject(err);
});

socket.on("connect_timeout", (err: Record<string, unknown>) => {
this.log.error("Error (connect_timeout): ", err);
socket.disconnect();
reject(err);
});

socket.on("error", (err: unknown) => {
socket.disconnect();
reject(err);
});

socket.on("response", (result: any) => {
responseFlag = true;
this.log.debug("#[recv]response, res:", result);
const resultObj = {
status: result.status,
data: result.txHash,
};
this.log.debug("resultObj =", resultObj);
socket.disconnect();
resolve(resultObj);
});

const syncRequestData = {
baseConfig: baseConfig,
methodName: method.methodName,
args: args,
};

this.log.debug("requestData:", syncRequestData);

try {
socket.emit(IrohaSocketIOTransactV1.SendSyncRequest, syncRequestData);
} catch (err) {
this.log.error("Exception in: sendAsyncRequest(): ", err);
throw err;
}

setTimeout(() => {
if (responseFlag === false) {
socket.disconnect();
resolve({ status: 504 });
}
}, baseConfig.timeoutLimit);
} catch (err) {
this.log.error("Exception in: sendSyncRequest(): ", err);
reject(err);
}
});
}
}
Loading

0 comments on commit 15e87ac

Please sign in to comment.