Skip to content

Commit bdc430a

Browse files
Sirculardiscordianfish
authored andcommitted
Parallelize stat calls in Linux filesystem collector.
This change adds the ability to process multiple stat calls in parallel. Processing is rate-limited based on the new flag `collector.filesystem.stat-workers` (default 4). Caveat: filesystem stats information is no longer in the same order as returned by `/proc/1/mounts`. This should not be an issue. Caveat: This change currently uses unbuffered channels to prove correctness without reliance on buffers. Buffered channels will yield superior performance. Signed-off-by: Erica Mays <[email protected]>
1 parent 75d951d commit bdc430a

File tree

1 file changed

+88
-56
lines changed

1 file changed

+88
-56
lines changed

collector/filesystem_linux.go

Lines changed: 88 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ const (
4040
var mountTimeout = kingpin.Flag("collector.filesystem.mount-timeout",
4141
"how long to wait for a mount to respond before marking it as stale").
4242
Hidden().Default("5s").Duration()
43+
var statWorkerCount = kingpin.Flag("collector.filesystem.stat-workers",
44+
"how many stat calls to process simultaneously").
45+
Hidden().Default("4").Int()
4346
var stuckMounts = make(map[string]struct{})
4447
var stuckMountsMtx = &sync.Mutex{}
4548

@@ -50,72 +53,101 @@ func (c *filesystemCollector) GetStats() ([]filesystemStats, error) {
5053
return nil, err
5154
}
5255
stats := []filesystemStats{}
53-
for _, labels := range mps {
54-
if c.excludedMountPointsPattern.MatchString(labels.mountPoint) {
55-
level.Debug(c.logger).Log("msg", "Ignoring mount point", "mountpoint", labels.mountPoint)
56-
continue
57-
}
58-
if c.excludedFSTypesPattern.MatchString(labels.fsType) {
59-
level.Debug(c.logger).Log("msg", "Ignoring fs", "type", labels.fsType)
60-
continue
61-
}
62-
stuckMountsMtx.Lock()
63-
if _, ok := stuckMounts[labels.mountPoint]; ok {
64-
stats = append(stats, filesystemStats{
65-
labels: labels,
66-
deviceError: 1,
67-
})
68-
level.Debug(c.logger).Log("msg", "Mount point is in an unresponsive state", "mountpoint", labels.mountPoint)
56+
labelChan := make(chan filesystemLabels)
57+
statChan := make(chan filesystemStats)
58+
wg := sync.WaitGroup{}
59+
60+
workerCount := *statWorkerCount
61+
if workerCount < 1 {
62+
workerCount = 1
63+
}
64+
65+
for i := 0; i < workerCount; i++ {
66+
wg.Add(1)
67+
go func() {
68+
defer wg.Done()
69+
for labels := range labelChan {
70+
statChan <- c.processStat(labels)
71+
}
72+
}()
73+
}
74+
75+
go func() {
76+
for _, labels := range mps {
77+
if c.excludedMountPointsPattern.MatchString(labels.mountPoint) {
78+
level.Debug(c.logger).Log("msg", "Ignoring mount point", "mountpoint", labels.mountPoint)
79+
continue
80+
}
81+
if c.excludedFSTypesPattern.MatchString(labels.fsType) {
82+
level.Debug(c.logger).Log("msg", "Ignoring fs", "type", labels.fsType)
83+
continue
84+
}
85+
86+
stuckMountsMtx.Lock()
87+
if _, ok := stuckMounts[labels.mountPoint]; ok {
88+
stats = append(stats, filesystemStats{
89+
labels: labels,
90+
deviceError: 1,
91+
})
92+
level.Debug(c.logger).Log("msg", "Mount point is in an unresponsive state", "mountpoint", labels.mountPoint)
93+
stuckMountsMtx.Unlock()
94+
continue
95+
}
96+
6997
stuckMountsMtx.Unlock()
70-
continue
98+
labelChan <- labels
7199
}
72-
stuckMountsMtx.Unlock()
100+
close(labelChan)
101+
wg.Wait()
102+
close(statChan)
103+
}()
73104

74-
// The success channel is used do tell the "watcher" that the stat
75-
// finished successfully. The channel is closed on success.
76-
success := make(chan struct{})
77-
go stuckMountWatcher(labels.mountPoint, success, c.logger)
105+
for stat := range statChan {
106+
stats = append(stats, stat)
107+
}
108+
return stats, nil
109+
}
78110

79-
buf := new(unix.Statfs_t)
80-
err = unix.Statfs(rootfsFilePath(labels.mountPoint), buf)
81-
stuckMountsMtx.Lock()
82-
close(success)
83-
// If the mount has been marked as stuck, unmark it and log it's recovery.
84-
if _, ok := stuckMounts[labels.mountPoint]; ok {
85-
level.Debug(c.logger).Log("msg", "Mount point has recovered, monitoring will resume", "mountpoint", labels.mountPoint)
86-
delete(stuckMounts, labels.mountPoint)
87-
}
88-
stuckMountsMtx.Unlock()
111+
func (c *filesystemCollector) processStat(labels filesystemLabels) filesystemStats {
112+
success := make(chan struct{})
113+
go stuckMountWatcher(labels.mountPoint, success, c.logger)
89114

90-
if err != nil {
91-
stats = append(stats, filesystemStats{
92-
labels: labels,
93-
deviceError: 1,
94-
})
115+
buf := new(unix.Statfs_t)
116+
err := unix.Statfs(rootfsFilePath(labels.mountPoint), buf)
117+
stuckMountsMtx.Lock()
118+
close(success)
95119

96-
level.Debug(c.logger).Log("msg", "Error on statfs() system call", "rootfs", rootfsFilePath(labels.mountPoint), "err", err)
97-
continue
98-
}
120+
// If the mount has been marked as stuck, unmark it and log it's recovery.
121+
if _, ok := stuckMounts[labels.mountPoint]; ok {
122+
level.Debug(c.logger).Log("msg", "Mount point has recovered, monitoring will resume", "mountpoint", labels.mountPoint)
123+
delete(stuckMounts, labels.mountPoint)
124+
}
125+
stuckMountsMtx.Unlock()
99126

100-
var ro float64
101-
for _, option := range strings.Split(labels.options, ",") {
102-
if option == "ro" {
103-
ro = 1
104-
break
105-
}
127+
if err != nil {
128+
level.Debug(c.logger).Log("msg", "Error on statfs() system call", "rootfs", rootfsFilePath(labels.mountPoint), "err", err)
129+
return filesystemStats{
130+
labels: labels,
131+
deviceError: 1,
106132
}
133+
}
107134

108-
stats = append(stats, filesystemStats{
109-
labels: labels,
110-
size: float64(buf.Blocks) * float64(buf.Bsize),
111-
free: float64(buf.Bfree) * float64(buf.Bsize),
112-
avail: float64(buf.Bavail) * float64(buf.Bsize),
113-
files: float64(buf.Files),
114-
filesFree: float64(buf.Ffree),
115-
ro: ro,
116-
})
135+
var ro float64
136+
for _, option := range strings.Split(labels.options, ",") {
137+
if option == "ro" {
138+
ro = 1
139+
break
140+
}
141+
}
142+
return filesystemStats{
143+
labels: labels,
144+
size: float64(buf.Blocks) * float64(buf.Bsize),
145+
free: float64(buf.Bfree) * float64(buf.Bsize),
146+
avail: float64(buf.Bavail) * float64(buf.Bsize),
147+
files: float64(buf.Files),
148+
filesFree: float64(buf.Ffree),
149+
ro: ro,
117150
}
118-
return stats, nil
119151
}
120152

121153
// stuckMountWatcher listens on the given success channel and if the channel closes

0 commit comments

Comments
 (0)