From 6d875d707f27304ca7021090c33ff05a0e52cd92 Mon Sep 17 00:00:00 2001 From: Brandon Pfeifer Date: Tue, 17 May 2022 14:51:51 -0400 Subject: [PATCH 1/3] chore: backport 8334dd0a23 to 1.x --- .../influxdb/min_max_influxdb_test.flux | 67 +++++++------------ 1 file changed, 25 insertions(+), 42 deletions(-) diff --git a/flux/stdlib/influxdata/influxdb/min_max_influxdb_test.flux b/flux/stdlib/influxdata/influxdb/min_max_influxdb_test.flux index 820e81e5a13..50bb419a7a9 100644 --- a/flux/stdlib/influxdata/influxdb/min_max_influxdb_test.flux +++ b/flux/stdlib/influxdata/influxdb/min_max_influxdb_test.flux @@ -1,61 +1,44 @@ package influxdb_test -import "testing/expect" -option now = () => (2030-01-01T00:00:00Z) +import "testing/expect" -testcase push_down_min_bare extends "flux/planner/group_min_test" { - expect.planner(rules: [ - "PushDownGroupAggregateRule": 1, - ]) - group_min_test.group_min_bare() +testcase push_down_min_bare extends "flux/planner/group_min_test.group_min_bare" { + expect.planner(rules: ["PushDownGroupAggregateRule": 1]) + super() } -testcase push_down_min_bare_host extends "flux/planner/group_min_test" { - expect.planner(rules: [ - "PushDownGroupAggregateRule": 1, - ]) - group_min_test.group_min_bare_host() +testcase push_down_min_bare_host extends "flux/planner/group_min_test.group_min_bare_host" { + expect.planner(rules: ["PushDownGroupAggregateRule": 1]) + super() } -testcase push_down_min_bare_field extends "flux/planner/group_min_test" { - expect.planner(rules: [ - "PushDownGroupAggregateRule": 1, - ]) - group_min_test.group_min_bare_field() +testcase push_down_min_bare_field extends "flux/planner/group_min_test.group_min_bare_field" { + expect.planner(rules: ["PushDownGroupAggregateRule": 1]) + super() } -testcase push_down_max_bare extends "flux/planner/group_max_test" { - expect.planner(rules: [ - "PushDownGroupAggregateRule": 1, - ]) - group_max_test.group_max_bare() +testcase push_down_max_bare extends "flux/planner/group_max_test.group_max_bare" { + expect.planner(rules: ["PushDownGroupAggregateRule": 1]) + super() } -testcase push_down_max_bare_host extends "flux/planner/group_max_test" { - expect.planner(rules: [ - "PushDownGroupAggregateRule": 1, - ]) - group_max_test.group_max_bare_host() +testcase push_down_max_bare_host extends "flux/planner/group_max_test.group_max_bare_host" { + expect.planner(rules: ["PushDownGroupAggregateRule": 1]) + super() } -testcase push_down_max_bare_field extends "flux/planner/group_max_test" { - expect.planner(rules: [ - "PushDownGroupAggregateRule": 1, - ]) - group_max_test.group_max_bare_field() +testcase push_down_max_bare_field extends "flux/planner/group_max_test.group_max_bare_field" { + expect.planner(rules: ["PushDownGroupAggregateRule": 1]) + super() } -testcase push_down_table_test_min extends "flux/planner/group_min_max_table_test" { - expect.planner(rules: [ - "PushDownGroupAggregateRule": 1, - ]) - group_min_max_table_test.group_min_table() +testcase push_down_table_test_min extends "flux/planner/group_min_max_table_test.group_min_table" { + expect.planner(rules: ["PushDownGroupAggregateRule": 1]) + super() } -testcase push_down_table_test_max extends "flux/planner/group_min_max_table_test" { - expect.planner(rules: [ - "PushDownGroupAggregateRule": 1, - ]) - group_min_max_table_test.group_max_table() +testcase push_down_table_test_max extends "flux/planner/group_min_max_table_test.group_max_table" { + expect.planner(rules: ["PushDownGroupAggregateRule": 1]) + super() } From 4432650283e2a7c5ec337755fd9bf9fd9ed03745 Mon Sep 17 00:00:00 2001 From: Brandon Pfeifer Date: Tue, 17 May 2022 14:53:52 -0400 Subject: [PATCH 2/3] chore: upgrade flux to v0.167.0 --- go.mod | 6 +----- go.sum | 20 +++----------------- 2 files changed, 4 insertions(+), 22 deletions(-) diff --git a/go.mod b/go.mod index b11c8eafa06..ce87948dd2f 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/golang/mock v1.5.0 github.com/golang/snappy v0.0.4 github.com/google/go-cmp v0.5.7 - github.com/influxdata/flux v0.159.0 + github.com/influxdata/flux v0.167.0 github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69 github.com/influxdata/influxql v1.1.1-0.20211004132434-7e7d61973256 github.com/influxdata/pkg-config v0.2.11 @@ -89,7 +89,6 @@ require ( github.com/benbjohnson/immutable v0.3.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bonitoo-io/go-sql-bigquery v0.3.4-1.4.0 // indirect - github.com/c-bata/go-prompt v0.2.2 // indirect github.com/cespare/xxhash/v2 v2.1.1 // indirect github.com/deepmap/oapi-codegen v1.6.0 // indirect github.com/denisenkom/go-mssqldb v0.10.0 // indirect @@ -123,17 +122,14 @@ require ( github.com/klauspost/compress v1.13.6 // indirect github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6 // indirect github.com/lib/pq v1.0.0 // indirect - github.com/mattn/go-colorable v0.1.8 // indirect github.com/mattn/go-ieproxy v0.0.1 // indirect github.com/mattn/go-runewidth v0.0.3 // indirect - github.com/mattn/go-tty v0.0.0-20180907095812-13ff1204f104 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mschoch/smat v0.0.0-20160514031455-90eadee771ae // indirect github.com/philhofer/fwd v1.0.0 // indirect github.com/pierrec/lz4/v4 v4.1.11 // indirect github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect - github.com/pkg/term v0.0.0-20180730021639-bffc007b7fd5 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/procfs v0.0.11 // indirect github.com/segmentio/kafka-go v0.2.0 // indirect diff --git a/go.sum b/go.sum index ddf1e9b02e4..ffca5a3077a 100644 --- a/go.sum +++ b/go.sum @@ -96,7 +96,6 @@ github.com/DATA-DOG/go-sqlmock v1.4.1 h1:ThlnYciV1iM/V0OSF/dtkqWb6xo5qITT1TJBG1M github.com/DATA-DOG/go-sqlmock v1.4.1/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/HdrHistogram/hdrhistogram-go v1.1.0 h1:6dpdDPTRoo78HxAJ6T1HfMiKSnqhgRRqzCuPshRkQ7I= -github.com/HdrHistogram/hdrhistogram-go v1.1.0/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo= github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= github.com/Masterminds/semver v1.4.2 h1:WBLTQ37jOCzSLtXNdoo8bNM8876KhNqOKvrlGITgsTc= @@ -235,7 +234,6 @@ github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7 github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= -github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4= github.com/dave/jennifer v1.2.0/go.mod h1:fIb+770HOpJ2fmN9EPPKOqm1vMGhB+TwXKMZhrIygKg= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -285,7 +283,6 @@ github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoD github.com/form3tech-oss/jwt-go v3.2.5+incompatible h1:/l4kBbb4/vGSsdtB5nUe8L7B9mImVMaBPw9L/0TBHU8= github.com/form3tech-oss/jwt-go v3.2.5+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/foxcpp/go-mockdns v0.0.0-20201212160233-ede2f9158d15 h1:nLPjjvpUAODOR6vY/7o0hBIk8iTr19Fvmf8aFx/kC7A= -github.com/foxcpp/go-mockdns v0.0.0-20201212160233-ede2f9158d15/go.mod h1:tPg4cp4nseejPd+UKxtCVQ2hUxNTZ7qQZJa7CLriIeo= github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4= github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= @@ -582,8 +579,8 @@ github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJ github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/influxdata/flux v0.65.0/go.mod h1:BwN2XG2lMszOoquQaFdPET8FRQfrXiZsWmcMO9rkaVY= -github.com/influxdata/flux v0.159.0 h1:Vdq/3/NfO6xl2q6COr8c3rKuywMIRrfxVIRrXGHZq/Q= -github.com/influxdata/flux v0.159.0/go.mod h1:dALQQHRj+70b+o/9RtaHAAXH3toMs2M58gfY66oEll8= +github.com/influxdata/flux v0.167.0 h1:U8xyjJz6uL1mfmKEio0DYxwUMglbk3jJdFiA0v5nsAc= +github.com/influxdata/flux v0.167.0/go.mod h1:eNApXyjdyUdCNs6LxUQRBHxjUVqK1XrJrlMPhIQSQpA= github.com/influxdata/gosnowflake v1.6.9 h1:BhE39Mmh8bC+Rvd4QQsP2gHypfeYIH1wqW1AjGWxxrE= github.com/influxdata/gosnowflake v1.6.9/go.mod h1:9W/BvCXOKx2gJtQ+jdi1Vudev9t9/UDOEHnlJZ/y1nU= github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69 h1:WQsmW0fXO4ZE/lFGIE84G6rIV5SJN3P3sjIXAP1a8eU= @@ -662,9 +659,8 @@ github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= -github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kylelemons/godebug v0.0.0-20160406211939-eadb3ce320cb/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k= github.com/labstack/echo/v4 v4.2.1/go.mod h1:AA49e0DZ8kk5jTOOCKNuPR6oTnBS0dYiM4FW1e6jwpg= github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k= @@ -710,7 +706,6 @@ github.com/mattn/go-tty v0.0.0-20180907095812-13ff1204f104/go.mod h1:XPvLUNfbS4f github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= -github.com/miekg/dns v1.1.22/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso= github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso= github.com/miekg/dns v1.1.29 h1:xHBEhR+t5RzcFJjBLJlax2daXOrTYtr9z4WdKEfWFzg= github.com/miekg/dns v1.1.29/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7xM= @@ -747,7 +742,6 @@ github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzE github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= -github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtbWGs= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU= @@ -940,7 +934,6 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= -github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/zeebo/xxh3 v0.13.0/go.mod h1:AQY73TOrhF3jNsdiM9zZOb8MThrYbZONHj7ryDBaLpg= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg= @@ -1008,7 +1001,6 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= -golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871 h1:/pEO3GD/ABYAjuakUS6xSEmmlyVS4kxBNkeA9tLJiTI= golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -1028,7 +1020,6 @@ golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EH golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= golang.org/x/exp v0.0.0-20211028214138-64b4c8e87d1a/go.mod h1:a3o/VtDNHN+dCVLEpzjjUHOzR+Ln3DHX056ZPzoZGGA= golang.org/x/exp v0.0.0-20211216164055-b2b84827b756 h1:/5Bs7sWi0i3rOVO5KnM55OwugpsD4bRW1zywKoZjbkI= -golang.org/x/exp v0.0.0-20211216164055-b2b84827b756/go.mod h1:b9TAUYHmRtqA6klRHApnXMnj+OyLce4yF5cZCUbk2ps= golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= @@ -1065,7 +1056,6 @@ golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.5.1-0.20210830214625-1b1db11ec8f4/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= -golang.org/x/mod v0.5.1/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= golang.org/x/mod v0.6.0-dev.0.20211013180041-c96bc1413d57 h1:LQmS1nU0twXLA96Kt7U9qtHJEbBk3z6Q0V4UXjZkpr4= golang.org/x/mod v0.6.0-dev.0.20211013180041-c96bc1413d57/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY= golang.org/x/net v0.0.0-20170114055629-f2499483f923/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -1123,7 +1113,6 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210505024714-0287a6fb4125/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211118161319-6a13c67c3ce4/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd h1:O7DYs+zxREGLKzKoMQrtrEacpb0ZVXA5rIwylE2Xchk= @@ -1235,7 +1224,6 @@ golang.org/x/sys v0.0.0-20210601080250-7ecdf8ef093b/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210616045830-e2b7044e8c71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211117180635-dee7805ff2e1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM= @@ -1339,7 +1327,6 @@ golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= -golang.org/x/tools v0.1.8-0.20211029000441-d6a9af8af023/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU= golang.org/x/tools v0.1.9 h1:j9KsMiaP1c3B0OTQGth0/k+miLGTgLsAFUCrF2vLcF8= golang.org/x/tools v0.1.9/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -1493,7 +1480,6 @@ gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLks gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw= From cf2ee383d3f9d3b18b7f228f64bee03584ccf58f Mon Sep 17 00:00:00 2001 From: Brandon Pfeifer Date: Fri, 1 Apr 2022 14:19:34 -0400 Subject: [PATCH 3/3] chore: update flux to latest version (#23249) * chore: update flux to latest version * fix: backport "convert allocator to interface" * fix: construct `span` during dependency injection --- flux/stdlib/influxdata/influxdb/buckets.go | 2 +- flux/stdlib/influxdata/influxdb/source.go | 2 +- flux/stdlib/influxdata/influxdb/storage.go | 10 +- flux/stdlib/influxdata/influxdb/to_test.go | 4 +- .../influxdata/influxdb/v1/databases.go | 2 +- internal/flux_controller.go | 2 +- mock/flux.go | 20 ++-- query/control/controller.go | 31 +++--- query/control/controller_test.go | 42 ++++---- query/control/memory.go | 2 +- storage/flux/reader.go | 20 ++-- storage/flux/table.gen.go | 60 ++++++------ storage/flux/table.gen.go.tmpl | 12 +-- storage/flux/table.go | 4 +- storage/flux/table_test.go | 98 +++++++------------ test-flux.sh | 2 +- 16 files changed, 149 insertions(+), 164 deletions(-) diff --git a/flux/stdlib/influxdata/influxdb/buckets.go b/flux/stdlib/influxdata/influxdb/buckets.go index 5bf824b54bd..7acafd608d8 100644 --- a/flux/stdlib/influxdata/influxdb/buckets.go +++ b/flux/stdlib/influxdata/influxdb/buckets.go @@ -37,7 +37,7 @@ func (s *LocalBucketsProcedureSpec) Copy() plan.ProcedureSpec { type BucketsDecoder struct { deps StorageDependencies - alloc *memory.Allocator + alloc memory.Allocator user meta.User } diff --git a/flux/stdlib/influxdata/influxdb/source.go b/flux/stdlib/influxdata/influxdb/source.go index cfa6d4a913a..201b564230b 100644 --- a/flux/stdlib/influxdata/influxdb/source.go +++ b/flux/stdlib/influxdata/influxdb/source.go @@ -29,7 +29,7 @@ type Source struct { id execute.DatasetID ts []execute.Transformation - alloc *memory.Allocator + alloc memory.Allocator stats cursors.CursorStats label string diff --git a/flux/stdlib/influxdata/influxdb/storage.go b/flux/stdlib/influxdata/influxdb/storage.go index ef03e98bc39..36cd881be20 100644 --- a/flux/stdlib/influxdata/influxdb/storage.go +++ b/flux/stdlib/influxdata/influxdb/storage.go @@ -108,12 +108,12 @@ func (spec *ReadWindowAggregateSpec) Name() string { } type Reader interface { - ReadFilter(ctx context.Context, spec ReadFilterSpec, alloc *memory.Allocator) (TableIterator, error) - ReadGroup(ctx context.Context, spec ReadGroupSpec, alloc *memory.Allocator) (TableIterator, error) - ReadWindowAggregate(ctx context.Context, spec ReadWindowAggregateSpec, alloc *memory.Allocator) (TableIterator, error) + ReadFilter(ctx context.Context, spec ReadFilterSpec, alloc memory.Allocator) (TableIterator, error) + ReadGroup(ctx context.Context, spec ReadGroupSpec, alloc memory.Allocator) (TableIterator, error) + ReadWindowAggregate(ctx context.Context, spec ReadWindowAggregateSpec, alloc memory.Allocator) (TableIterator, error) - ReadTagKeys(ctx context.Context, spec ReadTagKeysSpec, alloc *memory.Allocator) (TableIterator, error) - ReadTagValues(ctx context.Context, spec ReadTagValuesSpec, alloc *memory.Allocator) (TableIterator, error) + ReadTagKeys(ctx context.Context, spec ReadTagKeysSpec, alloc memory.Allocator) (TableIterator, error) + ReadTagValues(ctx context.Context, spec ReadTagValuesSpec, alloc memory.Allocator) (TableIterator, error) Close() } diff --git a/flux/stdlib/influxdata/influxdb/to_test.go b/flux/stdlib/influxdata/influxdb/to_test.go index 4d9207e5530..bb87ba22d27 100644 --- a/flux/stdlib/influxdata/influxdb/to_test.go +++ b/flux/stdlib/influxdata/influxdb/to_test.go @@ -7,6 +7,7 @@ import ( "github.com/influxdata/flux" "github.com/influxdata/flux/dependencies/dependenciestest" + "github.com/influxdata/flux/dependency" "github.com/influxdata/flux/execute" "github.com/influxdata/flux/execute/executetest" "github.com/influxdata/flux/interpreter" @@ -760,7 +761,8 @@ m,tag1=c _value=4 41`), tc.want.tables, nil, func(d execute.Dataset, c execute.TableBuilderCache) execute.Transformation { - ctx := deps.Inject(context.Background()) + ctx, span := dependency.Inject(context.Background()) + defer span.Finish() newT, err := influxdb.NewToTransformation(ctx, d, c, tc.spec, deps.StorageDeps) if err != nil { t.Error(err) diff --git a/flux/stdlib/influxdata/influxdb/v1/databases.go b/flux/stdlib/influxdata/influxdb/v1/databases.go index 5d2c6225183..d82827a4875 100644 --- a/flux/stdlib/influxdata/influxdb/v1/databases.go +++ b/flux/stdlib/influxdata/influxdb/v1/databases.go @@ -71,7 +71,7 @@ type DatabasesDecoder struct { deps *influxdb.StorageDependencies databases []meta.DatabaseInfo user meta.User - alloc *memory.Allocator + alloc memory.Allocator } func (bd *DatabasesDecoder) Connect(ctx context.Context) error { diff --git a/internal/flux_controller.go b/internal/flux_controller.go index ba86dfb23b0..e03a7991a7d 100644 --- a/internal/flux_controller.go +++ b/internal/flux_controller.go @@ -20,7 +20,7 @@ func NewFluxControllerMock() *FluxControllerMock { if err != nil { return nil, err } - alloc := &memory.Allocator{} + alloc := &memory.ResourceAllocator{} return p.Start(ctx, alloc) }, } diff --git a/mock/flux.go b/mock/flux.go index fa17abeff1b..2f1ab3f8939 100644 --- a/mock/flux.go +++ b/mock/flux.go @@ -10,31 +10,31 @@ import ( // Reader is a mock implementation of flux/stdlib/influxdata/influxdb.Reader type Reader struct { - ReadFilterFn func(ctx context.Context, spec influxdb.ReadFilterSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) - ReadGroupFn func(ctx context.Context, spec influxdb.ReadGroupSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) - ReadTagKeysFn func(ctx context.Context, spec influxdb.ReadTagKeysSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) - ReadTagValuesFn func(ctx context.Context, spec influxdb.ReadTagValuesSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) - ReadWindowAggregateFn func(ctx context.Context, spec influxdb.ReadWindowAggregateSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) + ReadFilterFn func(ctx context.Context, spec influxdb.ReadFilterSpec, alloc memory.Allocator) (influxdb.TableIterator, error) + ReadGroupFn func(ctx context.Context, spec influxdb.ReadGroupSpec, alloc memory.Allocator) (influxdb.TableIterator, error) + ReadTagKeysFn func(ctx context.Context, spec influxdb.ReadTagKeysSpec, alloc memory.Allocator) (influxdb.TableIterator, error) + ReadTagValuesFn func(ctx context.Context, spec influxdb.ReadTagValuesSpec, alloc memory.Allocator) (influxdb.TableIterator, error) + ReadWindowAggregateFn func(ctx context.Context, spec influxdb.ReadWindowAggregateSpec, alloc memory.Allocator) (influxdb.TableIterator, error) CloseFn func() } -func (m Reader) ReadFilter(ctx context.Context, spec influxdb.ReadFilterSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) { +func (m Reader) ReadFilter(ctx context.Context, spec influxdb.ReadFilterSpec, alloc memory.Allocator) (influxdb.TableIterator, error) { return m.ReadFilterFn(ctx, spec, alloc) } -func (m Reader) ReadGroup(ctx context.Context, spec influxdb.ReadGroupSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) { +func (m Reader) ReadGroup(ctx context.Context, spec influxdb.ReadGroupSpec, alloc memory.Allocator) (influxdb.TableIterator, error) { return m.ReadGroupFn(ctx, spec, alloc) } -func (m Reader) ReadWindowAggregate(ctx context.Context, spec influxdb.ReadWindowAggregateSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) { +func (m Reader) ReadWindowAggregate(ctx context.Context, spec influxdb.ReadWindowAggregateSpec, alloc memory.Allocator) (influxdb.TableIterator, error) { return m.ReadWindowAggregateFn(ctx, spec, alloc) } -func (m Reader) ReadTagKeys(ctx context.Context, spec influxdb.ReadTagKeysSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) { +func (m Reader) ReadTagKeys(ctx context.Context, spec influxdb.ReadTagKeysSpec, alloc memory.Allocator) (influxdb.TableIterator, error) { return m.ReadTagKeysFn(ctx, spec, alloc) } -func (m Reader) ReadTagValues(ctx context.Context, spec influxdb.ReadTagValuesSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) { +func (m Reader) ReadTagValues(ctx context.Context, spec influxdb.ReadTagValuesSpec, alloc memory.Allocator) (influxdb.TableIterator, error) { return m.ReadTagValuesFn(ctx, spec, alloc) } diff --git a/query/control/controller.go b/query/control/controller.go index 97baf5cc870..7988d6496b4 100644 --- a/query/control/controller.go +++ b/query/control/controller.go @@ -27,6 +27,7 @@ import ( "github.com/influxdata/flux" "github.com/influxdata/flux/codes" + "github.com/influxdata/flux/dependency" "github.com/influxdata/flux/execute/table" "github.com/influxdata/flux/lang" "github.com/influxdata/flux/memory" @@ -230,11 +231,10 @@ func (c *Controller) Query(ctx context.Context, compiler flux.Compiler) (flux.Qu defer span.Finish() // The controller injects the dependencies for each incoming request. - for _, dep := range c.dependencies { - ctx = dep.Inject(ctx) - } - q, err := c.query(ctx, compiler) + ctx, deps := dependency.Inject(ctx, c.dependencies...) + q, err := c.query(ctx, compiler, deps) if err != nil { + deps.Finish() return q, err } @@ -243,8 +243,8 @@ func (c *Controller) Query(ctx context.Context, compiler flux.Compiler) (flux.Qu // query submits a query for execution returning immediately. // Done must be called on any returned Query objects. -func (c *Controller) query(ctx context.Context, compiler flux.Compiler) (flux.Query, error) { - q, err := c.createQuery(ctx, compiler.CompilerType()) +func (c *Controller) query(ctx context.Context, compiler flux.Compiler, deps *dependency.Span) (flux.Query, error) { + q, err := c.createQuery(ctx, compiler, deps) if err != nil { return nil, handleFluxError(err) } @@ -264,7 +264,7 @@ func (c *Controller) query(ctx context.Context, compiler flux.Compiler) (flux.Qu return q, nil } -func (c *Controller) createQuery(ctx context.Context, ct flux.CompilerType) (*Query, error) { +func (c *Controller) createQuery(ctx context.Context, compiler flux.Compiler, deps *dependency.Span) (*Query, error) { c.queriesMu.RLock() if c.shutdown { c.queriesMu.RUnlock() @@ -287,7 +287,7 @@ func (c *Controller) createQuery(ctx context.Context, ct flux.CompilerType) (*Qu labelValues[i] = str compileLabelValues[i] = str } - compileLabelValues[len(compileLabelValues)-1] = string(ct) + compileLabelValues[len(compileLabelValues)-1] = string(compiler.CompilerType()) cctx, cancel := context.WithCancel(ctx) parentSpan, parentCtx := tracing.StartSpanFromContextWithPromMetrics( @@ -307,6 +307,8 @@ func (c *Controller) createQuery(ctx context.Context, ct flux.CompilerType) (*Qu parentSpan: parentSpan, cancel: cancel, doneCh: make(chan struct{}), + deps: deps, + compiler: compiler, } // Lock the queries mutex for the rest of this method. @@ -593,12 +595,14 @@ type Query struct { done sync.Once doneCh chan struct{} - program flux.Program - exec flux.Query - results chan flux.Result + program flux.Program + exec flux.Query + results chan flux.Result + compiler flux.Compiler memoryManager *queryMemoryManager - alloc *memory.Allocator + alloc *memory.ResourceAllocator + deps *dependency.Span } func (q *Query) ProfilerResults() (flux.ResultIterator, error) { @@ -694,6 +698,9 @@ func (q *Query) Done() { } q.stats.RuntimeErrors = errMsgs + // Clean up the dependencies. + q.deps.Finish() + // Mark the query as finished so it is removed from the query map. q.c.finish(q) diff --git a/query/control/controller_test.go b/query/control/controller_test.go index 5b38588aa96..6b8050157ed 100644 --- a/query/control/controller_test.go +++ b/query/control/controller_test.go @@ -37,7 +37,7 @@ var ( mockCompiler = &mock.Compiler{ CompileFn: func(ctx context.Context) (flux.Program, error) { return &mock.Program{ - ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) { + ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) { q.ResultsCh <- &executetest.Result{} }, }, nil @@ -197,7 +197,7 @@ func TestController_QueryRuntimeError(t *testing.T) { // ensure we have non-zero compile time time.Sleep(1 * time.Millisecond) return &mock.Program{ - ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) { + ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) { q.SetErr(errors.New("runtime error")) }, }, nil @@ -256,7 +256,7 @@ func TestController_QueryQueueError(t *testing.T) { q, err := ctrl.Query(context.Background(), &mock.Compiler{ CompileFn: func(ctx context.Context) (flux.Program, error) { return &mock.Program{ - ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) { + ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) { // Block until test is finished <-done }, @@ -405,7 +405,7 @@ func TestController_ExecuteError(t *testing.T) { compiler := &mock.Compiler{ CompileFn: func(ctx context.Context) (flux.Program, error) { return &mock.Program{ - StartFn: func(ctx context.Context, alloc *memory.Allocator) (*mock.Query, error) { + StartFn: func(ctx context.Context, alloc memory.Allocator) (*mock.Query, error) { return nil, errors.New("expected error") }, }, nil @@ -550,7 +550,7 @@ func TestController_StartPanic(t *testing.T) { compiler := &mock.Compiler{ CompileFn: func(ctx context.Context) (flux.Program, error) { return &mock.Program{ - StartFn: func(ctx context.Context, alloc *memory.Allocator) (i *mock.Query, e error) { + StartFn: func(ctx context.Context, alloc memory.Allocator) (i *mock.Query, e error) { panic("panic during start step") }, }, nil @@ -589,7 +589,7 @@ func TestController_ShutdownWithRunningQuery(t *testing.T) { compiler := &mock.Compiler{ CompileFn: func(ctx context.Context) (flux.Program, error) { return &mock.Program{ - ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) { + ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) { close(executing) <-ctx.Done() @@ -644,7 +644,7 @@ func TestController_ShutdownWithTimeout(t *testing.T) { compiler := &mock.Compiler{ CompileFn: func(ctx context.Context) (flux.Program, error) { return &mock.Program{ - ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) { + ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) { // This should just block until the end of the test // when we perform cleanup. close(executing) @@ -694,7 +694,7 @@ func TestController_PerQueryMemoryLimit(t *testing.T) { compiler := &mock.Compiler{ CompileFn: func(ctx context.Context) (flux.Program, error) { return &mock.Program{ - ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) { + ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) { defer func() { if err, ok := recover().(error); ok && err != nil { q.SetErr(err) @@ -746,7 +746,7 @@ func TestController_ConcurrencyQuota(t *testing.T) { compiler := &mock.Compiler{ CompileFn: func(ctx context.Context) (flux.Program, error) { return &mock.Program{ - ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) { + ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) { select { case <-q.Canceled: default: @@ -817,7 +817,7 @@ func TestController_QueueSize(t *testing.T) { compiler := &mock.Compiler{ CompileFn: func(ctx context.Context) (flux.Program, error) { return &mock.Program{ - ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) { + ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) { executing <- struct{}{} // Block until test is finished <-done @@ -877,7 +877,7 @@ func TestController_CancelDone_Unlimited(t *testing.T) { compiler := &mock.Compiler{ CompileFn: func(ctx context.Context) (flux.Program, error) { return &mock.Program{ - ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) { + ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) { // Ensure the query takes a little bit of time so the cancel actually cancels something. t := time.NewTimer(time.Second) defer t.Stop() @@ -922,7 +922,7 @@ func TestController_DoneWithoutRead_Unlimited(t *testing.T) { compiler := &mock.Compiler{ CompileFn: func(ctx context.Context) (flux.Program, error) { return &mock.Program{ - ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) { + ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) { // Ensure the query takes a little bit of time so the cancel actually cancels something. t := time.NewTimer(time.Second) defer t.Stop() @@ -974,7 +974,7 @@ func TestController_CancelDone(t *testing.T) { compiler := &mock.Compiler{ CompileFn: func(ctx context.Context) (flux.Program, error) { return &mock.Program{ - ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) { + ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) { // Ensure the query takes a little bit of time so the cancel actually cancels something. t := time.NewTimer(time.Second) defer t.Stop() @@ -1021,7 +1021,7 @@ func TestController_DoneWithoutRead(t *testing.T) { compiler := &mock.Compiler{ CompileFn: func(ctx context.Context) (flux.Program, error) { return &mock.Program{ - ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) { + ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) { // Ensure the query takes a little bit of time so the cancel actually cancels something. t := time.NewTimer(time.Second) defer t.Stop() @@ -1077,7 +1077,7 @@ func TestController_Error_MaxMemory(t *testing.T) { compiler := &mock.Compiler{ CompileFn: func(ctx context.Context) (flux.Program, error) { return &mock.Program{ - ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) { + ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) { // Allocate memory continuously to hit the memory limit. for i := 0; i < 16; i++ { size := config.MemoryBytesQuotaPerQuery / 16 @@ -1135,7 +1135,7 @@ func TestController_NoisyNeighbor(t *testing.T) { wellBehavedNeighbor := &mock.Compiler{ CompileFn: func(ctx context.Context) (flux.Program, error) { return &mock.Program{ - ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) { + ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) { // Allocate memory until we hit our initial memory limit so we should // never request more memory. for amount := int64(0); amount < config.InitialMemoryBytesQuotaPerQuery; amount += 16 { @@ -1152,7 +1152,7 @@ func TestController_NoisyNeighbor(t *testing.T) { noisyNeighbor := &mock.Compiler{ CompileFn: func(ctx context.Context) (flux.Program, error) { return &mock.Program{ - ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) { + ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) { // Allocate memory continuously to use up what we can and be as noisy as possible. // Turn up the stereo and party on. for { @@ -1241,7 +1241,7 @@ func TestController_Error_NoRemainingMemory(t *testing.T) { compiler := &mock.Compiler{ CompileFn: func(ctx context.Context) (flux.Program, error) { return &mock.Program{ - ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) { + ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) { // Allocate memory continuously to use up what we can until denied. for size := int64(0); ; size += 16 { if err := alloc.Account(16); err != nil { @@ -1287,7 +1287,7 @@ func TestController_MemoryRelease(t *testing.T) { compiler := &mock.Compiler{ CompileFn: func(ctx context.Context) (flux.Program, error) { return &mock.Program{ - ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) { + ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) { // Allocate some amount of memory and never release it. if err := alloc.Account(int(config.MemoryBytesQuotaPerQuery) / 2); err != nil { q.SetErr(err) @@ -1336,7 +1336,7 @@ func TestController_IrregularMemoryQuota(t *testing.T) { compiler := &mock.Compiler{ CompileFn: func(ctx context.Context) (flux.Program, error) { return &mock.Program{ - ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) { + ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) { // Allocate memory continuously to hit the memory limit. for size := 0; size < 768; size += 16 { if err := alloc.Account(16); err != nil { @@ -1395,7 +1395,7 @@ func TestController_ReserveMemoryWithoutExceedingMax(t *testing.T) { compiler := &mock.Compiler{ CompileFn: func(ctx context.Context) (flux.Program, error) { return &mock.Program{ - ExecuteFn: func(ctx context.Context, q *mock.Query, alloc *memory.Allocator) { + ExecuteFn: func(ctx context.Context, q *mock.Query, alloc memory.Allocator) { // Allocate memory continuously to use up what we can and be as noisy as possible. // Turn up the stereo and party on. for size := 0; size < 1024; size += 16 { diff --git a/query/control/memory.go b/query/control/memory.go index 609ad43739d..3889ee9d2ff 100644 --- a/query/control/memory.go +++ b/query/control/memory.go @@ -47,7 +47,7 @@ func (c *Controller) createAllocator(q *Query) { m: c.memory, limit: c.memory.initialBytesQuotaPerQuery, } - q.alloc = &memory.Allocator{ + q.alloc = &memory.ResourceAllocator{ // Use an anonymous function to ensure the value is copied. Limit: func(v int64) *int64 { return &v }(q.memoryManager.limit), Manager: q.memoryManager, diff --git a/storage/flux/reader.go b/storage/flux/reader.go index c044912045d..1526fc4b4d3 100644 --- a/storage/flux/reader.go +++ b/storage/flux/reader.go @@ -62,7 +62,7 @@ func NewReader(s storage.Store) query.Reader { return &storeReader{s: s} } -func (r *storeReader) ReadFilter(ctx context.Context, spec query.ReadFilterSpec, alloc *memory.Allocator) (query.TableIterator, error) { +func (r *storeReader) ReadFilter(ctx context.Context, spec query.ReadFilterSpec, alloc memory.Allocator) (query.TableIterator, error) { return &filterIterator{ ctx: ctx, s: r.s, @@ -72,7 +72,7 @@ func (r *storeReader) ReadFilter(ctx context.Context, spec query.ReadFilterSpec, }, nil } -func (r *storeReader) ReadGroup(ctx context.Context, spec query.ReadGroupSpec, alloc *memory.Allocator) (query.TableIterator, error) { +func (r *storeReader) ReadGroup(ctx context.Context, spec query.ReadGroupSpec, alloc memory.Allocator) (query.TableIterator, error) { return &groupIterator{ ctx: ctx, s: r.s, @@ -82,7 +82,7 @@ func (r *storeReader) ReadGroup(ctx context.Context, spec query.ReadGroupSpec, a }, nil } -func (r *storeReader) ReadWindowAggregate(ctx context.Context, spec query.ReadWindowAggregateSpec, alloc *memory.Allocator) (query.TableIterator, error) { +func (r *storeReader) ReadWindowAggregate(ctx context.Context, spec query.ReadWindowAggregateSpec, alloc memory.Allocator) (query.TableIterator, error) { return &windowAggregateIterator{ ctx: ctx, s: r.s, @@ -92,7 +92,7 @@ func (r *storeReader) ReadWindowAggregate(ctx context.Context, spec query.ReadWi }, nil } -func (r *storeReader) ReadTagKeys(ctx context.Context, spec query.ReadTagKeysSpec, alloc *memory.Allocator) (query.TableIterator, error) { +func (r *storeReader) ReadTagKeys(ctx context.Context, spec query.ReadTagKeysSpec, alloc memory.Allocator) (query.TableIterator, error) { return &tagKeysIterator{ ctx: ctx, bounds: spec.Bounds, @@ -103,7 +103,7 @@ func (r *storeReader) ReadTagKeys(ctx context.Context, spec query.ReadTagKeysSpe }, nil } -func (r *storeReader) ReadTagValues(ctx context.Context, spec query.ReadTagValuesSpec, alloc *memory.Allocator) (query.TableIterator, error) { +func (r *storeReader) ReadTagValues(ctx context.Context, spec query.ReadTagValuesSpec, alloc memory.Allocator) (query.TableIterator, error) { return &tagValuesIterator{ ctx: ctx, bounds: spec.Bounds, @@ -122,7 +122,7 @@ type filterIterator struct { spec query.ReadFilterSpec stats cursors.CursorStats cache *tagsCache - alloc *memory.Allocator + alloc memory.Allocator } func (fi *filterIterator) Statistics() cursors.CursorStats { return fi.stats } @@ -239,7 +239,7 @@ type groupIterator struct { spec query.ReadGroupSpec stats cursors.CursorStats cache *tagsCache - alloc *memory.Allocator + alloc memory.Allocator } func (gi *groupIterator) Statistics() cursors.CursorStats { return gi.stats } @@ -612,7 +612,7 @@ type windowAggregateIterator struct { spec query.ReadWindowAggregateSpec stats cursors.CursorStats cache *tagsCache - alloc *memory.Allocator + alloc memory.Allocator } func (wai *windowAggregateIterator) Statistics() cursors.CursorStats { return wai.stats } @@ -847,7 +847,7 @@ type tagKeysIterator struct { s storage.Store readSpec query.ReadTagKeysSpec predicate *datatypes.Predicate - alloc *memory.Allocator + alloc memory.Allocator } func (ti *tagKeysIterator) Do(f func(flux.Table) error) error { @@ -932,7 +932,7 @@ type tagValuesIterator struct { s storage.Store readSpec query.ReadTagValuesSpec predicate *datatypes.Predicate - alloc *memory.Allocator + alloc memory.Allocator } func (ti *tagValuesIterator) Do(f func(flux.Table) error) error { diff --git a/storage/flux/table.gen.go b/storage/flux/table.gen.go index 202d9c76fc3..c38586f6bdb 100644 --- a/storage/flux/table.gen.go +++ b/storage/flux/table.gen.go @@ -33,7 +33,7 @@ type floatTable struct { table mu sync.Mutex cur cursors.FloatArrayCursor - alloc *memory.Allocator + alloc memory.Allocator } func newFloatTable( @@ -45,7 +45,7 @@ func newFloatTable( tags models.Tags, defs [][]byte, cache *tagsCache, - alloc *memory.Allocator, + alloc memory.Allocator, ) *floatTable { t := &floatTable{ table: newTable(done, bounds, key, cols, defs, cache, alloc), @@ -127,7 +127,7 @@ func newFloatWindowTable( tags models.Tags, defs [][]byte, cache *tagsCache, - alloc *memory.Allocator, + alloc memory.Allocator, ) *floatWindowTable { t := &floatWindowTable{ floatTable: floatTable{ @@ -334,7 +334,7 @@ func newFloatWindowSelectorTable( tags models.Tags, defs [][]byte, cache *tagsCache, - alloc *memory.Allocator, + alloc memory.Allocator, ) *floatWindowSelectorTable { t := &floatWindowSelectorTable{ floatTable: floatTable{ @@ -435,7 +435,7 @@ func newFloatEmptyWindowSelectorTable( tags models.Tags, defs [][]byte, cache *tagsCache, - alloc *memory.Allocator, + alloc memory.Allocator, ) *floatEmptyWindowSelectorTable { rangeStart := int64(bounds.Start) rangeStop := int64(bounds.Stop) @@ -671,7 +671,7 @@ func newFloatGroupTable( tags models.Tags, defs [][]byte, cache *tagsCache, - alloc *memory.Allocator, + alloc memory.Allocator, ) *floatGroupTable { t := &floatGroupTable{ table: newTable(done, bounds, key, cols, defs, cache, alloc), @@ -1008,7 +1008,7 @@ type integerTable struct { table mu sync.Mutex cur cursors.IntegerArrayCursor - alloc *memory.Allocator + alloc memory.Allocator } func newIntegerTable( @@ -1020,7 +1020,7 @@ func newIntegerTable( tags models.Tags, defs [][]byte, cache *tagsCache, - alloc *memory.Allocator, + alloc memory.Allocator, ) *integerTable { t := &integerTable{ table: newTable(done, bounds, key, cols, defs, cache, alloc), @@ -1103,7 +1103,7 @@ func newIntegerWindowTable( tags models.Tags, defs [][]byte, cache *tagsCache, - alloc *memory.Allocator, + alloc memory.Allocator, ) *integerWindowTable { t := &integerWindowTable{ integerTable: integerTable{ @@ -1311,7 +1311,7 @@ func newIntegerWindowSelectorTable( tags models.Tags, defs [][]byte, cache *tagsCache, - alloc *memory.Allocator, + alloc memory.Allocator, ) *integerWindowSelectorTable { t := &integerWindowSelectorTable{ integerTable: integerTable{ @@ -1412,7 +1412,7 @@ func newIntegerEmptyWindowSelectorTable( tags models.Tags, defs [][]byte, cache *tagsCache, - alloc *memory.Allocator, + alloc memory.Allocator, ) *integerEmptyWindowSelectorTable { rangeStart := int64(bounds.Start) rangeStop := int64(bounds.Stop) @@ -1648,7 +1648,7 @@ func newIntegerGroupTable( tags models.Tags, defs [][]byte, cache *tagsCache, - alloc *memory.Allocator, + alloc memory.Allocator, ) *integerGroupTable { t := &integerGroupTable{ table: newTable(done, bounds, key, cols, defs, cache, alloc), @@ -1986,7 +1986,7 @@ type unsignedTable struct { table mu sync.Mutex cur cursors.UnsignedArrayCursor - alloc *memory.Allocator + alloc memory.Allocator } func newUnsignedTable( @@ -1998,7 +1998,7 @@ func newUnsignedTable( tags models.Tags, defs [][]byte, cache *tagsCache, - alloc *memory.Allocator, + alloc memory.Allocator, ) *unsignedTable { t := &unsignedTable{ table: newTable(done, bounds, key, cols, defs, cache, alloc), @@ -2080,7 +2080,7 @@ func newUnsignedWindowTable( tags models.Tags, defs [][]byte, cache *tagsCache, - alloc *memory.Allocator, + alloc memory.Allocator, ) *unsignedWindowTable { t := &unsignedWindowTable{ unsignedTable: unsignedTable{ @@ -2287,7 +2287,7 @@ func newUnsignedWindowSelectorTable( tags models.Tags, defs [][]byte, cache *tagsCache, - alloc *memory.Allocator, + alloc memory.Allocator, ) *unsignedWindowSelectorTable { t := &unsignedWindowSelectorTable{ unsignedTable: unsignedTable{ @@ -2388,7 +2388,7 @@ func newUnsignedEmptyWindowSelectorTable( tags models.Tags, defs [][]byte, cache *tagsCache, - alloc *memory.Allocator, + alloc memory.Allocator, ) *unsignedEmptyWindowSelectorTable { rangeStart := int64(bounds.Start) rangeStop := int64(bounds.Stop) @@ -2624,7 +2624,7 @@ func newUnsignedGroupTable( tags models.Tags, defs [][]byte, cache *tagsCache, - alloc *memory.Allocator, + alloc memory.Allocator, ) *unsignedGroupTable { t := &unsignedGroupTable{ table: newTable(done, bounds, key, cols, defs, cache, alloc), @@ -2961,7 +2961,7 @@ type stringTable struct { table mu sync.Mutex cur cursors.StringArrayCursor - alloc *memory.Allocator + alloc memory.Allocator } func newStringTable( @@ -2973,7 +2973,7 @@ func newStringTable( tags models.Tags, defs [][]byte, cache *tagsCache, - alloc *memory.Allocator, + alloc memory.Allocator, ) *stringTable { t := &stringTable{ table: newTable(done, bounds, key, cols, defs, cache, alloc), @@ -3055,7 +3055,7 @@ func newStringWindowTable( tags models.Tags, defs [][]byte, cache *tagsCache, - alloc *memory.Allocator, + alloc memory.Allocator, ) *stringWindowTable { t := &stringWindowTable{ stringTable: stringTable{ @@ -3262,7 +3262,7 @@ func newStringWindowSelectorTable( tags models.Tags, defs [][]byte, cache *tagsCache, - alloc *memory.Allocator, + alloc memory.Allocator, ) *stringWindowSelectorTable { t := &stringWindowSelectorTable{ stringTable: stringTable{ @@ -3363,7 +3363,7 @@ func newStringEmptyWindowSelectorTable( tags models.Tags, defs [][]byte, cache *tagsCache, - alloc *memory.Allocator, + alloc memory.Allocator, ) *stringEmptyWindowSelectorTable { rangeStart := int64(bounds.Start) rangeStop := int64(bounds.Stop) @@ -3599,7 +3599,7 @@ func newStringGroupTable( tags models.Tags, defs [][]byte, cache *tagsCache, - alloc *memory.Allocator, + alloc memory.Allocator, ) *stringGroupTable { t := &stringGroupTable{ table: newTable(done, bounds, key, cols, defs, cache, alloc), @@ -3880,7 +3880,7 @@ type booleanTable struct { table mu sync.Mutex cur cursors.BooleanArrayCursor - alloc *memory.Allocator + alloc memory.Allocator } func newBooleanTable( @@ -3892,7 +3892,7 @@ func newBooleanTable( tags models.Tags, defs [][]byte, cache *tagsCache, - alloc *memory.Allocator, + alloc memory.Allocator, ) *booleanTable { t := &booleanTable{ table: newTable(done, bounds, key, cols, defs, cache, alloc), @@ -3974,7 +3974,7 @@ func newBooleanWindowTable( tags models.Tags, defs [][]byte, cache *tagsCache, - alloc *memory.Allocator, + alloc memory.Allocator, ) *booleanWindowTable { t := &booleanWindowTable{ booleanTable: booleanTable{ @@ -4181,7 +4181,7 @@ func newBooleanWindowSelectorTable( tags models.Tags, defs [][]byte, cache *tagsCache, - alloc *memory.Allocator, + alloc memory.Allocator, ) *booleanWindowSelectorTable { t := &booleanWindowSelectorTable{ booleanTable: booleanTable{ @@ -4282,7 +4282,7 @@ func newBooleanEmptyWindowSelectorTable( tags models.Tags, defs [][]byte, cache *tagsCache, - alloc *memory.Allocator, + alloc memory.Allocator, ) *booleanEmptyWindowSelectorTable { rangeStart := int64(bounds.Start) rangeStop := int64(bounds.Stop) @@ -4518,7 +4518,7 @@ func newBooleanGroupTable( tags models.Tags, defs [][]byte, cache *tagsCache, - alloc *memory.Allocator, + alloc memory.Allocator, ) *booleanGroupTable { t := &booleanGroupTable{ table: newTable(done, bounds, key, cols, defs, cache, alloc), diff --git a/storage/flux/table.gen.go.tmpl b/storage/flux/table.gen.go.tmpl index 79e0c5aa9a9..59dfaea004e 100644 --- a/storage/flux/table.gen.go.tmpl +++ b/storage/flux/table.gen.go.tmpl @@ -27,7 +27,7 @@ type {{.name}}Table struct { table mu sync.Mutex cur cursors.{{.Name}}ArrayCursor - alloc *memory.Allocator + alloc memory.Allocator } func new{{.Name}}Table( @@ -39,7 +39,7 @@ func new{{.Name}}Table( tags models.Tags, defs [][]byte, cache *tagsCache, - alloc *memory.Allocator, + alloc memory.Allocator, ) *{{.name}}Table { t := &{{.name}}Table{ table: newTable(done, bounds, key, cols, defs, cache, alloc), @@ -122,7 +122,7 @@ func new{{.Name}}WindowTable( tags models.Tags, defs [][]byte, cache *tagsCache, - alloc *memory.Allocator, + alloc memory.Allocator, ) *{{.name}}WindowTable { t := &{{.name}}WindowTable{ {{.name}}Table: {{.name}}Table{ @@ -330,7 +330,7 @@ func new{{.Name}}WindowSelectorTable( tags models.Tags, defs [][]byte, cache *tagsCache, - alloc *memory.Allocator, + alloc memory.Allocator, ) *{{.name}}WindowSelectorTable { t := &{{.name}}WindowSelectorTable{ {{.name}}Table: {{.name}}Table{ @@ -431,7 +431,7 @@ func new{{.Name}}EmptyWindowSelectorTable( tags models.Tags, defs [][]byte, cache *tagsCache, - alloc *memory.Allocator, + alloc memory.Allocator, ) *{{.name}}EmptyWindowSelectorTable { rangeStart := int64(bounds.Start) rangeStop := int64(bounds.Stop) @@ -667,7 +667,7 @@ func new{{.Name}}GroupTable( tags models.Tags, defs [][]byte, cache *tagsCache, - alloc *memory.Allocator, + alloc memory.Allocator, ) *{{.name}}GroupTable { t := &{{.name}}GroupTable{ table: newTable(done, bounds, key, cols, defs, cache, alloc), diff --git a/storage/flux/table.go b/storage/flux/table.go index d5be7af7645..0be6144d728 100644 --- a/storage/flux/table.go +++ b/storage/flux/table.go @@ -33,7 +33,7 @@ type table struct { cancelled, used int32 cache *tagsCache - alloc *memory.Allocator + alloc memory.Allocator } func newTable( @@ -43,7 +43,7 @@ func newTable( cols []flux.ColMeta, defs [][]byte, cache *tagsCache, - alloc *memory.Allocator, + alloc memory.Allocator, ) table { return table{ done: done, diff --git a/storage/flux/table_test.go b/storage/flux/table_test.go index f1a76ecf1d9..b135a1e105d 100644 --- a/storage/flux/table_test.go +++ b/storage/flux/table_test.go @@ -161,7 +161,7 @@ func NewStorageReader(tb testing.TB, setupFn SetupFunc) *StorageReader { } } -func (r *StorageReader) ReadWindowAggregate(ctx context.Context, spec influxdb.ReadWindowAggregateSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) { +func (r *StorageReader) ReadWindowAggregate(ctx context.Context, spec influxdb.ReadWindowAggregateSpec, alloc memory.Allocator) (influxdb.TableIterator, error) { return r.Reader.ReadWindowAggregate(ctx, spec, alloc) } @@ -181,7 +181,7 @@ func TestStorageReader_ReadFilter(t *testing.T) { mem := arrowmem.NewCheckedAllocator(arrowmem.DefaultAllocator) defer mem.AssertSize(t, 0) - alloc := &memory.Allocator{ + alloc := &memory.ResourceAllocator{ Allocator: mem, } ti, err := reader.ReadFilter(context.Background(), influxdb.ReadFilterSpec{ @@ -257,11 +257,11 @@ func TestStorageReader_Table(t *testing.T) { for _, tc := range []struct { name string - newFn func(ctx context.Context, alloc *memory.Allocator) flux.TableIterator + newFn func(ctx context.Context, alloc memory.Allocator) flux.TableIterator }{ { name: "ReadFilter", - newFn: func(ctx context.Context, alloc *memory.Allocator) flux.TableIterator { + newFn: func(ctx context.Context, alloc memory.Allocator) flux.TableIterator { ti, err := reader.ReadFilter(context.Background(), influxdb.ReadFilterSpec{ Database: reader.Database, RetentionPolicy: reader.RetentionPolicy, @@ -413,7 +413,7 @@ func TestStorageReader_ReadWindowAggregate(t *testing.T) { mem := arrowmem.NewCheckedAllocator(arrowmem.DefaultAllocator) defer mem.AssertSize(t, 0) - alloc := &memory.Allocator{ + alloc := &memory.ResourceAllocator{ Allocator: mem, } got, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{ @@ -513,7 +513,6 @@ func TestStorageReader_ReadWindowAggregate_ByStopTime(t *testing.T) { }, }, } { - mem := &memory.Allocator{} got, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{ ReadFilterSpec: influxdb.ReadFilterSpec{ Database: reader.Database, @@ -528,7 +527,7 @@ func TestStorageReader_ReadWindowAggregate_ByStopTime(t *testing.T) { Aggregates: []plan.ProcedureKind{ tt.aggregate, }, - }, mem) + }, memory.DefaultAllocator) if err != nil { t.Fatal(err) } @@ -612,7 +611,6 @@ func TestStorageReader_ReadWindowAggregate_ByStartTime(t *testing.T) { }, } { t.Run(string(tt.aggregate), func(t *testing.T) { - mem := &memory.Allocator{} got, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{ ReadFilterSpec: influxdb.ReadFilterSpec{ Database: reader.Database, @@ -627,7 +625,7 @@ func TestStorageReader_ReadWindowAggregate_ByStartTime(t *testing.T) { Aggregates: []plan.ProcedureKind{ tt.aggregate, }, - }, mem) + }, memory.DefaultAllocator) if err != nil { t.Fatal(err) } @@ -796,7 +794,6 @@ func TestStorageReader_ReadWindowAggregate_CreateEmpty(t *testing.T) { }, } { t.Run(string(tt.aggregate), func(t *testing.T) { - mem := &memory.Allocator{} got, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{ ReadFilterSpec: influxdb.ReadFilterSpec{ Database: reader.Database, @@ -811,7 +808,7 @@ func TestStorageReader_ReadWindowAggregate_CreateEmpty(t *testing.T) { tt.aggregate, }, CreateEmpty: true, - }, mem) + }, memory.DefaultAllocator) if err != nil { t.Fatal(err) } @@ -896,7 +893,6 @@ func TestStorageReader_ReadWindowAggregate_CreateEmptyByStopTime(t *testing.T) { }, } { t.Run(string(tt.aggregate), func(t *testing.T) { - mem := &memory.Allocator{} got, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{ ReadFilterSpec: influxdb.ReadFilterSpec{ Database: reader.Database, @@ -912,7 +908,7 @@ func TestStorageReader_ReadWindowAggregate_CreateEmptyByStopTime(t *testing.T) { tt.aggregate, }, CreateEmpty: true, - }, mem) + }, memory.DefaultAllocator) if err != nil { t.Fatal(err) } @@ -997,7 +993,6 @@ func TestStorageReader_ReadWindowAggregate_CreateEmptyByStartTime(t *testing.T) }, } { t.Run(string(tt.aggregate), func(t *testing.T) { - mem := &memory.Allocator{} got, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{ ReadFilterSpec: influxdb.ReadFilterSpec{ Database: reader.Database, @@ -1013,7 +1008,7 @@ func TestStorageReader_ReadWindowAggregate_CreateEmptyByStartTime(t *testing.T) tt.aggregate, }, CreateEmpty: true, - }, mem) + }, memory.DefaultAllocator) if err != nil { t.Fatal(err) } @@ -1131,7 +1126,6 @@ func TestStorageReader_ReadWindowAggregate_TruncatedBounds(t *testing.T) { }, } { t.Run(string(tt.aggregate), func(t *testing.T) { - mem := &memory.Allocator{} got, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{ ReadFilterSpec: influxdb.ReadFilterSpec{ Database: reader.Database, @@ -1148,7 +1142,7 @@ func TestStorageReader_ReadWindowAggregate_TruncatedBounds(t *testing.T) { Aggregates: []plan.ProcedureKind{ tt.aggregate, }, - }, mem) + }, memory.DefaultAllocator) if err != nil { t.Fatal(err) } @@ -1266,7 +1260,6 @@ func TestStorageReader_ReadWindowAggregate_TruncatedBoundsCreateEmpty(t *testing }, } { t.Run(string(tt.aggregate), func(t *testing.T) { - mem := &memory.Allocator{} got, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{ ReadFilterSpec: influxdb.ReadFilterSpec{ Database: reader.Database, @@ -1284,7 +1277,7 @@ func TestStorageReader_ReadWindowAggregate_TruncatedBoundsCreateEmpty(t *testing tt.aggregate, }, CreateEmpty: true, - }, mem) + }, memory.DefaultAllocator) if err != nil { t.Fatal(err) } @@ -1340,7 +1333,6 @@ func TestStorageReader_ReadWindowAggregate_Mean(t *testing.T) { defer reader.Close() t.Run("unwindowed mean", func(t *testing.T) { - mem := &memory.Allocator{} ti, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{ ReadFilterSpec: influxdb.ReadFilterSpec{ Database: reader.Database, @@ -1354,7 +1346,7 @@ func TestStorageReader_ReadWindowAggregate_Mean(t *testing.T) { Aggregates: []plan.ProcedureKind{ storageflux.MeanKind, }, - }, mem) + }, memory.DefaultAllocator) if err != nil { t.Fatal(err) } @@ -1373,7 +1365,6 @@ func TestStorageReader_ReadWindowAggregate_Mean(t *testing.T) { }) t.Run("windowed mean", func(t *testing.T) { - mem := &memory.Allocator{} ti, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{ ReadFilterSpec: influxdb.ReadFilterSpec{ Database: reader.Database, @@ -1387,7 +1378,7 @@ func TestStorageReader_ReadWindowAggregate_Mean(t *testing.T) { Aggregates: []plan.ProcedureKind{ storageflux.MeanKind, }, - }, mem) + }, memory.DefaultAllocator) if err != nil { t.Fatal(err) } @@ -1433,7 +1424,6 @@ func TestStorageReader_ReadWindowAggregate_Mean(t *testing.T) { }) t.Run("windowed mean with offset", func(t *testing.T) { - mem := &memory.Allocator{} ti, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{ ReadFilterSpec: influxdb.ReadFilterSpec{ Database: reader.Database, @@ -1448,7 +1438,7 @@ func TestStorageReader_ReadWindowAggregate_Mean(t *testing.T) { Aggregates: []plan.ProcedureKind{ storageflux.MeanKind, }, - }, mem) + }, memory.DefaultAllocator) if err != nil { t.Fatal(err) } @@ -1542,7 +1532,6 @@ func TestStorageReader_ReadWindowFirst(t *testing.T) { }) defer reader.Close() - mem := &memory.Allocator{} ti, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{ ReadFilterSpec: influxdb.ReadFilterSpec{ Database: reader.Database, @@ -1556,7 +1545,7 @@ func TestStorageReader_ReadWindowFirst(t *testing.T) { Aggregates: []plan.ProcedureKind{ storageflux.FirstKind, }, - }, mem) + }, memory.DefaultAllocator) if err != nil { t.Fatal(err) } @@ -1653,7 +1642,6 @@ func TestStorageReader_WindowFirstOffset(t *testing.T) { }) defer reader.Close() - mem := &memory.Allocator{} ti, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{ ReadFilterSpec: influxdb.ReadFilterSpec{ Database: reader.Database, @@ -1668,7 +1656,7 @@ func TestStorageReader_WindowFirstOffset(t *testing.T) { Aggregates: []plan.ProcedureKind{ storageflux.FirstKind, }, - }, mem) + }, memory.DefaultAllocator) if err != nil { t.Fatal(err) } @@ -1766,7 +1754,6 @@ func TestStorageReader_WindowSumOffset(t *testing.T) { }) defer reader.Close() - mem := &memory.Allocator{} ti, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{ ReadFilterSpec: influxdb.ReadFilterSpec{ Database: reader.Database, @@ -1781,7 +1768,7 @@ func TestStorageReader_WindowSumOffset(t *testing.T) { Aggregates: []plan.ProcedureKind{ storageflux.SumKind, }, - }, mem) + }, memory.DefaultAllocator) if err != nil { t.Fatal(err) } @@ -1878,7 +1865,6 @@ func TestStorageReader_ReadWindowFirstCreateEmpty(t *testing.T) { }) defer reader.Close() - mem := &memory.Allocator{} ti, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{ ReadFilterSpec: influxdb.ReadFilterSpec{ Database: reader.Database, @@ -1893,7 +1879,7 @@ func TestStorageReader_ReadWindowFirstCreateEmpty(t *testing.T) { storageflux.FirstKind, }, CreateEmpty: true, - }, mem) + }, memory.DefaultAllocator) if err != nil { t.Fatal(err) } @@ -2018,7 +2004,6 @@ func TestStorageReader_WindowFirstOffsetCreateEmpty(t *testing.T) { }) defer reader.Close() - mem := &memory.Allocator{} ti, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{ ReadFilterSpec: influxdb.ReadFilterSpec{ Database: reader.Database, @@ -2034,7 +2019,7 @@ func TestStorageReader_WindowFirstOffsetCreateEmpty(t *testing.T) { storageflux.FirstKind, }, CreateEmpty: true, - }, mem) + }, memory.DefaultAllocator) if err != nil { t.Fatal(err) } @@ -2162,7 +2147,6 @@ func TestStorageReader_WindowSumOffsetCreateEmpty(t *testing.T) { }) defer reader.Close() - mem := &memory.Allocator{} ti, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{ ReadFilterSpec: influxdb.ReadFilterSpec{ Database: reader.Database, @@ -2178,7 +2162,7 @@ func TestStorageReader_WindowSumOffsetCreateEmpty(t *testing.T) { storageflux.SumKind, }, CreateEmpty: true, - }, mem) + }, memory.DefaultAllocator) if err != nil { t.Fatal(err) } @@ -2306,7 +2290,6 @@ func TestStorageReader_ReadWindowFirstTimeColumn(t *testing.T) { }) defer reader.Close() - mem := &memory.Allocator{} ti, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{ ReadFilterSpec: influxdb.ReadFilterSpec{ Database: reader.Database, @@ -2322,7 +2305,7 @@ func TestStorageReader_ReadWindowFirstTimeColumn(t *testing.T) { }, CreateEmpty: true, TimeColumn: execute.DefaultStopColLabel, - }, mem) + }, memory.DefaultAllocator) if err != nil { t.Fatal(err) } @@ -2411,7 +2394,6 @@ func TestStorageReader_WindowFirstOffsetTimeColumn(t *testing.T) { }) defer reader.Close() - mem := &memory.Allocator{} ti, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{ ReadFilterSpec: influxdb.ReadFilterSpec{ Database: reader.Database, @@ -2428,7 +2410,7 @@ func TestStorageReader_WindowFirstOffsetTimeColumn(t *testing.T) { }, CreateEmpty: true, TimeColumn: execute.DefaultStopColLabel, - }, mem) + }, memory.DefaultAllocator) if err != nil { t.Fatal(err) } @@ -2517,7 +2499,6 @@ func TestStorageReader_WindowSumOffsetTimeColumn(t *testing.T) { }) defer reader.Close() - mem := &memory.Allocator{} ti, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{ ReadFilterSpec: influxdb.ReadFilterSpec{ Database: reader.Database, @@ -2534,7 +2515,7 @@ func TestStorageReader_WindowSumOffsetTimeColumn(t *testing.T) { }, CreateEmpty: true, TimeColumn: execute.DefaultStopColLabel, - }, mem) + }, memory.DefaultAllocator) if err != nil { t.Fatal(err) } @@ -2627,7 +2608,6 @@ func TestStorageReader_EmptyTableNoEmptyWindows(t *testing.T) { }) defer reader.Close() - mem := &memory.Allocator{} ti, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{ ReadFilterSpec: influxdb.ReadFilterSpec{ Database: reader.Database, @@ -2642,7 +2622,7 @@ func TestStorageReader_EmptyTableNoEmptyWindows(t *testing.T) { storageflux.FirstKind, }, CreateEmpty: true, - }, mem) + }, memory.DefaultAllocator) if err != nil { t.Fatal(err) } @@ -2824,7 +2804,7 @@ func TestStorageReader_ReadGroup(t *testing.T) { mem := arrowmem.NewCheckedAllocator(arrowmem.DefaultAllocator) defer mem.AssertSize(t, 0) - alloc := &memory.Allocator{ + alloc := &memory.ResourceAllocator{ Allocator: mem, } got, err := reader.ReadGroup(context.Background(), influxdb.ReadGroupSpec{ @@ -2917,7 +2897,6 @@ func TestStorageReader_ReadGroupSelectTags(t *testing.T) { for _, tt := range cases { t.Run(tt.aggregate, func(t *testing.T) { - mem := &memory.Allocator{} got, err := reader.ReadGroup(context.Background(), influxdb.ReadGroupSpec{ ReadFilterSpec: influxdb.ReadFilterSpec{ Database: reader.Database, @@ -2927,7 +2906,7 @@ func TestStorageReader_ReadGroupSelectTags(t *testing.T) { GroupMode: influxdb.GroupModeBy, GroupKeys: []string{"t0"}, AggregateMethod: tt.aggregate, - }, mem) + }, memory.DefaultAllocator) if err != nil { t.Fatal(err) } @@ -2989,7 +2968,6 @@ func TestStorageReader_ReadGroupNoAgg(t *testing.T) { for _, tt := range cases { t.Run(tt.aggregate, func(t *testing.T) { - mem := &memory.Allocator{} got, err := reader.ReadGroup(context.Background(), influxdb.ReadGroupSpec{ ReadFilterSpec: influxdb.ReadFilterSpec{ Database: reader.Database, @@ -2998,7 +2976,7 @@ func TestStorageReader_ReadGroupNoAgg(t *testing.T) { }, GroupMode: influxdb.GroupModeBy, GroupKeys: []string{"t1"}, - }, mem) + }, memory.DefaultAllocator) if err != nil { t.Fatal(err) } @@ -3123,7 +3101,7 @@ func TestStorageReader_ReadWindowAggregateMonths(t *testing.T) { mem := arrowmem.NewCheckedAllocator(arrowmem.DefaultAllocator) defer mem.AssertSize(t, 0) - alloc := &memory.Allocator{ + alloc := &memory.ResourceAllocator{ Allocator: mem, } got, err := reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{ @@ -3172,11 +3150,11 @@ func TestStorageReader_Backoff(t *testing.T) { for _, tt := range []struct { name string - read func(reader *StorageReader, mem *memory.Allocator) (flux.TableIterator, error) + read func(reader *StorageReader, mem memory.Allocator) (flux.TableIterator, error) }{ { name: "ReadFilter", - read: func(reader *StorageReader, mem *memory.Allocator) (flux.TableIterator, error) { + read: func(reader *StorageReader, mem memory.Allocator) (flux.TableIterator, error) { return reader.ReadFilter(context.Background(), influxdb.ReadFilterSpec{ Database: reader.Database, RetentionPolicy: reader.RetentionPolicy, @@ -3186,7 +3164,7 @@ func TestStorageReader_Backoff(t *testing.T) { }, { name: "ReadGroup", - read: func(reader *StorageReader, mem *memory.Allocator) (flux.TableIterator, error) { + read: func(reader *StorageReader, mem memory.Allocator) (flux.TableIterator, error) { return reader.ReadGroup(context.Background(), influxdb.ReadGroupSpec{ ReadFilterSpec: influxdb.ReadFilterSpec{ Database: reader.Database, @@ -3200,7 +3178,7 @@ func TestStorageReader_Backoff(t *testing.T) { }, { name: "ReadWindowAggregate", - read: func(reader *StorageReader, mem *memory.Allocator) (flux.TableIterator, error) { + read: func(reader *StorageReader, mem memory.Allocator) (flux.TableIterator, error) { return reader.ReadWindowAggregate(context.Background(), influxdb.ReadWindowAggregateSpec{ ReadFilterSpec: influxdb.ReadFilterSpec{ Database: reader.Database, @@ -3222,7 +3200,7 @@ func TestStorageReader_Backoff(t *testing.T) { t.Run(tt.name, func(t *testing.T) { // Read the table and learn what the maximum allocated // value is. We don't want to exceed this. - mem := &memory.Allocator{} + mem := &memory.ResourceAllocator{} tables, err := tt.read(reader, mem) if err != nil { t.Fatal(err) @@ -3249,7 +3227,7 @@ func TestStorageReader_Backoff(t *testing.T) { // if the next buffer attempts to be allocated // before the first. limit := mem.MaxAllocated() - mem = &memory.Allocator{Limit: &limit} + mem = &memory.ResourceAllocator{Limit: &limit} tables, err = tt.read(reader, mem) if err != nil { t.Fatal(err) @@ -3360,12 +3338,11 @@ func BenchmarkReadFilter(b *testing.B) { return datagen.NewSeriesGeneratorFromSpec(&spec, tr), tr } benchmarkRead(b, setupFn, func(r *StorageReader) error { - mem := &memory.Allocator{} tables, err := r.ReadFilter(context.Background(), influxdb.ReadFilterSpec{ Database: r.Database, RetentionPolicy: r.RetentionPolicy, Bounds: r.Bounds, - }, mem) + }, memory.DefaultAllocator) if err != nil { return err } @@ -3465,7 +3442,6 @@ func BenchmarkReadGroup(b *testing.B) { return datagen.NewSeriesGeneratorFromSpec(&spec, tr), tr } benchmarkRead(b, setupFn, func(r *StorageReader) error { - mem := &memory.Allocator{} tables, err := r.ReadGroup(context.Background(), influxdb.ReadGroupSpec{ ReadFilterSpec: influxdb.ReadFilterSpec{ Database: r.Database, @@ -3475,7 +3451,7 @@ func BenchmarkReadGroup(b *testing.B) { GroupMode: influxdb.GroupModeBy, GroupKeys: []string{"_start", "_stop", "t0"}, AggregateMethod: storageflux.MinKind, - }, mem) + }, memory.DefaultAllocator) if err != nil { return err } diff --git a/test-flux.sh b/test-flux.sh index f63a839d7da..df13e454807 100755 --- a/test-flux.sh +++ b/test-flux.sh @@ -32,7 +32,7 @@ build_test_harness() { } # Many tests targeting 3rd party databases are not yet supported in CI and should be filtered out. -DB_INTEGRATION_WRITE_TESTS=integration_sqlite_write_to,integration_vertica_write_to,integration_mssql_write_to,integration_mysql_write_to,integration_mariadb_write_to,integration_pg_write_to,integration_hdb_write_to +DB_INTEGRATION_WRITE_TESTS=integration_mqtt_pub,integration_sqlite_write_to,integration_vertica_write_to,integration_mssql_write_to,integration_mysql_write_to,integration_mariadb_write_to,integration_pg_write_to,integration_hdb_write_to DB_INTEGRATION_READ_TESTS=integration_sqlite_read_from_seed,integration_sqlite_read_from_nonseed,integration_vertica_read_from_seed,integration_vertica_read_from_nonseed,integration_mssql_read_from_seed,integration_mssql_read_from_nonseed,integration_mariadb_read_from_seed,integration_mariadb_read_from_nonseed,integration_mysql_read_from_seed,integration_mysql_read_from_nonseed,integration_pg_read_from_seed,integration_pg_read_from_nonseed,integration_hdb_read_from_seed,integration_hdb_read_from_nonseed DB_INTEGRATION_INJECTION_TESTS="integration_sqlite_injection,integration_hdb_injection,integration_pg_injection,integration_mysql_injection,integration_mariadb_injection,integration_mssql_injection" DB_TESTS="${DB_INTEGRATION_WRITE_TESTS},${DB_INTEGRATION_READ_TESTS},${DB_INTEGRATION_INJECTION_TESTS}"