Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions mock/reads_resultset.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package mock

import (
"fmt"

"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/pkg/data/gen"
"github.com/influxdata/influxdb/v2/storage/reads"
Expand Down Expand Up @@ -61,7 +63,7 @@ func (g *GeneratorResultSet) Next() bool {
return g.sg.Next() && (g.max == 0 || remain > 0)
}

func (g *GeneratorResultSet) Cursor() cursors.Cursor {
func (g *GeneratorResultSet) Cursor() (cursors.Cursor, error) {
switch g.sg.FieldType() {
case models.Float:
g.f.tv = g.sg.TimeValuesGenerator()
Expand All @@ -79,10 +81,10 @@ func (g *GeneratorResultSet) Cursor() cursors.Cursor {
g.b.tv = g.sg.TimeValuesGenerator()
g.cur = &g.b
default:
panic("unreachable")
return nil, fmt.Errorf("unsupported field type: %v", g.sg.FieldType())
}

return g.cur
return g.cur, nil
}

func copyTags(dst, src models.Tags) models.Tags {
Expand Down
23 changes: 17 additions & 6 deletions storage/flux/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,10 @@ func (fi *filterIterator) handleRead(f func(flux.Table) error, rs storage.Result

READ:
for rs.Next() {
cur = rs.Cursor()
cur, err := rs.Cursor()
if err != nil {
return err
}
if cur == nil {
// no data for series key + field combination
continue
Expand All @@ -220,7 +223,7 @@ READ:
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TString)
table = newStringTable(done, typedCur, bnds, key, cols, rs.Tags(), defs, fi.cache, fi.alloc)
default:
panic(fmt.Sprintf("unreachable: %T", typedCur))
return fmt.Errorf("unexpected cursor type: %T", typedCur)
}

cur = nil
Expand Down Expand Up @@ -306,6 +309,8 @@ func (gi *groupIterator) Do(f func(flux.Table) error) error {
}

func (gi *groupIterator) handleRead(f func(flux.Table) error, rs storage.GroupResultSet) error {
// error declaration to be used with our cursor iterations below
var err error
// these resources must be closed if not nil on return
var (
gc storage.GroupCursor
Expand All @@ -331,7 +336,10 @@ func (gi *groupIterator) handleRead(f func(flux.Table) error, rs storage.GroupRe
READ:
for gc != nil {
for gc.Next() {
cur = gc.Cursor()
cur, err = gc.Cursor()
if err != nil {
return err
}
if cur != nil {
break
}
Expand Down Expand Up @@ -363,7 +371,7 @@ READ:
cols, defs := determineTableColsForGroup(gc.Keys(), flux.TString, gc.Aggregate(), key)
table = newStringGroupTable(done, gc, typedCur, bnds, key, cols, gc.Tags(), defs, gi.cache, gi.alloc)
default:
panic(fmt.Sprintf("unreachable: %T", typedCur))
return fmt.Errorf("unexpected cursor type: %T", typedCur)
}

// table owns these resources and is responsible for closing them
Expand Down Expand Up @@ -740,7 +748,10 @@ func (wai *windowAggregateIterator) handleRead(f func(flux.Table) error, rs stor

READ:
for rs.Next() {
cur = rs.Cursor()
cur, err = rs.Cursor()
if err != nil {
return err
}
if cur == nil {
// no data for series key + field combination
continue
Expand Down Expand Up @@ -826,7 +837,7 @@ READ:
table = newStringWindowSelectorTable(done, typedCur, bnds, window, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
}
default:
panic(fmt.Sprintf("unreachable: %T", typedCur))
return fmt.Errorf("unexpected cursor type: %T", typedCur)
}

cur = nil
Expand Down
70 changes: 65 additions & 5 deletions storage/flux/table.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -970,7 +970,19 @@ func (t *floatGroupTable) advanceCursor() bool {
t.cur.Close()
t.cur = nil
for t.gc.Next() {
cur := t.gc.Cursor()
cur, err := t.gc.Cursor()
if err != nil {
cur.Close()
t.err = &errors.Error{
Code: errors.EInvalid,
Err: &GroupCursorError{
typ: "float",
cursor: cur,
},
}
return false
}

if cur == nil {
continue
}
Expand Down Expand Up @@ -1954,7 +1966,19 @@ func (t *integerGroupTable) advanceCursor() bool {
t.cur.Close()
t.cur = nil
for t.gc.Next() {
cur := t.gc.Cursor()
cur, err := t.gc.Cursor()
if err != nil {
cur.Close()
t.err = &errors.Error{
Code: errors.EInvalid,
Err: &GroupCursorError{
typ: "integer",
cursor: cur,
},
}
return false
}

if cur == nil {
continue
}
Expand Down Expand Up @@ -2935,7 +2959,19 @@ func (t *unsignedGroupTable) advanceCursor() bool {
t.cur.Close()
t.cur = nil
for t.gc.Next() {
cur := t.gc.Cursor()
cur, err := t.gc.Cursor()
if err != nil {
cur.Close()
t.err = &errors.Error{
Code: errors.EInvalid,
Err: &GroupCursorError{
typ: "unsigned",
cursor: cur,
},
}
return false
}

if cur == nil {
continue
}
Expand Down Expand Up @@ -3860,7 +3896,19 @@ func (t *stringGroupTable) advanceCursor() bool {
t.cur.Close()
t.cur = nil
for t.gc.Next() {
cur := t.gc.Cursor()
cur, err := t.gc.Cursor()
if err != nil {
cur.Close()
t.err = &errors.Error{
Code: errors.EInvalid,
Err: &GroupCursorError{
typ: "string",
cursor: cur,
},
}
return false
}

if cur == nil {
continue
}
Expand Down Expand Up @@ -4785,7 +4833,19 @@ func (t *booleanGroupTable) advanceCursor() bool {
t.cur.Close()
t.cur = nil
for t.gc.Next() {
cur := t.gc.Cursor()
cur, err := t.gc.Cursor()
if err != nil {
cur.Close()
t.err = &errors.Error{
Code: errors.EInvalid,
Err: &GroupCursorError{
typ: "boolean",
cursor: cur,
},
}
return false
}

if cur == nil {
continue
}
Expand Down
14 changes: 13 additions & 1 deletion storage/flux/table.gen.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -1000,7 +1000,19 @@ func (t *{{.name}}GroupTable) advanceCursor() bool {
t.cur.Close()
t.cur = nil
for t.gc.Next() {
cur := t.gc.Cursor()
cur, err := t.gc.Cursor()
if err != nil {
cur.Close()
t.err = &errors.Error {
Code: errors.EInvalid,
Err: &GroupCursorError {
typ: "{{.name}}",
cursor: cur,
},
}
return false
}

if cur == nil {
continue
}
Expand Down
11 changes: 8 additions & 3 deletions storage/reads/aggregate_resultset.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,12 @@ func (r *windowAggregateResultSet) createCursor(seriesRow SeriesRow) (cursors.Cu
agg := r.req.Aggregate[0]
every := r.req.WindowEvery
offset := r.req.Offset
cursor := r.arrayCursors.createCursor(seriesRow)
cursor, err := r.arrayCursors.createCursor(seriesRow)
// If the createCursor interface method fails, it will
// always return a nil cursor.
if err != nil {
return nil, err
}

var everyDur values.Duration
var offsetDur values.Duration
Expand Down Expand Up @@ -132,8 +137,8 @@ func (r *windowAggregateResultSet) createCursor(seriesRow SeriesRow) (cursors.Cu
}
}

func (r *windowAggregateResultSet) Cursor() cursors.Cursor {
return r.cursor
func (r *windowAggregateResultSet) Cursor() (cursors.Cursor, error) {
return r.cursor, r.err
}

func (r *windowAggregateResultSet) Close() {
Expand Down
7 changes: 5 additions & 2 deletions storage/reads/aggregate_resultset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package reads_test

import (
"context"
"github.com/stretchr/testify/require"
"reflect"
"testing"
"time"
Expand Down Expand Up @@ -196,7 +197,8 @@ func TestNewWindowAggregateResultSet_Mean(t *testing.T) {
if !resultSet.Next() {
t.Fatalf("unexpected: resultSet could not advance")
}
cursor := resultSet.Cursor()
cursor, err := resultSet.Cursor()
require.NoError(t, err, "create cursor failed")
if cursor == nil {
t.Fatalf("unexpected: cursor was nil")
}
Expand Down Expand Up @@ -238,7 +240,8 @@ func TestNewWindowAggregateResultSet_Months(t *testing.T) {
if !resultSet.Next() {
t.Fatalf("unexpected: resultSet could not advance")
}
cursor := resultSet.Cursor()
cursor, err := resultSet.Cursor()
require.NoError(t, err, "create cursor failed")
if cursor == nil {
t.Fatalf("unexpected: cursor was nil")
}
Expand Down
Loading