diff --git a/package.json b/package.json index 2776a81..1e9661d 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "greptime", - "version": "0.2.0", + "version": "0.2.1", "description": "SDK for greptimeDB", "main": "dist/index.js", "jsnext:main": "dist/index.esm.js", diff --git a/src/index.ts b/src/index.ts index b7264e7..3bfa765 100644 --- a/src/index.ts +++ b/src/index.ts @@ -9,6 +9,7 @@ const Greptime = ({ username = '', password = '', sqlConfig = { + insertImmediately: false, insertQueueConfig: { maxQueueSize: 100, maxQueueTime: 1000, diff --git a/src/sql/index.ts b/src/sql/index.ts index 8fb601e..444ef7f 100644 --- a/src/sql/index.ts +++ b/src/sql/index.ts @@ -9,6 +9,7 @@ class Sql extends SqlOperation { url: string sql: SqlState insertQueueConfig: InsertQueueConfigState + insertImmediately: boolean insertValues: Map timeoutId: Map> @@ -17,6 +18,7 @@ class Sql extends SqlOperation { this.url = `/v1/sql?db=${dbname}` this.sql = {} as SqlState this.insertQueueConfig = sqlConfig.insertQueueConfig + this.insertImmediately = sqlConfig.insertImmediately this.insertValues = new Map() this.timeoutId = new Map() } diff --git a/src/sql/operation.ts b/src/sql/operation.ts index 4fb0f4f..653010f 100644 --- a/src/sql/operation.ts +++ b/src/sql/operation.ts @@ -1,3 +1,4 @@ +import { InsertQueueConfigState } from './../type/sql' const dayjs = require('dayjs') import { formatResult, getInsertTime } from '../utils' import { FormatResultState, OutputState, QueryResData, RecordsState } from '../type/common' @@ -99,7 +100,7 @@ class SqlOperation { // Write createTable = async function ( name: string, - { timeIndex, tags, fields }: CreateTableQueryState + { timeIndex = 'ts', tags, fields }: CreateTableQueryState ): Promise { const sql = `CREATE TABLE IF NOT EXISTS ${name} ( ${timeIndex} TIMESTAMP TIME INDEX, @@ -122,29 +123,37 @@ class SqlOperation { return formatResult(res, '') } + _insert = async function (table: string): Promise { + const valuesStr = `${this.insertValues + .get(table) + .map((value) => `(${value.map((item) => (typeof item === 'string' ? `"${item}"` : item)).join(',')})`) + .join(',\n')};` + const sql = `INSERT INTO ${table} VALUES ${valuesStr}` + let res = await this.runSQL(sql) + this.insertValues.set(table, []) + return formatResult(res, '') + } insert = async function (table: string, values: SqlInsertValuesState) { let res: string - const isDArray = Array.isArray(values[0]) - if (this.insertValues.get(table) && this.insertValues.get(table).length < 100) - clearTimeout(this.timeoutId.get(table)) - values = isDArray ? values : [values as Array] + values = Array.isArray(values[0]) ? values : [values as Array] this.insertValues.set(table, this.insertValues.get(table) ? this.insertValues.get(table).concat(values) : values) + if (this.insertImmediately) { + const res = await this._insert(table) + return res + } + + if (this.insertValues.get(table) && this.insertValues.get(table).length < this.insertQueueConfig.maxQueueTime) { + clearTimeout(this.timeoutId.get(table)) + } this.timeoutId.set( table, setTimeout(async () => { - const valuesStr = `${this.insertValues - .get(table) - .map((value) => { - return `(${value.map((item) => (typeof item === 'string' ? `"${item}"` : item)).join(',')})` - }) - .join(',\n')};` - const sql = `INSERT INTO ${table} VALUES ${valuesStr}` - await this.runSQL(sql) - this.insertValues.set(table, []) - }, 1000) + await this._insert(table) + }, this.insertQueueConfig.maxQueueTime) ) + res = `The insert queue has ${ this.insertValues && this.insertValues.get(table).length } statements. If no new data is available, the request will be sent at ${getInsertTime( diff --git a/src/type/sql.ts b/src/type/sql.ts index 53d98d8..351f782 100644 --- a/src/type/sql.ts +++ b/src/type/sql.ts @@ -25,6 +25,7 @@ export interface PrimaryState { } export interface SqlConfigState { + insertImmediately: boolean insertQueueConfig: InsertQueueConfigState }