Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability for query storage to provide unconsolidated blocks #929

Merged
merged 4 commits into from
Oct 1, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/query/block/block_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions src/query/block/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ type columnBlock struct {
seriesMeta []SeriesMeta
}

// Unconsolidated returns the unconsolidated version for the block
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: of the block

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we use for the block at most places. Can do a search and replace in a separate diff

func (c *columnBlock) Unconsolidated() (UnconsolidatedBlock, error) {
return nil, fmt.Errorf("unconsolidated view not supported for block, meta: %s", c.meta)
}

// Meta returns the metadata for the block
func (c *columnBlock) Meta() Metadata {
return c.meta
Expand Down
5 changes: 5 additions & 0 deletions src/query/block/scalar.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ func NewScalar(val float64, bounds models.Bounds) Block {
}
}

// Unconsolidated returns the unconsolidated version for the block
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit ...of the block

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ben had the same comment. We use "for the block" almost everywhere

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's grammatically correct almost everywhere, just not here 😛

func (b *Scalar) Unconsolidated() (UnconsolidatedBlock, error) {
return nil, fmt.Errorf("unconsolidated view not implemented for scalar block, meta: %s", b.meta)
}

// StepIter returns a StepIterator
func (b *Scalar) StepIter() (StepIter, error) {
bounds := b.meta.Bounds
Expand Down
44 changes: 44 additions & 0 deletions src/query/block/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@

package block

import (
"github.com/m3db/m3/src/query/ts"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you use "github.com/m3db/m3/src/dbnode/ts" and delete this one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally we want to avoid any dbnode things especially coupling their series representation with ours. We'd probably move anything m3db related within ts/m3db and localize it there

)

// Series is a single series within a block
type Series struct {
values []float64
Expand All @@ -45,3 +49,43 @@ func (s Series) Values() []float64 {
func (s Series) Len() int {
return len(s.values)
}

// UnconsolidatedSeries is the series with raw datapoints
type UnconsolidatedSeries struct {
datapoints []ts.Datapoints
Meta SeriesMeta
}

// NewUnconsolidatedSeries creates a new series with raw datapoints
func NewUnconsolidatedSeries(datapoints []ts.Datapoints, meta SeriesMeta) UnconsolidatedSeries {
return UnconsolidatedSeries{datapoints: datapoints, Meta: meta}
}

// DatapointsAtStep returns the raw datapoints at a step index
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: DatapointAtStep returns the raw datapoint at a step index

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's returning multiple datapoints ?

func (s UnconsolidatedSeries) DatapointsAtStep(idx int) ts.Datapoints {
if idx < 0 || idx >= len(s.datapoints) {
return nil
}

return s.datapoints[idx]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should perform bound checks

}

// Datapoints returns the internal datapoints slice
func (s UnconsolidatedSeries) Datapoints() []ts.Datapoints {
return s.datapoints
}

// Len returns the number of datapoints slices in the series
func (s UnconsolidatedSeries) Len() int {
return len(s.datapoints)
}

// Consolidated consolidates the series
func (s UnconsolidatedSeries) Consolidated(consolidationFunc ConsolidationFunc) Series {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: rename to Consolidate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's just talking about a view. Maybe we already have it cached and don't really consolidate.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like Consolidate

values := make([]float64, len(s.datapoints))
for i, vals := range s.datapoints {
values[i] = consolidationFunc(vals)
}

return NewSeries(values, s.Meta)
}
88 changes: 76 additions & 12 deletions src/query/block/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@ package block

import (
"fmt"
"math"
"time"

"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/ts"
)

// Block represents a group of series across a time bound
type Block interface {
// Unconsolidated returns the unconsolidated version of the block
Unconsolidated() (UnconsolidatedBlock, error)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe rather than error it could return a bool to indicate if it can be unconsolidated?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even for things which can support unconsolidated, you can still get an error. So we need an error here neverthless. I like this over having a bool so that clients don't check 2 things

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bit weird; you may want to continue and do something different if you can't consolidate because you errored vs your block being unable to block, but that could be changed later if necessary

// StepIter returns a StepIterator
StepIter() (StepIter, error)
// SeriesIter returns a SeriesIterator
Expand All @@ -37,6 +41,18 @@ type Block interface {
Close() error
}

// UnconsolidatedBlock represents a group of unconsolidated series across a time bound
type UnconsolidatedBlock interface {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there ever a scenario where you'd want to consolidate a single series rather than the entire block? If not, might be good to add a capability to do this at the block level

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the UnconsolidatedBlock has a Consolidate() which does it at the block level. Not super sure what you mean.

// StepIter returns a StepIterator
StepIter() (UnconsolidatedStepIter, error)
// SeriesIter returns a SeriesIterator
SeriesIter() (UnconsolidatedSeriesIter, error)
// Consolidate an unconsolidated block
Consolidate() (Block, error)
// Close frees up any resources
Close() error
}

// SeriesMeta is metadata data for the series
type SeriesMeta struct {
Tags models.Tags
Expand All @@ -49,30 +65,58 @@ type Iterator interface {
Close()
}

// MetaIter is implemented by iterators which provide meta information
type MetaIter interface {
// SeriesMeta returns the metadata for each series in the block
SeriesMeta() []SeriesMeta
// Meta returns the metadata for the block
Meta() Metadata
}

// SeriesMetaIter is implemented by series iterators which provide meta information
type SeriesMetaIter interface {
MetaIter
// SeriesCount returns the number of series
SeriesCount() int
}

// SeriesIter iterates through a block horizontally
type SeriesIter interface {
Iterator
SeriesMetaIter
// Current returns the current series for the block
Current() (Series, error)
// SeriesCount returns the number of series
SeriesCount() int
// SeriesMeta returns the metadata for each series in the block
SeriesMeta() []SeriesMeta
// Meta returns the metadata for the block
Meta() Metadata
}

// UnconsolidatedSeriesIter iterates through a block horizontally
type UnconsolidatedSeriesIter interface {
Iterator
SeriesMetaIter
// Current returns the current series for the block
Current() (UnconsolidatedSeries, error)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Current() typically panics rather than returning an error

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all other Current() also return error

}

// StepMetaIter is implemented by step iterators which provide meta information
type StepMetaIter interface {
MetaIter
// StepCount returns the number of steps
StepCount() int
}

// StepIter iterates through a block vertically
type StepIter interface {
Iterator
StepMetaIter
// Current returns the current step for the block
Current() (Step, error)
// StepCount returns the number of steps
StepCount() int
// SeriesMeta returns the metadata for each series in the block
SeriesMeta() []SeriesMeta
// Meta returns the metadata for the block
Meta() Metadata
}

// UnconsolidatedStepIter iterates through a block vertically
type UnconsolidatedStepIter interface {
Iterator
StepMetaIter
// Current returns the current step for the block
Current() (UnconsolidatedStep, error)
}

// Step is a single time step within a block
Expand All @@ -81,6 +125,12 @@ type Step interface {
Values() []float64
}

// UnconsolidatedStep is a single unconsolidated time step within a block
type UnconsolidatedStep interface {
Time() time.Time
Values() []ts.Datapoints
}

// Metadata is metadata for a block
type Metadata struct {
Bounds models.Bounds
Expand All @@ -104,3 +154,17 @@ type Builder interface {
type Result struct {
Blocks []Block
}

// ConsolidationFunc consolidates a bunch of datapoints into a single float value
type ConsolidationFunc func(datapoints ts.Datapoints) float64

// TakeLast is a consolidation function which takes the last datapoint which has non nan value
func TakeLast(values ts.Datapoints) float64 {
for i := len(values) - 1; i >= 0; i-- {
if !math.IsNaN(values[i].Value) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

return values[i].Value
}
}

return math.NaN()
}
33 changes: 31 additions & 2 deletions src/query/executor/transform/lazy.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,18 +137,42 @@ func (s *seriesIter) Next() bool {
}

type lazyBlock struct {
mu sync.Mutex

mu sync.Mutex
rawBlock block.Block
lazyNode *lazyNode
ID parser.NodeID
processedBlock block.Block
processError error
}

// Unconsolidated returns the unconsolidated version for the block
func (f *lazyBlock) Unconsolidated() (block.UnconsolidatedBlock, error) {
f.mu.Lock()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good opportunity to use sync.Once :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't use sync.Once since i need to look every read of f.processedBlock under the same mutex

defer f.mu.Unlock()

if f.processError != nil {
return nil, f.processError
}

if f.processedBlock != nil {
return f.processedBlock.Unconsolidated()
}

if err := f.process(); err != nil {
return nil, err
}

return f.processedBlock.Unconsolidated()
}

func (f *lazyBlock) StepIter() (block.StepIter, error) {
f.mu.Lock()
defer f.mu.Unlock()

if f.processError != nil {
return nil, f.processError
}

if f.processedBlock != nil {
return f.processedBlock.StepIter()
}
Expand Down Expand Up @@ -178,6 +202,10 @@ func (f *lazyBlock) SeriesIter() (block.SeriesIter, error) {
f.mu.Lock()
defer f.mu.Unlock()

if f.processError != nil {
return nil, f.processError
}

if f.processedBlock != nil {
return f.processedBlock.SeriesIter()
}
Expand Down Expand Up @@ -210,6 +238,7 @@ func (f *lazyBlock) Close() error {
func (f *lazyBlock) process() error {
err := f.lazyNode.fNode.Process(f.ID, f.rawBlock)
if err != nil {
f.processError = err
return err
}

Expand Down
5 changes: 3 additions & 2 deletions src/query/functions/temporal/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"math"

"github.com/m3db/m3/src/query/executor/transform"
"github.com/m3db/m3/src/query/ts"
)

const (
Expand Down Expand Up @@ -87,8 +88,8 @@ type aggNode struct {
aggFunc func([]float64) float64
}

func (a *aggNode) Process(values []float64) float64 {
return a.aggFunc(values)
func (a *aggNode) Process(datapoints ts.Datapoints) float64 {
return a.aggFunc(datapoints.Values())
}

func avgOverTime(values []float64) float64 {
Expand Down
6 changes: 3 additions & 3 deletions src/query/functions/temporal/aggregation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func testAggregation(t *testing.T, testCases []testCase, vals [][]float64) {
t.Run(tt.name, func(t *testing.T) {
values, bounds := test.GenerateValuesAndBounds(vals, nil)
boundStart := bounds.Start
block3 := test.NewBlockFromValues(bounds, values)
block3 := test.NewUnconsolidatedBlockFromDatapoints(bounds, values)
c, sink := executor.NewControllerWithSink(parser.NodeID(1))

baseOp, err := NewAggOp([]interface{}{5 * time.Minute}, tt.opType)
Expand All @@ -262,7 +262,7 @@ func testAggregation(t *testing.T, testCases []testCase, vals [][]float64) {

original := values[0][0]
values[0][0] = math.NaN()
block1 := test.NewBlockFromValues(models.Bounds{
block1 := test.NewUnconsolidatedBlockFromDatapoints(models.Bounds{
Start: bounds.Start.Add(-2 * bounds.Duration),
Duration: bounds.Duration,
StepSize: bounds.StepSize,
Expand All @@ -279,7 +279,7 @@ func testAggregation(t *testing.T, testCases []testCase, vals [][]float64) {
_, exists = bNode.cache.get(boundStart.Add(-1 * bounds.Duration))
assert.False(t, exists, "block cached")

block2 := test.NewBlockFromValues(models.Bounds{
block2 := test.NewUnconsolidatedBlockFromDatapoints(models.Bounds{
Start: bounds.Start.Add(-1 * bounds.Duration),
Duration: bounds.Duration,
StepSize: bounds.StepSize,
Expand Down
Loading