Skip to content

Commit

Permalink
Fix the broken interaction between extended range and reuse of overla…
Browse files Browse the repository at this point in the history
…pping buffer contents.
  • Loading branch information
free committed Nov 9, 2018
1 parent f57774d commit fb6f7c8
Showing 1 changed file with 16 additions and 4 deletions.
20 changes: 16 additions & 4 deletions promql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,13 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev
// For all matrix queries we want to ensure that we have (end-start) + range selected
// this way we have `range` data before the start time
params.Start = params.Start - durationMilliseconds(n.Range)
// Include an extra LookbackDelta iff this is the argument to an
// extended range function. Extended ranges include one extra
// point, this is how far back we need to look for it.
f, ok := getFunction(params.Func)
if ok && f.ExtRange {
params.Start = params.Start - durationMilliseconds(LookbackDelta)
}
if n.Offset > 0 {
offsetMilliseconds := durationMilliseconds(n.Offset)
params.Start = params.Start - offsetMilliseconds
Expand Down Expand Up @@ -895,16 +902,17 @@ func (ev *evaluator) eval(expr Expr) Value {
mat := make(Matrix, 0, len(sel.series)) // Output matrix.
offset := durationMilliseconds(sel.Offset)
selRange := durationMilliseconds(sel.Range)
stepRange := selRange
if stepRange > ev.interval {
stepRange = ev.interval
}
bufferRange := selRange
stepRange := selRange
// Include an extra LookbackDelta iff this is an extended
// range function. Extended ranges include one extra point,
// this is how far back we need to look for it.
if e.Func.ExtRange {
bufferRange += durationMilliseconds(LookbackDelta)
stepRange += durationMilliseconds(LookbackDelta)
}
if stepRange > ev.interval {
stepRange = ev.interval
}
// Reuse objects across steps to save memory allocations.
points := getPointSlice(16)
Expand Down Expand Up @@ -1198,6 +1206,10 @@ func (ev *evaluator) matrixIterSlice(it *storage.BufferedSeriesIterator, mint, m
if drop > 0 && out[drop-1].T >= extMint {
drop--
}
if out[len(out)-1].T >= mint {
// Only append points with timestamps after the last timestamp we have.
mint = out[len(out)-1].T + 1
}
}
copy(out, out[drop:])
out = out[:len(out)-drop]
Expand Down

0 comments on commit fb6f7c8

Please sign in to comment.