diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index e582a713282d..2ae916e8df35 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -129,6 +129,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add support for Unix socket in Memcached metricbeat module. {issue}13685[13685] {pull}15822[15822] - Add citadel metricset for Istio Metricbeat module {pull}15990[15990] - Add support for processors in light modules. {issue}14740[14740] {pull}15923[15923] +- Reuse connections in SQL module. {pull}16001[16001] *Packetbeat* diff --git a/x-pack/metricbeat/module/sql/query/query.go b/x-pack/metricbeat/module/sql/query/query.go index 7a0c66de9da2..5ff543aa681d 100644 --- a/x-pack/metricbeat/module/sql/query/query.go +++ b/x-pack/metricbeat/module/sql/query/query.go @@ -5,6 +5,7 @@ package query import ( + "context" "fmt" "strconv" "strings" @@ -43,6 +44,8 @@ type MetricSet struct { Driver string Query string ResponseFormat string + + db *sqlx.DB } // New creates a new instance of the MetricSet. New is responsible for unpacking @@ -77,19 +80,13 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // of an error set the Error field of mb.Event or simply call report.Error(). // It calls m.fetchTableMode() or m.fetchVariableMode() depending on the response // format of the query. -func (m *MetricSet) Fetch(report mb.ReporterV2) error { - db, err := sqlx.Open(m.Driver, m.HostData().URI) +func (m *MetricSet) Fetch(ctx context.Context, report mb.ReporterV2) error { + db, err := m.DB() if err != nil { return errors.Wrap(err, "error opening connection") } - defer db.Close() - - err = db.Ping() - if err != nil { - return errors.Wrap(err, "error testing connection") - } - rows, err := db.Queryx(m.Query) + rows, err := db.QueryxContext(ctx, m.Query) if err != nil { return errors.Wrap(err, "error executing query") } @@ -102,6 +99,23 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error { return m.fetchVariableMode(rows, report) } +// DB gets a client ready to query the database +func (m *MetricSet) DB() (*sqlx.DB, error) { + if m.db == nil { + db, err := sqlx.Open(m.Driver, m.HostData().URI) + if err != nil { + return nil, errors.Wrap(err, "opening connection") + } + err = db.Ping() + if err != nil { + return nil, errors.Wrap(err, "testing connection") + } + + m.db = db + } + return m.db, nil +} + // fetchTableMode scan the rows and publishes the event for querys that return the response in a table format. func (m *MetricSet) fetchTableMode(rows *sqlx.Rows, report mb.ReporterV2) error { @@ -229,3 +243,11 @@ func getValue(pval *interface{}) string { return fmt.Sprint(v) } } + +// Close closes the connection pool releasing its resources +func (m *MetricSet) Close() error { + if m.db == nil { + return nil + } + return errors.Wrap(m.db.Close(), "closing connection") +}