Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: replace axios with undici, allow the user to provide custom HTTP client implementation #17

Merged
merged 3 commits into from
Jul 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

JavaScript client library for connecting to and querying Apache Pinot :wine_glass:, a realtime distributed OLAP datastore.

[GitHub Repo](https://github.com/kffl/pinot-client-node), [TypeDoc Reference](https://kffl.github.io/pinot-client-node/), [npm Package](https://www.npmjs.com/package/pinot-client)

## Features

- Implements a controller-based broker selector that periodically updates the table-to-broker mapping via the controller API.
Expand Down Expand Up @@ -92,6 +94,7 @@ UA 7457

- `logger`: a logger instance conforming to the standard Log4j interface w/ .child() method (i.e. pino, winston or log4js)
- `brokerReqHeaders`: additional HTTP headers (object key: value) to include in broker query API requests
- `customHttpClient`: a custom HTTP client implementation

on top of that, `ConnectionFactory.fromController()` options may include two additional keys:

Expand Down Expand Up @@ -129,3 +132,30 @@ const options = {

const connection = await ConnectionFactory.fromController("localhost:9000", options);
```

### Using a custom HTTP client

By default `pinot-client` uses `undici` for performing HTTP requests against Pinot Borkers and Controllers. A custom `HttpClient` interface implementation (containing POST and GET methods) can be provided instead via the `customHttpClient` options key:

```typescript

const myClient: HttpClient = {
get: async function <T>(url: string, options: { headers: Record<string, string> }) {
const { statusCode, parsedBody } = await otherHTTPClientLib<T>(url, ...);
// data is of type T
return { status: statusCode, data: parsedBody };
},
post: async function <T>(url: string, data: object, options: { headers: Record<string, string> }) {
const { statusCode, parsedBody } = await otherHTTPClientLib<T>(url, ...);
// data is of type T
return { status: statusCode, data: parsedBody };
},
};

const options = {
customHttpClient: myClient,
}

const c = await ConnectionFactory.fromController("http://localhost:9000", options);

```
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@
"typescript": "^4.7.2"
},
"dependencies": {
"axios": "^0.27.2"
"undici": "^5.8.0"
}
}
65 changes: 58 additions & 7 deletions src/connection-factory.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import fastify from "fastify";
import { mock } from "jest-mock-extended";
import { ConnectionFactory } from "./connection-factory";
import { HttpClient } from "./http-client.interface";

describe("ConnectionFactory", () => {
let brokerTestServer;
Expand All @@ -8,11 +10,8 @@ describe("ConnectionFactory", () => {
let controllerTestServer;
const controllerHandlerHeaders = jest.fn();

beforeEach(async () => {
brokerHandlerBody.mockClear();
brokerHandlerHeaders.mockClear();
controllerHandlerHeaders.mockClear();
brokerTestServer = fastify();
beforeAll(async () => {
brokerTestServer = fastify({ forceCloseConnections: true });
brokerTestServer.post("/query/sql", (req, res) => {
brokerHandlerBody(req.body);
brokerHandlerHeaders(req.headers);
Expand All @@ -23,7 +22,7 @@ describe("ConnectionFactory", () => {
});
await brokerTestServer.listen(8000, "0.0.0.0");

controllerTestServer = fastify();
controllerTestServer = fastify({ forceCloseConnections: true });
controllerTestServer.get("/v2/brokers/tables", (req, res) => {
controllerHandlerHeaders(req.headers);
const r = Buffer.from(
Expand All @@ -34,11 +33,17 @@ describe("ConnectionFactory", () => {
await controllerTestServer.listen(9000, "0.0.0.0");
});

afterEach(async () => {
afterAll(async () => {
await brokerTestServer.close();
await controllerTestServer.close();
});

beforeEach(async () => {
brokerHandlerBody.mockClear();
brokerHandlerHeaders.mockClear();
controllerHandlerHeaders.mockClear();
});

describe("fromController method", () => {
it("should query the controller API when initialized via fromController", async () => {
const c = await ConnectionFactory.fromController("localhost:9000");
Expand Down Expand Up @@ -78,6 +83,32 @@ describe("ConnectionFactory", () => {
});
await expect(ConnectionFactory.fromController("localhost:9000")).rejects.toThrowError("status code: 500");
});

it("should use a custom HttpClient when one is provided", async () => {
const httpClient = mock<HttpClient>();
httpClient.get.mockResolvedValueOnce({
status: 200,
data: { baseballStats: [{ port: 8000, host: "localhost", instanceName: "Broker_172.17.0.2_8000" }] },
});
httpClient.post.mockResolvedValueOnce({
status: 200,
data: {
resultTable: {
dataSchema: { columnNames: ["league", "hits"], columnDataTypes: ["STRING", "DOUBLE"] },
rows: [["NL", 1890198.0]],
},
exceptions: [],
},
});
const c = await ConnectionFactory.fromController("localhost:9000", {
customHttpClient: httpClient,
brokerUpdateFreqMs: 10000,
});
await c.execute("baseballStats", "query");
await c.close();
expect(httpClient.get).toHaveBeenCalledTimes(1);
expect(httpClient.post).toHaveBeenCalledTimes(1);
});
});
describe("fromHostList method", () => {
it("should throw an error if no brokers are provided", () => {
Expand Down Expand Up @@ -107,5 +138,25 @@ describe("ConnectionFactory", () => {
expect(brokerHandlerHeaders).toHaveBeenCalledTimes(1);
expect(brokerHandlerHeaders.mock.calls[0][0]["custom-header"]).toEqual("custom-value");
});
it("should use a custom HttpClient when one is provided", async () => {
const httpClient = mock<HttpClient>();
httpClient.post.mockResolvedValueOnce({
status: 200,
data: {
resultTable: {
dataSchema: { columnNames: ["league", "hits"], columnDataTypes: ["STRING", "DOUBLE"] },
rows: [["NL", 1890198.0]],
},
exceptions: [],
},
});
const c = await ConnectionFactory.fromHostList(["localhost:9000"], {
customHttpClient: httpClient,
});
await c.execute("baseballStats", "query");
await c.close();
expect(httpClient.get).toHaveBeenCalledTimes(0);
expect(httpClient.post).toHaveBeenCalledTimes(1);
});
});
});
17 changes: 13 additions & 4 deletions src/connection-factory.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import { Connection } from "./connection";
import { SimpleBrokerSelector } from "./simple-broker-selector";
import { JsonBrokerClientTransport } from "./json-broker-client-transport";
import axios from "axios";
import { JsonControllerClientTransport } from "./json-controller-client-transport";
import { ControllerBasedBrokerSelector } from "./controller-based-broker-selector";
import { SelectorUpdaterPeriodic } from "./selector-updater-periodic";
import { Logger } from "./logger.interface";
import { dummyLogger } from "./dummy-logger";
import { UndiciHttpClient } from "./undici-http-client";
import { HttpClient } from "./http-client.interface";

export type FromHostListOptions = {
/**
Expand All @@ -17,11 +18,16 @@ export type FromHostListOptions = {
* Additional HTTP headers to include in broker query API requests
*/
brokerReqHeaders: Record<string, string>;
/**
* A custom http client to execute broker and controller HTTP requests (GET and POST)
*/
customHttpClient: HttpClient;
};

const fromHostListDefaultOptions: FromHostListOptions = {
logger: dummyLogger,
brokerReqHeaders: {},
customHttpClient: null,
};

export type FromControllerOptions = FromHostListOptions & {
Expand All @@ -39,6 +45,7 @@ export type FromControllerOptions = FromHostListOptions & {
const fromControllerDefaultOptions: FromControllerOptions = {
logger: dummyLogger,
brokerReqHeaders: {},
customHttpClient: null,
controllerReqHeaders: {},
brokerUpdateFreqMs: 1000,
};
Expand All @@ -55,15 +62,16 @@ async function fromController(
options: Partial<FromControllerOptions> = {}
): Promise<Connection> {
const actualOptions = Object.assign({}, fromControllerDefaultOptions, options);
const httpClient = options.customHttpClient ? options.customHttpClient : new UndiciHttpClient();
const controllerTransport = new JsonControllerClientTransport(
controllerAddress,
axios.get,
httpClient,
actualOptions.controllerReqHeaders
);
const brokerSelector = new ControllerBasedBrokerSelector(controllerTransport, actualOptions.logger);
await brokerSelector.setup();
const updater = new SelectorUpdaterPeriodic(brokerSelector, actualOptions.brokerUpdateFreqMs, actualOptions.logger);
const brokerTransport = new JsonBrokerClientTransport(axios.post, actualOptions.brokerReqHeaders);
const brokerTransport = new JsonBrokerClientTransport(httpClient, actualOptions.brokerReqHeaders);
return new Connection(brokerSelector, brokerTransport, actualOptions.logger, updater);
}

Expand All @@ -74,8 +82,9 @@ async function fromController(
*/
function fromHostList(brokerAddresses: string[], options: Partial<FromHostListOptions> = {}): Connection {
const actualOptions = Object.assign({}, fromHostListDefaultOptions, options);
const httpClient = options.customHttpClient ? options.customHttpClient : new UndiciHttpClient();
const brokerSelector = new SimpleBrokerSelector(brokerAddresses);
const transport = new JsonBrokerClientTransport(axios.post, actualOptions.brokerReqHeaders);
const transport = new JsonBrokerClientTransport(httpClient, actualOptions.brokerReqHeaders);
return new Connection(brokerSelector, transport, actualOptions.logger);
}

Expand Down
38 changes: 38 additions & 0 deletions src/http-client.interface.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/**
* HttpClient interface can be used for providing a custom client responsible
* for executing GET and POST requests against Pinot Controllers and Brokers.
*/
export interface HttpClient {
/**
* Executes a HTTP GET request at a specified url.
* Used for sending requests to Pinot Controllers.
*/
get: <T>(
url: string,
options: {
headers: Record<string, string>;
}
) => Promise<HttpClientResponse<T>>;
/**
* Executes a HTTP POST request at a specified url
* Used for querying Pinot Brokers.
*/
post: <T>(
url: string,
data: object,
options: {
headers: Record<string, string>;
}
) => Promise<HttpClientResponse<T>>;
}

export interface HttpClientResponse<T> {
/**
* Parsed data from the HTTP response body
*/
data: T;
/**
* HTTP response status code
*/
status: number;
}
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export { Connection } from "./connection";
export { ConnectionFactory, FromHostListOptions, FromControllerOptions } from "./connection-factory";
export { HttpClient, HttpClientResponse } from "./http-client.interface";
export * from "./broker-response.types";
38 changes: 19 additions & 19 deletions src/json-broker-client-transport.spec.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
import { AxiosError } from "axios";
import { mock, mockClear } from "jest-mock-extended";
import { HttpClient } from "./http-client.interface";
import { JsonBrokerClientTransport } from "./json-broker-client-transport";

describe("JsonBrokerClientTransport class", () => {
const mockHttpPostFn = jest.fn();
const mockHttpClient = mock<HttpClient>();
beforeEach(() => {
mockHttpPostFn.mockClear();
mockClear(mockHttpClient);
});
it("should call the HttpPostFn", async () => {
mockHttpPostFn.mockResolvedValueOnce({ data: {} });
const transport = new JsonBrokerClientTransport(mockHttpPostFn, {});
it("should call the HttpClient post method", async () => {
mockHttpClient.post.mockResolvedValueOnce({ data: {}, status: 200 });
const transport = new JsonBrokerClientTransport(mockHttpClient, {});
await transport.executeQuery("addr:8000", "query");
expect(mockHttpPostFn).toHaveBeenCalledTimes(1);
expect(mockHttpPostFn).toHaveBeenCalledWith(
expect(mockHttpClient.post).toHaveBeenCalledTimes(1);
expect(mockHttpClient.post).toHaveBeenCalledWith(
"http://addr:8000/query/sql",
{ sql: "query" },
{
Expand All @@ -22,11 +23,11 @@ describe("JsonBrokerClientTransport class", () => {
);
});
it("should add custom request headers", async () => {
mockHttpPostFn.mockResolvedValueOnce({ data: {} });
const transport = new JsonBrokerClientTransport(mockHttpPostFn, { foo: "bar", boo: "baz" });
mockHttpClient.post.mockResolvedValueOnce({ data: {}, status: 200 });
const transport = new JsonBrokerClientTransport(mockHttpClient, { foo: "bar", boo: "baz" });
await transport.executeQuery("addr:8000", "query");
expect(mockHttpPostFn).toHaveBeenCalledTimes(1);
expect(mockHttpPostFn).toHaveBeenCalledWith(
expect(mockHttpClient.post).toHaveBeenCalledTimes(1);
expect(mockHttpClient.post).toHaveBeenCalledWith(
"http://addr:8000/query/sql",
{ sql: "query" },
{
Expand All @@ -39,16 +40,15 @@ describe("JsonBrokerClientTransport class", () => {
);
});
it("should throw an error with status code on HTTP error", async () => {
const errWithResponse = new AxiosError("message");
errWithResponse.response = { status: 500, data: {}, headers: {}, statusText: "", config: {} };
mockHttpPostFn.mockRejectedValueOnce(errWithResponse);
const transport = new JsonBrokerClientTransport(mockHttpPostFn, {});
const response = { status: 500, data: { error: "description" } };
mockHttpClient.post.mockResolvedValueOnce(response);
const transport = new JsonBrokerClientTransport(mockHttpClient, {});
await expect(transport.executeQuery("addr:8000", "query")).rejects.toThrowError("500");
});
it("should throw an error with message on other errors", async () => {
const errWithMessage = new AxiosError("sample message");
mockHttpPostFn.mockRejectedValueOnce(errWithMessage);
const transport = new JsonBrokerClientTransport(mockHttpPostFn, {});
const errWithMessage = new Error("sample message");
mockHttpClient.post.mockRejectedValueOnce(errWithMessage);
const transport = new JsonBrokerClientTransport(mockHttpClient, {});
await expect(transport.executeQuery("addr:8000", "query")).rejects.toThrowError("sample message");
});
});
25 changes: 7 additions & 18 deletions src/json-broker-client-transport.ts
Original file line number Diff line number Diff line change
@@ -1,38 +1,27 @@
import { BrokerClientTransport } from "./broker-client-transport.interface";
import { BrokerResponse } from "./broker-response.types";
import { HttpClient } from "./http-client.interface";
import { PinotClientError } from "./pinot-client-error";
import { withProtocol } from "./url";

export type HttpPostFn<T> = (
url: string,
data: object,
options: {
headers: Record<string, string>;
}
) => Promise<{ data: T; status: number }>;

export class JsonBrokerClientTransport implements BrokerClientTransport {
constructor(
private readonly httpPost: HttpPostFn<BrokerResponse>,
private readonly reqHeaders: Record<string, string>
) {}
constructor(private readonly client: HttpClient, private readonly reqHeaders: Record<string, string>) {}
public async executeQuery(brokerAddress: string, query: string) {
try {
const url = withProtocol(brokerAddress) + "/query/sql";
const body = { sql: query };
const { data } = await this.httpPost(url, body, {
const { data, status } = await this.client.post<BrokerResponse>(url, body, {
headers: {
...this.reqHeaders,
"Content-Type": "application/json; charset=utf-8",
},
});
if (status !== 200) {
throw new PinotClientError("Broker responded with HTTP status code: " + status);
}
return data;
} catch (e) {
if (e.response) {
throw new PinotClientError("Broker responded with HTTP status code: " + e.response.status);
} else {
throw new PinotClientError("An error occurred when sending request to the broker: " + e.message);
}
throw new PinotClientError("An error occurred when sending request to the broker: " + e.message);
}
}
}
Loading