Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 41 additions & 29 deletions tsdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ var (
// ErrMultipleIndexTypes is returned when trying to do deletes on a database with
// multiple index types.
ErrMultipleIndexTypes = errors.New("cannot delete data. DB contains shards using both inmem and tsi1 indexes. Please convert all shards to use the same index type to delete data.")
// ErrNothingToDelete is returned when there is nothing to do for DeleteSeries;
// callers treat this error as a no-op.
ErrNothingToDelete = errors.New("nothing to delete")
)

// Statistics gathered by the store.
Expand Down Expand Up @@ -1649,41 +1652,50 @@ func (s *Store) DeleteSeries(database string, sources []influxql.Source, conditi
max = influxql.MaxTime
}

s.mu.RLock()
if s.databases[database].hasMultipleIndexTypes() {
s.mu.RUnlock()
return ErrMultipleIndexTypes
}
sfile := s.sfiles[database]
if sfile == nil {
s.mu.RUnlock()
// No series file means nothing has been written to this DB and thus nothing to delete.
return nil
}

shardFilterFn := byDatabase(database)
if len(sources) != 0 {
var rp string
for idx, source := range sources {
if measurement, ok := source.(*influxql.Measurement); ok {
if idx == 0 {
rp = measurement.RetentionPolicy
} else if rp != measurement.RetentionPolicy {
return fmt.Errorf("mixed retention policies not supported, wanted %q got %q", rp, measurement.RetentionPolicy)
getEpochsAndShards := func() (error, []*Shard, map[uint64]*epochTracker, *SeriesFile) {
s.mu.RLock()
defer s.mu.RUnlock()
if s.databases[database].hasMultipleIndexTypes() {
return ErrMultipleIndexTypes, nil, nil, nil
}
sfile := s.sfiles[database]
if sfile == nil {
// No series file means nothing has been written to this DB and thus nothing to delete.
return ErrNothingToDelete, nil, nil, nil
}

shardFilterFn := byDatabase(database)
if len(sources) != 0 {
var rp string
for idx, source := range sources {
if measurement, ok := source.(*influxql.Measurement); ok {
if idx == 0 {
rp = measurement.RetentionPolicy
} else if rp != measurement.RetentionPolicy {
return fmt.Errorf("mixed retention policies not supported, wanted %q got %q", rp, measurement.RetentionPolicy), nil, nil, nil
}
} else {
return fmt.Errorf("unsupported source type in delete %v", source), nil, nil, nil
}
} else {
return fmt.Errorf("unsupported source type in delete %v", source)
}
}

if rp != "" {
shardFilterFn = ComposeShardFilter(shardFilterFn, byRetentionPolicy(rp))
if rp != "" {
shardFilterFn = ComposeShardFilter(shardFilterFn, byRetentionPolicy(rp))
}
}

shards := s.filterShards(shardFilterFn)
epochs := s.epochsForShards(shards)
return nil, shards, epochs, sfile
}
shards := s.filterShards(shardFilterFn)

epochs := s.epochsForShards(shards)
s.mu.RUnlock()
err, shards, epochs, sfile := getEpochsAndShards()
if err != nil && !errors.Is(err, ErrNothingToDelete) {
s.Logger.Error("DeleteSeries failed", zap.String("error", err.Error()))
return err
} else if errors.Is(err, ErrNothingToDelete) {
return nil
}

// Limit deletes for each shard since expanding the measurement into the list
// of series keys can be very memory intensive if run concurrently.
Expand Down
79 changes: 79 additions & 0 deletions tsdb/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2978,3 +2978,82 @@ func (m *mockStartupLogger) Tracked() []string {
copy(tracked, m._shardTracker)
return tracked
}

// TestStore_DeleteSeries_Deadlock tests the complete lock contention scenario
// where leaked read locks from DeleteSeries block write operations like CreateShard.
// DeleteSeries is driven repeatedly down its error path (mixed retention policies);
// if that path fails to release s.mu.RLock, the concurrent CreateShard calls
// (which need s.mu.Lock) stall and the test times out.
func TestStore_DeleteSeries_Deadlock(t *testing.T) {
	test := func(index string) {
		s := MustOpenStore(t, index)
		defer s.CloseStore(t, index)

		err := s.CreateShard("db0", "rp0", 1, true)
		require.NoError(t, err, "Create shard failure")

		// Sources spanning two retention policies are rejected by DeleteSeries;
		// each rejection exercises the early-return error path.
		mixedSources := []influxql.Source{
			&influxql.Measurement{Name: "measurement1", RetentionPolicy: "rp1"},
			&influxql.Measurement{Name: "measurement2", RetentionPolicy: "rp2"},
		}

		leakCount := 10
		for i := 0; i < leakCount; i++ {
			err := s.DeleteSeries("db0", mixedSources, nil)
			// ErrorContains is nil-safe; calling err.Error() via require.Contains
			// would panic with a nil dereference if DeleteSeries returned nil.
			require.ErrorContains(t, err, "mixed retention policies not supported")
		}

		results := make(chan string, 20)
		startSignal := make(chan struct{})

		writeOpsCount := 5
		for i := 0; i < writeOpsCount; i++ {
			go func(id int) {
				<-startSignal

				// CreateShard needs s.mu.Lock() and would be blocked by any
				// RLocks leaked above.
				start := time.Now()
				err := s.CreateShard("db0", "rp0", uint64(100+id), true)
				duration := time.Since(start)

				if err != nil {
					results <- fmt.Sprintf("CreateShard[%d]: ERROR after %v: %v", id, duration, err)
				} else {
					results <- fmt.Sprintf("CreateShard[%d]: OK (%v)", id, duration)
				}
			}(i)
		}

		readOpsCount := 10
		for i := 0; i < readOpsCount; i++ {
			go func(id int) {
				<-startSignal

				// Shard() needs s.mu.RLock().
				start := time.Now()
				shard := s.Shard(1)
				duration := time.Since(start)

				if shard == nil {
					results <- fmt.Sprintf("Shard[%d]: ERROR after %v: shard not found", id, duration)
				} else {
					results <- fmt.Sprintf("Shard[%d]: OK (%v)", id, duration)
				}
			}(i)
		}

		// Release every goroutine at once to create maximum lock contention.
		close(startSignal)

		// One shared deadline for the whole batch of operations.
		timeout := time.After(time.Second * 5)

		for i := 0; i < writeOpsCount+readOpsCount; i++ {
			select {
			case <-results:
			case <-timeout:
				t.Fatal("Operation timed out - indicating severe lock contention")
			}
		}
	}

	for _, indexType := range tsdb.RegisteredIndexes() {
		t.Run(indexType, func(t *testing.T) { test(indexType) })
	}
}