-
Notifications
You must be signed in to change notification settings - Fork 458
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add ability for query storage to provide unconsolidated blocks #929
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -47,6 +47,11 @@ func NewScalar(val float64, bounds models.Bounds) Block { | |
} | ||
} | ||
|
||
// Unconsolidated returns the unconsolidated version of the block | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ben had the same comment. We use "for the block" almost everywhere There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's grammatically correct almost everywhere, just not here 😛 |
||
func (b *Scalar) Unconsolidated() (UnconsolidatedBlock, error) { | ||
return nil, fmt.Errorf("unconsolidated view not implemented for scalar block, meta: %s", b.meta) | ||
} | ||
|
||
// StepIter returns a StepIterator | ||
func (b *Scalar) StepIter() (StepIter, error) { | ||
bounds := b.meta.Bounds | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,10 @@ | |
|
||
package block | ||
|
||
import ( | ||
"github.com/m3db/m3/src/query/ts" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ideally we want to avoid any dbnode things especially coupling their series representation with ours. We'd probably move anything m3db related within ts/m3db and localize it there |
||
) | ||
|
||
// Series is a single series within a block | ||
type Series struct { | ||
values []float64 | ||
|
@@ -45,3 +49,43 @@ func (s Series) Values() []float64 { | |
func (s Series) Len() int { | ||
return len(s.values) | ||
} | ||
|
||
// UnconsolidatedSeries is the series with raw datapoints | ||
type UnconsolidatedSeries struct { | ||
datapoints []ts.Datapoints | ||
Meta SeriesMeta | ||
} | ||
|
||
// NewUnconsolidatedSeries creates a new series with raw datapoints | ||
func NewUnconsolidatedSeries(datapoints []ts.Datapoints, meta SeriesMeta) UnconsolidatedSeries { | ||
return UnconsolidatedSeries{datapoints: datapoints, Meta: meta} | ||
} | ||
|
||
// DatapointsAtStep returns the raw datapoints at a step index | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's returning multiple datapoints ? |
||
func (s UnconsolidatedSeries) DatapointsAtStep(idx int) ts.Datapoints { | ||
// An idx outside [0, len(s.datapoints)) yields nil instead of panicking.
if idx < 0 || idx >= len(s.datapoints) { | ||
return nil | ||
} | ||
|
||
return s.datapoints[idx] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should perform bound checks |
||
} | ||
|
||
// Datapoints returns the internal datapoints slice | ||
func (s UnconsolidatedSeries) Datapoints() []ts.Datapoints { | ||
return s.datapoints | ||
} | ||
|
||
// Len returns the number of datapoints slices in the series | ||
func (s UnconsolidatedSeries) Len() int { | ||
return len(s.datapoints) | ||
} | ||
|
||
// Consolidated applies consolidationFunc to the datapoints at every step and
// returns a Series holding the resulting values together with the same Meta.
func (s UnconsolidatedSeries) Consolidated(consolidationFunc ConsolidationFunc) Series { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: rename to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's just talking about a view. Maybe we already have it cached and don't really consolidate. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I like |
||
// One output value per entry in s.datapoints.
values := make([]float64, len(s.datapoints)) | ||
for i, vals := range s.datapoints { | ||
values[i] = consolidationFunc(vals) | ||
} | ||
|
||
return NewSeries(values, s.Meta) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,13 +22,17 @@ package block | |
|
||
import ( | ||
"fmt" | ||
"math" | ||
"time" | ||
|
||
"github.com/m3db/m3/src/query/models" | ||
"github.com/m3db/m3/src/query/ts" | ||
) | ||
|
||
// Block represents a group of series across a time bound | ||
type Block interface { | ||
// Unconsolidated returns the unconsolidated version of the block | ||
Unconsolidated() (UnconsolidatedBlock, error) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe rather than error it could return a bool to indicate if it can be unconsolidated? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Even for things which can support unconsolidated, you can still get an error. So we need an error here nevertheless. I like this over having a bool so that clients don't check 2 things There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A bit weird; you may want to continue and do something different if you can't consolidate because you errored vs your block being unable to block, but that could be changed later if necessary |
||
// StepIter returns a StepIterator | ||
StepIter() (StepIter, error) | ||
// SeriesIter returns a SeriesIterator | ||
|
@@ -37,6 +41,18 @@ type Block interface { | |
Close() error | ||
} | ||
|
||
// UnconsolidatedBlock represents a group of unconsolidated series across a
// time bound.
type UnconsolidatedBlock interface { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there ever a scenario where you'd want to consolidate a single series rather than the entire block? If not, might be good to add a capability to do this at the block level There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, the UnconsolidatedBlock has a Consolidate() which does it at the block level. Not super sure what you mean. |
||
// StepIter returns an UnconsolidatedStepIter for the block.
StepIter() (UnconsolidatedStepIter, error) | ||
// SeriesIter returns an UnconsolidatedSeriesIter for the block.
SeriesIter() (UnconsolidatedSeriesIter, error) | ||
// Consolidate consolidates the unconsolidated block into a Block.
Consolidate() (Block, error) | ||
// Close frees up any resources.
Close() error | ||
} | ||
|
||
// SeriesMeta is metadata for the series | ||
type SeriesMeta struct { | ||
Tags models.Tags | ||
|
@@ -49,30 +65,58 @@ type Iterator interface { | |
Close() | ||
} | ||
|
||
// MetaIter is implemented by iterators which provide meta information | ||
type MetaIter interface { | ||
// SeriesMeta returns the metadata for each series in the block | ||
SeriesMeta() []SeriesMeta | ||
// Meta returns the metadata for the block | ||
Meta() Metadata | ||
} | ||
|
||
// SeriesMetaIter is implemented by series iterators which provide meta information | ||
type SeriesMetaIter interface { | ||
MetaIter | ||
// SeriesCount returns the number of series | ||
SeriesCount() int | ||
} | ||
|
||
// SeriesIter iterates through a block horizontally | ||
type SeriesIter interface { | ||
Iterator | ||
SeriesMetaIter | ||
// Current returns the current series for the block | ||
Current() (Series, error) | ||
// SeriesCount returns the number of series | ||
SeriesCount() int | ||
// SeriesMeta returns the metadata for each series in the block | ||
SeriesMeta() []SeriesMeta | ||
// Meta returns the metadata for the block | ||
Meta() Metadata | ||
} | ||
|
||
// UnconsolidatedSeriesIter iterates through a block horizontally, yielding
// unconsolidated series.
type UnconsolidatedSeriesIter interface { | ||
Iterator | ||
SeriesMetaIter | ||
// Current returns the current unconsolidated series for the block.
Current() (UnconsolidatedSeries, error) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Current() typically panics rather than returning an error There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. all other Current() also return error |
||
} | ||
|
||
// StepMetaIter is implemented by step iterators which provide meta information | ||
type StepMetaIter interface { | ||
MetaIter | ||
// StepCount returns the number of steps | ||
StepCount() int | ||
} | ||
|
||
// StepIter iterates through a block vertically | ||
type StepIter interface { | ||
Iterator | ||
StepMetaIter | ||
// Current returns the current step for the block | ||
Current() (Step, error) | ||
// StepCount returns the number of steps | ||
StepCount() int | ||
// SeriesMeta returns the metadata for each series in the block | ||
SeriesMeta() []SeriesMeta | ||
// Meta returns the metadata for the block | ||
Meta() Metadata | ||
} | ||
|
||
// UnconsolidatedStepIter iterates through a block vertically | ||
type UnconsolidatedStepIter interface { | ||
Iterator | ||
StepMetaIter | ||
// Current returns the current step for the block | ||
Current() (UnconsolidatedStep, error) | ||
} | ||
|
||
// Step is a single time step within a block | ||
|
@@ -81,6 +125,12 @@ type Step interface { | |
Values() []float64 | ||
} | ||
|
||
// UnconsolidatedStep is a single unconsolidated time step within a block | ||
type UnconsolidatedStep interface { | ||
Time() time.Time | ||
Values() []ts.Datapoints | ||
} | ||
|
||
// Metadata is metadata for a block | ||
type Metadata struct { | ||
Bounds models.Bounds | ||
|
@@ -104,3 +154,17 @@ type Builder interface { | |
type Result struct { | ||
Blocks []Block | ||
} | ||
|
||
// ConsolidationFunc consolidates a bunch of datapoints into a single float value | ||
type ConsolidationFunc func(datapoints ts.Datapoints) float64 | ||
|
||
// TakeLast is a consolidation function that returns the value of the last
// datapoint whose value is not NaN. It returns NaN when values is empty or
// every value is NaN.
func TakeLast(values ts.Datapoints) float64 { | ||
// Walk backwards so the last non-NaN value in slice order is found first.
for i := len(values) - 1; i >= 0; i-- { | ||
if !math.IsNaN(values[i].Value) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
return values[i].Value | ||
} | ||
} | ||
|
||
// No non-NaN value was found.
return math.NaN() | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -137,18 +137,42 @@ func (s *seriesIter) Next() bool { | |
} | ||
|
||
type lazyBlock struct { | ||
mu sync.Mutex | ||
|
||
mu sync.Mutex | ||
rawBlock block.Block | ||
lazyNode *lazyNode | ||
ID parser.NodeID | ||
processedBlock block.Block | ||
processError error | ||
} | ||
|
||
// Unconsolidated returns the unconsolidated version of the block. It holds
// f.mu for the whole call, returns a previously recorded processError if there
// is one, and otherwise delegates to processedBlock, calling f.process first
// when processedBlock has not been set yet.
func (f *lazyBlock) Unconsolidated() (block.UnconsolidatedBlock, error) { | ||
f.mu.Lock() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good opportunity to use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can't use sync.Once since i need to look every read of f.processedBlock under the same mutex |
||
defer f.mu.Unlock() | ||
|
||
// A recorded processing failure is returned as-is; process is not retried.
if f.processError != nil { | ||
return nil, f.processError | ||
} | ||
|
||
// Already processed: reuse the cached block.
if f.processedBlock != nil { | ||
return f.processedBlock.Unconsolidated() | ||
} | ||
|
||
if err := f.process(); err != nil { | ||
return nil, err | ||
} | ||
|
||
// NOTE(review): assumes a successful f.process() leaves f.processedBlock
// non-nil; the rest of process is not visible here — confirm.
return f.processedBlock.Unconsolidated() | ||
} | ||
|
||
func (f *lazyBlock) StepIter() (block.StepIter, error) { | ||
f.mu.Lock() | ||
defer f.mu.Unlock() | ||
|
||
if f.processError != nil { | ||
return nil, f.processError | ||
} | ||
|
||
if f.processedBlock != nil { | ||
return f.processedBlock.StepIter() | ||
} | ||
|
@@ -178,6 +202,10 @@ func (f *lazyBlock) SeriesIter() (block.SeriesIter, error) { | |
f.mu.Lock() | ||
defer f.mu.Unlock() | ||
|
||
if f.processError != nil { | ||
return nil, f.processError | ||
} | ||
|
||
if f.processedBlock != nil { | ||
return f.processedBlock.SeriesIter() | ||
} | ||
|
@@ -210,6 +238,7 @@ func (f *lazyBlock) Close() error { | |
func (f *lazyBlock) process() error { | ||
err := f.lazyNode.fNode.Process(f.ID, f.rawBlock) | ||
if err != nil { | ||
f.processError = err | ||
return err | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
of the block
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We use "for the block" in most places. Can do a search and replace in a separate diff.