Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions plugins/inputs/chrony/chrony.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"os/user"
"path"
"strconv"
"sync"
"syscall"
"time"

Expand Down Expand Up @@ -40,6 +41,11 @@ type Chrony struct {
client *fbchrony.Client
source string
local string

// clientMu protects concurrent access to the chrony client.
// This prevents race conditions when multiple Gather() calls
// access the same client instance concurrently.
clientMu sync.Mutex
}

func (*Chrony) SampleConfig() string {
Expand Down Expand Up @@ -146,6 +152,12 @@ func (c *Chrony) Start(_ telegraf.Accumulator) error {
}

func (c *Chrony) Gather(acc telegraf.Accumulator) error {
// Protect concurrent access to the chrony client to prevent race conditions
// when multiple Gather() calls occur simultaneously (e.g., when a previous
// gather hasn't completed before the next interval triggers).
c.clientMu.Lock()
defer c.clientMu.Unlock()

for _, m := range c.Metrics {
switch m {
case "activity":
Expand Down
226 changes: 226 additions & 0 deletions plugins/inputs/chrony/chrony_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"math"
"net"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -1124,3 +1125,228 @@ func encodeFloat(x float64) uint32 {

return uint32(exp<<floatCoeffBits) | uint32(coef)
}

// TestConcurrentGather verifies that concurrent Gather() calls neither fail
// nor come back empty when they share a single chrony client.
// This test addresses the issue reported in GitHub issue #17757 where
// concurrent access to the client caused "index out of range [256] with length 256" panics.
//
// Every goroutine reports at most one problem (a gather error or an empty
// result) through a buffered channel; the test fails if any were reported.
func TestConcurrentGather(t *testing.T) {
	// Setup a mock server with multiple sources to ensure longer gather time
	server := Server{
		ActivityInfo: &fbchrony.Activity{
			Online:       10,
			Offline:      2,
			BurstOnline:  1,
			BurstOffline: 0,
			Unresolved:   3,
		},
		TrackingInfo: &fbchrony.Tracking{
			RefID:              0xA29FC87B,
			IPAddr:             net.ParseIP("192.168.1.22"),
			Stratum:            3,
			LeapStatus:         0,
			RefTime:            time.Now(),
			CurrentCorrection:  0.000020390,
			LastOffset:         0.000012651,
			RMSOffset:          0.000025577,
			FreqPPM:            -16.001,
			ResidFreqPPM:       0.0,
			SkewPPM:            0.006,
			RootDelay:          0.001655,
			RootDispersion:     0.003307,
			LastUpdateInterval: 507.2,
		},
		ServerStatInfo: &fbchrony.ServerStats{
			NTPHits:  2542,
			CMDHits:  112,
			NTPDrops: 42,
			CMDDrops: 8,
			LogDrops: 0,
		},
		SourcesInfo: []source{
			{
				name: "ntp1.example.com",
				data: &fbchrony.SourceData{
					IPAddr:         net.IPv4(192, 168, 1, 1),
					Poll:           64,
					Stratum:        2,
					State:          fbchrony.SourceStateSync,
					Mode:           fbchrony.SourceModePeer,
					Flags:          0,
					Reachability:   255,
					SinceSample:    0,
					OrigLatestMeas: 0.001,
					LatestMeas:     0.001,
					LatestMeasErr:  0.0001,
				},
				stats: &fbchrony.SourceStats{
					RefID:              434354566,
					IPAddr:             net.IPv4(192, 168, 1, 1),
					NSamples:           100,
					NRuns:              10,
					SpanSeconds:        1000,
					StandardDeviation:  0.001,
					ResidFreqPPM:       0.0001,
					SkewPPM:            0.0001,
					EstimatedOffset:    0.0001,
					EstimatedOffsetErr: 0.00001,
				},
			},
			{
				name: "ntp2.example.com",
				data: &fbchrony.SourceData{
					IPAddr:         net.IPv4(192, 168, 1, 2),
					Poll:           64,
					Stratum:        2,
					State:          fbchrony.SourceStateSync,
					Mode:           fbchrony.SourceModePeer,
					Flags:          0,
					Reachability:   255,
					SinceSample:    0,
					OrigLatestMeas: 0.002,
					LatestMeas:     0.002,
					LatestMeasErr:  0.0002,
				},
				stats: &fbchrony.SourceStats{
					RefID:              434354567,
					IPAddr:             net.IPv4(192, 168, 1, 2),
					NSamples:           100,
					NRuns:              10,
					SpanSeconds:        1000,
					StandardDeviation:  0.002,
					ResidFreqPPM:       0.0002,
					SkewPPM:            0.0002,
					EstimatedOffset:    0.0002,
					EstimatedOffsetErr: 0.00002,
				},
			},
			{
				name: "ntp3.example.com",
				data: &fbchrony.SourceData{
					IPAddr:         net.IPv4(192, 168, 1, 3),
					Poll:           64,
					Stratum:        2,
					State:          fbchrony.SourceStateSync,
					Mode:           fbchrony.SourceModePeer,
					Flags:          0,
					Reachability:   255,
					SinceSample:    0,
					OrigLatestMeas: 0.003,
					LatestMeas:     0.003,
					LatestMeasErr:  0.0003,
				},
				stats: &fbchrony.SourceStats{
					RefID:              434354568,
					IPAddr:             net.IPv4(192, 168, 1, 3),
					NSamples:           100,
					NRuns:              10,
					SpanSeconds:        1000,
					StandardDeviation:  0.003,
					ResidFreqPPM:       0.0003,
					SkewPPM:            0.0003,
					EstimatedOffset:    0.0003,
					EstimatedOffsetErr: 0.00003,
				},
			},
		},
	}
	addr, err := server.Listen(t)
	require.NoError(t, err)
	defer server.Shutdown()

	// Setup the plugin with all metrics enabled to maximize the gather time
	// and increase the likelihood of concurrent access
	plugin := &Chrony{
		Server:  "udp://" + addr,
		Metrics: []string{"activity", "tracking", "serverstats", "sources", "sourcestats"},
		Log:     testutil.Logger{},
	}
	require.NoError(t, plugin.Init())
	require.NoError(t, plugin.Start(nil))
	defer plugin.Stop()

	// Run multiple concurrent Gather() calls
	// This simulates what happens when a previous gather hasn't completed
	// before the next interval triggers
	const numConcurrent = 10
	var wg sync.WaitGroup

	// Buffered to numConcurrent: each goroutine sends at most one value, so
	// no sender can block even though the channel is only drained after Wait.
	errCh := make(chan error, numConcurrent)

	for i := 0; i < numConcurrent; i++ {
		wg.Add(1)
		go func(iteration int) {
			defer wg.Done()

			var acc testutil.Accumulator
			if err := plugin.Gather(&acc); err != nil {
				errCh <- fmt.Errorf("iteration %d: gather failed: %w", iteration, err)
				return
			}

			// An empty result means the gather silently produced nothing;
			// report it as a failure instead of merely logging it.
			if len(acc.GetTelegrafMetrics()) == 0 {
				errCh <- fmt.Errorf("iteration %d: no metrics collected", iteration)
			}
		}(i)
	}

	// Wait for all goroutines to complete; closing afterwards lets the range
	// below terminate once every reported problem has been read.
	wg.Wait()
	close(errCh)

	// Check if any errors occurred
	for err := range errCh {
		require.NoError(t, err, "concurrent gather should not produce errors")
	}
}

// TestRaceDetector runs the test with the Go race detector enabled.
func TestRaceDetector(t *testing.T) {
// Setup a minimal mock server
server := Server{
ServerStatInfo: &fbchrony.ServerStats{
NTPHits: 100,
CMDHits: 10,
NTPDrops: 1,
CMDDrops: 0,
LogDrops: 0,
},
}
addr, err := server.Listen(t)
require.NoError(t, err)
defer server.Shutdown()

plugin := &Chrony{
Server: "udp://" + addr,
Metrics: []string{"serverstats"},
Log: testutil.Logger{},
}
require.NoError(t, plugin.Init())
require.NoError(t, plugin.Start(nil))
defer plugin.Stop()

// Run 100 concurrent gathers to give the race detector
// a better chance of catching any race conditions
const iterations = 100
var wg sync.WaitGroup
errors := make(chan error, iterations)

for i := 0; i < iterations; i++ {
wg.Add(1)
go func() {
defer wg.Done()
var acc testutil.Accumulator
if err := plugin.Gather(&acc); err != nil {
errors <- err
}
}()
}

wg.Wait()
close(errors)

// Check if any errors occurred
for err := range errors {
require.NoError(t, err)
}
}
Loading