From 63223d45d4ab5b0e1ac0b0628496139f46056eca Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Fri, 31 Jan 2020 17:05:11 +0100 Subject: [PATCH 1/2] Reuse connections on sql module --- CHANGELOG.next.asciidoc | 1 + x-pack/metricbeat/module/sql/query/query.go | 40 ++++++++++++++++----- 2 files changed, 32 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 49950c1c3d63..1c3352c79262 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -127,6 +127,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add STAN dashboard {pull}15654[15654] - Add support for Unix socket in Memcached metricbeat module. {issue}13685[13685] {pull}15822[15822] - Add support for processors in light modules. {issue}14740[14740] {pull}15923[15923] +- Reuse connections in SQL module. {pull}[] *Packetbeat* diff --git a/x-pack/metricbeat/module/sql/query/query.go b/x-pack/metricbeat/module/sql/query/query.go index 7a0c66de9da2..5ff543aa681d 100644 --- a/x-pack/metricbeat/module/sql/query/query.go +++ b/x-pack/metricbeat/module/sql/query/query.go @@ -5,6 +5,7 @@ package query import ( + "context" "fmt" "strconv" "strings" @@ -43,6 +44,8 @@ type MetricSet struct { Driver string Query string ResponseFormat string + + db *sqlx.DB } // New creates a new instance of the MetricSet. New is responsible for unpacking @@ -77,19 +80,13 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // of an error set the Error field of mb.Event or simply call report.Error(). // It calls m.fetchTableMode() or m.fetchVariableMode() depending on the response // format of the query. -func (m *MetricSet) Fetch(report mb.ReporterV2) error { - db, err := sqlx.Open(m.Driver, m.HostData().URI) +func (m *MetricSet) Fetch(ctx context.Context, report mb.ReporterV2) error { + db, err := m.DB() if err != nil { return errors.Wrap(err, "error opening connection") } - defer db.Close() - - err = db.Ping() - if err != nil { - return errors.Wrap(err, "error testing connection") - } - rows, err := db.Queryx(m.Query) + rows, err := db.QueryxContext(ctx, m.Query) if err != nil { return errors.Wrap(err, "error executing query") } @@ -102,6 +99,23 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error { return m.fetchVariableMode(rows, report) } +// DB gets a client ready to query the database +func (m *MetricSet) DB() (*sqlx.DB, error) { + if m.db == nil { + db, err := sqlx.Open(m.Driver, m.HostData().URI) + if err != nil { + return nil, errors.Wrap(err, "opening connection") + } + err = db.Ping() + if err != nil { + return nil, errors.Wrap(err, "testing connection") + } + + m.db = db + } + return m.db, nil +} + // fetchTableMode scan the rows and publishes the event for querys that return the response in a table format. func (m *MetricSet) fetchTableMode(rows *sqlx.Rows, report mb.ReporterV2) error { @@ -229,3 +243,11 @@ func getValue(pval *interface{}) string { return fmt.Sprint(v) } } + +// Close closes the connection pool releasing its resources +func (m *MetricSet) Close() error { + if m.db == nil { + return nil + } + return errors.Wrap(m.db.Close(), "closing connection") +} From d2e901ff193b2f6c6b673174e7e178b6ffbc1574 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Mon, 3 Feb 2020 11:48:15 +0100 Subject: [PATCH 2/2] Fix changelog --- CHANGELOG.next.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 1c3352c79262..83006dc9c239 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -127,7 +127,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add STAN dashboard {pull}15654[15654] - Add support for Unix socket in Memcached metricbeat module. {issue}13685[13685] {pull}15822[15822] - Add support for processors in light modules. {issue}14740[14740] {pull}15923[15923] -- Reuse connections in SQL module. {pull}[] +- Reuse connections in SQL module. {pull}16001[16001] *Packetbeat*