diff --git a/README.md b/README.md index 6c6bd7e..3ed958c 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,8 @@ JavaScript client library for connecting to and querying Apache Pinot :wine_glass:, a realtime distributed OLAP datastore. +[GitHub Repo](https://github.com/kffl/pinot-client-node), [TypeDoc Reference](https://kffl.github.io/pinot-client-node/), [npm Package](https://www.npmjs.com/package/pinot-client) + ## Features - Implements a controller-based broker selector that periodically updates the table-to-broker mapping via the controller API. @@ -92,6 +94,7 @@ UA 7457 - `logger`: a logger instance conforming to the standard Log4j interface w/ .child() method (i.e. pino, winston or log4js) - `brokerReqHeaders`: additional HTTP headers (object key: value) to include in broker query API requests +- `customHttpClient`: a custom HTTP client implementation on top of that, `ConnectionFactory.fromController()` options may include two additional keys: @@ -129,3 +132,30 @@ const options = { const connection = await ConnectionFactory.fromController("localhost:9000", options); ``` + +### Using a custom HTTP client + +By default `pinot-client` uses `undici` for performing HTTP requests against Pinot Borkers and Controllers. A custom `HttpClient` interface implementation (containing POST and GET methods) can be provided instead via the `customHttpClient` options key: + +```typescript + +const myClient: HttpClient = { + get: async function (url: string, options: { headers: Record }) { + const { statusCode, parsedBody } = await otherHTTPClientLib(url, ...); + // data is of type T + return { status: statusCode, data: parsedBody }; + }, + post: async function (url: string, data: object, options: { headers: Record }) { + const { statusCode, parsedBody } = await otherHTTPClientLib(url, ...); + // data is of type T + return { status: statusCode, data: parsedBody }; + }, +}; + +const options = { + customHttpClient: myClient, +} + +const c = await ConnectionFactory.fromController("http://localhost:9000", options); + +``` diff --git a/package.json b/package.json index 6b3ea1a..ff0da05 100644 --- a/package.json +++ b/package.json @@ -36,6 +36,6 @@ "typescript": "^4.7.2" }, "dependencies": { - "axios": "^0.27.2" + "undici": "^5.8.0" } } diff --git a/src/connection-factory.spec.ts b/src/connection-factory.spec.ts index df16e8c..e6ca977 100644 --- a/src/connection-factory.spec.ts +++ b/src/connection-factory.spec.ts @@ -1,5 +1,7 @@ import fastify from "fastify"; +import { mock } from "jest-mock-extended"; import { ConnectionFactory } from "./connection-factory"; +import { HttpClient } from "./http-client.interface"; describe("ConnectionFactory", () => { let brokerTestServer; @@ -8,11 +10,8 @@ describe("ConnectionFactory", () => { let controllerTestServer; const controllerHandlerHeaders = jest.fn(); - beforeEach(async () => { - brokerHandlerBody.mockClear(); - brokerHandlerHeaders.mockClear(); - controllerHandlerHeaders.mockClear(); - brokerTestServer = fastify(); + beforeAll(async () => { + brokerTestServer = fastify({ forceCloseConnections: true }); brokerTestServer.post("/query/sql", (req, res) => { brokerHandlerBody(req.body); brokerHandlerHeaders(req.headers); @@ -23,7 +22,7 @@ describe("ConnectionFactory", () => { }); await brokerTestServer.listen(8000, "0.0.0.0"); - controllerTestServer = fastify(); + controllerTestServer = fastify({ forceCloseConnections: true }); controllerTestServer.get("/v2/brokers/tables", (req, res) => { controllerHandlerHeaders(req.headers); const r = Buffer.from( @@ -34,11 +33,17 @@ describe("ConnectionFactory", () => { await controllerTestServer.listen(9000, "0.0.0.0"); }); - afterEach(async () => { + afterAll(async () => { await brokerTestServer.close(); await controllerTestServer.close(); }); + beforeEach(async () => { + brokerHandlerBody.mockClear(); + brokerHandlerHeaders.mockClear(); + controllerHandlerHeaders.mockClear(); + }); + describe("fromController method", () => { it("should query the controller API when initialized via fromController", async () => { const c = await ConnectionFactory.fromController("localhost:9000"); @@ -78,6 +83,32 @@ describe("ConnectionFactory", () => { }); await expect(ConnectionFactory.fromController("localhost:9000")).rejects.toThrowError("status code: 500"); }); + + it("should use a custom HttpClient when one is provided", async () => { + const httpClient = mock(); + httpClient.get.mockResolvedValueOnce({ + status: 200, + data: { baseballStats: [{ port: 8000, host: "localhost", instanceName: "Broker_172.17.0.2_8000" }] }, + }); + httpClient.post.mockResolvedValueOnce({ + status: 200, + data: { + resultTable: { + dataSchema: { columnNames: ["league", "hits"], columnDataTypes: ["STRING", "DOUBLE"] }, + rows: [["NL", 1890198.0]], + }, + exceptions: [], + }, + }); + const c = await ConnectionFactory.fromController("localhost:9000", { + customHttpClient: httpClient, + brokerUpdateFreqMs: 10000, + }); + await c.execute("baseballStats", "query"); + await c.close(); + expect(httpClient.get).toHaveBeenCalledTimes(1); + expect(httpClient.post).toHaveBeenCalledTimes(1); + }); }); describe("fromHostList method", () => { it("should throw an error if no brokers are provided", () => { @@ -107,5 +138,25 @@ describe("ConnectionFactory", () => { expect(brokerHandlerHeaders).toHaveBeenCalledTimes(1); expect(brokerHandlerHeaders.mock.calls[0][0]["custom-header"]).toEqual("custom-value"); }); + it("should use a custom HttpClient when one is provided", async () => { + const httpClient = mock(); + httpClient.post.mockResolvedValueOnce({ + status: 200, + data: { + resultTable: { + dataSchema: { columnNames: ["league", "hits"], columnDataTypes: ["STRING", "DOUBLE"] }, + rows: [["NL", 1890198.0]], + }, + exceptions: [], + }, + }); + const c = await ConnectionFactory.fromHostList(["localhost:9000"], { + customHttpClient: httpClient, + }); + await c.execute("baseballStats", "query"); + await c.close(); + expect(httpClient.get).toHaveBeenCalledTimes(0); + expect(httpClient.post).toHaveBeenCalledTimes(1); + }); }); }); diff --git a/src/connection-factory.ts b/src/connection-factory.ts index 5bbbe94..a062ae7 100644 --- a/src/connection-factory.ts +++ b/src/connection-factory.ts @@ -1,12 +1,13 @@ import { Connection } from "./connection"; import { SimpleBrokerSelector } from "./simple-broker-selector"; import { JsonBrokerClientTransport } from "./json-broker-client-transport"; -import axios from "axios"; import { JsonControllerClientTransport } from "./json-controller-client-transport"; import { ControllerBasedBrokerSelector } from "./controller-based-broker-selector"; import { SelectorUpdaterPeriodic } from "./selector-updater-periodic"; import { Logger } from "./logger.interface"; import { dummyLogger } from "./dummy-logger"; +import { UndiciHttpClient } from "./undici-http-client"; +import { HttpClient } from "./http-client.interface"; export type FromHostListOptions = { /** @@ -17,11 +18,16 @@ export type FromHostListOptions = { * Additional HTTP headers to include in broker query API requests */ brokerReqHeaders: Record; + /** + * A custom http client to execute broker and controller HTTP requests (GET and POST) + */ + customHttpClient: HttpClient; }; const fromHostListDefaultOptions: FromHostListOptions = { logger: dummyLogger, brokerReqHeaders: {}, + customHttpClient: null, }; export type FromControllerOptions = FromHostListOptions & { @@ -39,6 +45,7 @@ export type FromControllerOptions = FromHostListOptions & { const fromControllerDefaultOptions: FromControllerOptions = { logger: dummyLogger, brokerReqHeaders: {}, + customHttpClient: null, controllerReqHeaders: {}, brokerUpdateFreqMs: 1000, }; @@ -55,15 +62,16 @@ async function fromController( options: Partial = {} ): Promise { const actualOptions = Object.assign({}, fromControllerDefaultOptions, options); + const httpClient = options.customHttpClient ? options.customHttpClient : new UndiciHttpClient(); const controllerTransport = new JsonControllerClientTransport( controllerAddress, - axios.get, + httpClient, actualOptions.controllerReqHeaders ); const brokerSelector = new ControllerBasedBrokerSelector(controllerTransport, actualOptions.logger); await brokerSelector.setup(); const updater = new SelectorUpdaterPeriodic(brokerSelector, actualOptions.brokerUpdateFreqMs, actualOptions.logger); - const brokerTransport = new JsonBrokerClientTransport(axios.post, actualOptions.brokerReqHeaders); + const brokerTransport = new JsonBrokerClientTransport(httpClient, actualOptions.brokerReqHeaders); return new Connection(brokerSelector, brokerTransport, actualOptions.logger, updater); } @@ -74,8 +82,9 @@ async function fromController( */ function fromHostList(brokerAddresses: string[], options: Partial = {}): Connection { const actualOptions = Object.assign({}, fromHostListDefaultOptions, options); + const httpClient = options.customHttpClient ? options.customHttpClient : new UndiciHttpClient(); const brokerSelector = new SimpleBrokerSelector(brokerAddresses); - const transport = new JsonBrokerClientTransport(axios.post, actualOptions.brokerReqHeaders); + const transport = new JsonBrokerClientTransport(httpClient, actualOptions.brokerReqHeaders); return new Connection(brokerSelector, transport, actualOptions.logger); } diff --git a/src/http-client.interface.ts b/src/http-client.interface.ts new file mode 100644 index 0000000..f761faa --- /dev/null +++ b/src/http-client.interface.ts @@ -0,0 +1,38 @@ +/** + * HttpClient interface can be used for providing a custom client responsible + * for executing GET and POST requests against Pinot Controllers and Brokers. + */ +export interface HttpClient { + /** + * Executes a HTTP GET request at a specified url. + * Used for sending requests to Pinot Controllers. + */ + get: ( + url: string, + options: { + headers: Record; + } + ) => Promise>; + /** + * Executes a HTTP POST request at a specified url + * Used for querying Pinot Brokers. + */ + post: ( + url: string, + data: object, + options: { + headers: Record; + } + ) => Promise>; +} + +export interface HttpClientResponse { + /** + * Parsed data from the HTTP response body + */ + data: T; + /** + * HTTP response status code + */ + status: number; +} diff --git a/src/index.ts b/src/index.ts index dacd8c9..f98c124 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,3 +1,4 @@ export { Connection } from "./connection"; export { ConnectionFactory, FromHostListOptions, FromControllerOptions } from "./connection-factory"; +export { HttpClient, HttpClientResponse } from "./http-client.interface"; export * from "./broker-response.types"; diff --git a/src/json-broker-client-transport.spec.ts b/src/json-broker-client-transport.spec.ts index 0f88ad5..ffe788b 100644 --- a/src/json-broker-client-transport.spec.ts +++ b/src/json-broker-client-transport.spec.ts @@ -1,17 +1,18 @@ -import { AxiosError } from "axios"; +import { mock, mockClear } from "jest-mock-extended"; +import { HttpClient } from "./http-client.interface"; import { JsonBrokerClientTransport } from "./json-broker-client-transport"; describe("JsonBrokerClientTransport class", () => { - const mockHttpPostFn = jest.fn(); + const mockHttpClient = mock(); beforeEach(() => { - mockHttpPostFn.mockClear(); + mockClear(mockHttpClient); }); - it("should call the HttpPostFn", async () => { - mockHttpPostFn.mockResolvedValueOnce({ data: {} }); - const transport = new JsonBrokerClientTransport(mockHttpPostFn, {}); + it("should call the HttpClient post method", async () => { + mockHttpClient.post.mockResolvedValueOnce({ data: {}, status: 200 }); + const transport = new JsonBrokerClientTransport(mockHttpClient, {}); await transport.executeQuery("addr:8000", "query"); - expect(mockHttpPostFn).toHaveBeenCalledTimes(1); - expect(mockHttpPostFn).toHaveBeenCalledWith( + expect(mockHttpClient.post).toHaveBeenCalledTimes(1); + expect(mockHttpClient.post).toHaveBeenCalledWith( "http://addr:8000/query/sql", { sql: "query" }, { @@ -22,11 +23,11 @@ describe("JsonBrokerClientTransport class", () => { ); }); it("should add custom request headers", async () => { - mockHttpPostFn.mockResolvedValueOnce({ data: {} }); - const transport = new JsonBrokerClientTransport(mockHttpPostFn, { foo: "bar", boo: "baz" }); + mockHttpClient.post.mockResolvedValueOnce({ data: {}, status: 200 }); + const transport = new JsonBrokerClientTransport(mockHttpClient, { foo: "bar", boo: "baz" }); await transport.executeQuery("addr:8000", "query"); - expect(mockHttpPostFn).toHaveBeenCalledTimes(1); - expect(mockHttpPostFn).toHaveBeenCalledWith( + expect(mockHttpClient.post).toHaveBeenCalledTimes(1); + expect(mockHttpClient.post).toHaveBeenCalledWith( "http://addr:8000/query/sql", { sql: "query" }, { @@ -39,16 +40,15 @@ describe("JsonBrokerClientTransport class", () => { ); }); it("should throw an error with status code on HTTP error", async () => { - const errWithResponse = new AxiosError("message"); - errWithResponse.response = { status: 500, data: {}, headers: {}, statusText: "", config: {} }; - mockHttpPostFn.mockRejectedValueOnce(errWithResponse); - const transport = new JsonBrokerClientTransport(mockHttpPostFn, {}); + const response = { status: 500, data: { error: "description" } }; + mockHttpClient.post.mockResolvedValueOnce(response); + const transport = new JsonBrokerClientTransport(mockHttpClient, {}); await expect(transport.executeQuery("addr:8000", "query")).rejects.toThrowError("500"); }); it("should throw an error with message on other errors", async () => { - const errWithMessage = new AxiosError("sample message"); - mockHttpPostFn.mockRejectedValueOnce(errWithMessage); - const transport = new JsonBrokerClientTransport(mockHttpPostFn, {}); + const errWithMessage = new Error("sample message"); + mockHttpClient.post.mockRejectedValueOnce(errWithMessage); + const transport = new JsonBrokerClientTransport(mockHttpClient, {}); await expect(transport.executeQuery("addr:8000", "query")).rejects.toThrowError("sample message"); }); }); diff --git a/src/json-broker-client-transport.ts b/src/json-broker-client-transport.ts index 2fd9551..30446ef 100644 --- a/src/json-broker-client-transport.ts +++ b/src/json-broker-client-transport.ts @@ -1,38 +1,27 @@ import { BrokerClientTransport } from "./broker-client-transport.interface"; import { BrokerResponse } from "./broker-response.types"; +import { HttpClient } from "./http-client.interface"; import { PinotClientError } from "./pinot-client-error"; import { withProtocol } from "./url"; -export type HttpPostFn = ( - url: string, - data: object, - options: { - headers: Record; - } -) => Promise<{ data: T; status: number }>; - export class JsonBrokerClientTransport implements BrokerClientTransport { - constructor( - private readonly httpPost: HttpPostFn, - private readonly reqHeaders: Record - ) {} + constructor(private readonly client: HttpClient, private readonly reqHeaders: Record) {} public async executeQuery(brokerAddress: string, query: string) { try { const url = withProtocol(brokerAddress) + "/query/sql"; const body = { sql: query }; - const { data } = await this.httpPost(url, body, { + const { data, status } = await this.client.post(url, body, { headers: { ...this.reqHeaders, "Content-Type": "application/json; charset=utf-8", }, }); + if (status !== 200) { + throw new PinotClientError("Broker responded with HTTP status code: " + status); + } return data; } catch (e) { - if (e.response) { - throw new PinotClientError("Broker responded with HTTP status code: " + e.response.status); - } else { - throw new PinotClientError("An error occurred when sending request to the broker: " + e.message); - } + throw new PinotClientError("An error occurred when sending request to the broker: " + e.message); } } } diff --git a/src/json-controller-client-transport.spec.ts b/src/json-controller-client-transport.spec.ts index d2cb0a3..37a666e 100644 --- a/src/json-controller-client-transport.spec.ts +++ b/src/json-controller-client-transport.spec.ts @@ -1,17 +1,18 @@ -import { AxiosError } from "axios"; +import { mock, mockClear } from "jest-mock-extended"; +import { HttpClient } from "./http-client.interface"; import { JsonControllerClientTransport } from "./json-controller-client-transport"; describe("JsonControllerClientTransport class", () => { - const mockHttpGetFn = jest.fn(); + const mockHttpClient = mock(); beforeEach(() => { - mockHttpGetFn.mockClear(); + mockClear(mockHttpClient); }); - it("should call the HttpGetFn", async () => { - mockHttpGetFn.mockResolvedValueOnce({ data: {} }); - const transport = new JsonControllerClientTransport("controller:9000", mockHttpGetFn, {}); + it("should call the HttpClient get method", async () => { + mockHttpClient.get.mockResolvedValueOnce({ data: {}, status: 200 }); + const transport = new JsonControllerClientTransport("controller:9000", mockHttpClient, {}); const r = await transport.getTableToBrokerMapping(); - expect(mockHttpGetFn).toHaveBeenCalledTimes(1); - expect(mockHttpGetFn).toHaveBeenCalledWith("http://controller:9000/v2/brokers/tables?state=ONLINE", { + expect(mockHttpClient.get).toHaveBeenCalledTimes(1); + expect(mockHttpClient.get).toHaveBeenCalledWith("http://controller:9000/v2/brokers/tables?state=ONLINE", { headers: { "Content-Type": "application/json; charset=utf-8", }, @@ -19,11 +20,11 @@ describe("JsonControllerClientTransport class", () => { expect(r).toEqual({}); }); it("should add custom request headers", async () => { - mockHttpGetFn.mockResolvedValueOnce({ data: {} }); - const transport = new JsonControllerClientTransport("controller:9000", mockHttpGetFn, { key: "val" }); + mockHttpClient.get.mockResolvedValueOnce({ data: {}, status: 200 }); + const transport = new JsonControllerClientTransport("controller:9000", mockHttpClient, { key: "val" }); const r = await transport.getTableToBrokerMapping(); - expect(mockHttpGetFn).toHaveBeenCalledTimes(1); - expect(mockHttpGetFn).toHaveBeenCalledWith("http://controller:9000/v2/brokers/tables?state=ONLINE", { + expect(mockHttpClient.get).toHaveBeenCalledTimes(1); + expect(mockHttpClient.get).toHaveBeenCalledWith("http://controller:9000/v2/brokers/tables?state=ONLINE", { headers: { key: "val", "Content-Type": "application/json; charset=utf-8", @@ -32,16 +33,15 @@ describe("JsonControllerClientTransport class", () => { expect(r).toEqual({}); }); it("should throw an error with status code on HTTP error", async () => { - const errWithResponse = new AxiosError("message"); - errWithResponse.response = { status: 503, data: {}, headers: {}, statusText: "", config: {} }; - mockHttpGetFn.mockRejectedValueOnce(errWithResponse); - const transport = new JsonControllerClientTransport("addr:9000", mockHttpGetFn, {}); + const response = { status: 503, data: {} }; + mockHttpClient.get.mockResolvedValueOnce(response); + const transport = new JsonControllerClientTransport("addr:9000", mockHttpClient, {}); await expect(transport.getTableToBrokerMapping()).rejects.toThrowError("503"); }); it("should throw an error with message on other errors", async () => { - const errWithMessage = new AxiosError("sample message"); - mockHttpGetFn.mockRejectedValueOnce(errWithMessage); - const transport = new JsonControllerClientTransport("addr:9000", mockHttpGetFn, {}); + const errWithMessage = new Error("sample message"); + mockHttpClient.get.mockRejectedValueOnce(errWithMessage); + const transport = new JsonControllerClientTransport("addr:9000", mockHttpClient, {}); await expect(transport.getTableToBrokerMapping()).rejects.toThrowError("sample message"); }); }); diff --git a/src/json-controller-client-transport.ts b/src/json-controller-client-transport.ts index b0d7caf..41ea565 100644 --- a/src/json-controller-client-transport.ts +++ b/src/json-controller-client-transport.ts @@ -1,24 +1,18 @@ import { ControllerClientTransport } from "./controller-client-transport.interface"; import { ControllerResponse } from "./controller-response"; +import { HttpClient } from "./http-client.interface"; import { PinotClientError } from "./pinot-client-error"; import { withProtocol } from "./url"; -export type HttpGetFn = ( - url: string, - options: { - headers: Record; - } -) => Promise<{ data: T; status: number }>; - export class JsonControllerClientTransport implements ControllerClientTransport { constructor( private readonly controllerAddress: string, - private readonly httpGet: HttpGetFn, + private readonly client: HttpClient, private readonly reqHeaders: Record ) {} public async getTableToBrokerMapping() { try { - const { data } = await this.httpGet( + const { data, status } = await this.client.get( withProtocol(this.controllerAddress) + "/v2/brokers/tables?state=ONLINE", { headers: { @@ -27,13 +21,12 @@ export class JsonControllerClientTransport implements ControllerClientTransport }, } ); + if (status !== 200) { + throw new PinotClientError("Controller responded with HTTP status code: " + status); + } return data; } catch (e) { - if (e.response) { - throw new PinotClientError("Controller responded with HTTP status code: " + e.response.status); - } else { - throw new PinotClientError("An error occurred when sending request to the controller: " + e.message); - } + throw new PinotClientError("An error occurred when sending request to the controller: " + e.message); } } } diff --git a/src/undici-http-client.ts b/src/undici-http-client.ts new file mode 100644 index 0000000..15a22fd --- /dev/null +++ b/src/undici-http-client.ts @@ -0,0 +1,30 @@ +import { Agent, request } from "undici"; +import { HttpClient } from "./http-client.interface"; + +export class UndiciHttpClient implements HttpClient { + private agent: Agent; + constructor() { + this.agent = new Agent({ pipelining: 1 }); + } + public async post(url: string, data: object, options: { headers: Record }) { + const reqBody = JSON.stringify(data); + const { statusCode, body } = await request(url, { + ...options, + dispatcher: this.agent, + method: "POST", + body: reqBody, + }); + const respData = await body.json(); + return { data: respData, status: statusCode }; + } + public async get( + url: string, + options: { + headers: Record; + } + ) { + const { statusCode, body } = await request(url, { ...options, dispatcher: this.agent }); + const data = await body.json(); + return { data, status: statusCode }; + } +}