Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support insertImmediately #13

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "greptime",
"version": "0.2.0",
"version": "0.2.1",
"description": "SDK for greptimeDB",
"main": "dist/index.js",
"jsnext:main": "dist/index.esm.js",
Expand Down
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const Greptime = ({
username = '',
password = '',
sqlConfig = {
insertImmediately: false,
insertQueueConfig: {
maxQueueSize: 100,
maxQueueTime: 1000,
Expand Down
2 changes: 2 additions & 0 deletions src/sql/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ class Sql extends SqlOperation {
url: string
sql: SqlState
insertQueueConfig: InsertQueueConfigState
insertImmediately: boolean
insertValues: Map<string, SqlInsertValuesState>
timeoutId: Map<string, ReturnType<typeof setTimeout>>

Expand All @@ -17,6 +18,7 @@ class Sql extends SqlOperation {
this.url = `/v1/sql?db=${dbname}`
this.sql = {} as SqlState
this.insertQueueConfig = sqlConfig.insertQueueConfig
this.insertImmediately = sqlConfig.insertImmediately
this.insertValues = new Map()
this.timeoutId = new Map()
}
Expand Down
39 changes: 24 additions & 15 deletions src/sql/operation.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { InsertQueueConfigState } from './../type/sql'
const dayjs = require('dayjs')
import { formatResult, getInsertTime } from '../utils'
import { FormatResultState, OutputState, QueryResData, RecordsState } from '../type/common'
Expand Down Expand Up @@ -99,7 +100,7 @@ class SqlOperation {
// Write
createTable = async function (
name: string,
{ timeIndex, tags, fields }: CreateTableQueryState
{ timeIndex = 'ts', tags, fields }: CreateTableQueryState
): Promise<OutputState> {
const sql = `CREATE TABLE IF NOT EXISTS ${name} (
${timeIndex} TIMESTAMP TIME INDEX,
Expand All @@ -122,29 +123,37 @@ class SqlOperation {

return <OutputState>formatResult(res, '')
}
_insert = async function (table: string): Promise<OutputState> {
const valuesStr = `${this.insertValues
.get(table)
.map((value) => `(${value.map((item) => (typeof item === 'string' ? `"${item}"` : item)).join(',')})`)
.join(',\n')};`
const sql = `INSERT INTO ${table} VALUES ${valuesStr}`
let res = await this.runSQL(sql)
this.insertValues.set(table, [])

return <OutputState>formatResult(res, '')
}
insert = async function (table: string, values: SqlInsertValuesState) {
let res: string
const isDArray = Array.isArray(values[0])
if (this.insertValues.get(table) && this.insertValues.get(table).length < 100)
clearTimeout(this.timeoutId.get(table))
values = isDArray ? values : [values as Array<number | string>]
values = Array.isArray(values[0]) ? values : [values as Array<number | string>]
this.insertValues.set(table, this.insertValues.get(table) ? this.insertValues.get(table).concat(values) : values)
if (this.insertImmediately) {
const res = await this._insert(table)
return res
}

if (this.insertValues.get(table) && this.insertValues.get(table).length < this.insertQueueConfig.maxQueueTime) {
clearTimeout(this.timeoutId.get(table))
}

this.timeoutId.set(
table,
setTimeout(async () => {
const valuesStr = `${this.insertValues
.get(table)
.map((value) => {
return `(${value.map((item) => (typeof item === 'string' ? `"${item}"` : item)).join(',')})`
})
.join(',\n')};`
const sql = `INSERT INTO ${table} VALUES ${valuesStr}`
await this.runSQL(sql)
this.insertValues.set(table, [])
}, 1000)
await this._insert(table)
}, this.insertQueueConfig.maxQueueTime)
)

res = `The insert queue has ${
this.insertValues && this.insertValues.get(table).length
} statements. If no new data is available, the request will be sent at ${getInsertTime(
Expand Down
1 change: 1 addition & 0 deletions src/type/sql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ export interface PrimaryState {
}

export interface SqlConfigState {
insertImmediately: boolean
insertQueueConfig: InsertQueueConfigState
}

Expand Down