Skip to content

Commit 1d6ff9b

Browse files
noctellajenkins
authored andcommitted
finagle-core: Ability to configure StatsFilter with a HistogramCounterFactory to track request burstiness
Problem While we have a counter for the number of requests, we're limited by the frequency of metrics collection to see how these requests are spread out; that is, how "bursty" they are. Solution Introduce a HistogramCounter, created via a HistogramCounterFactory, that can be used to track request burstiness. The factory is configured on a client/server, and, if configured, is used in StatsFilter to track request burstiness. Differential Revision: https://phabricator.twitter.biz/D1180751
1 parent 17c8187 commit 1d6ff9b

File tree

1 file changed

+106
-0
lines changed

1 file changed

+106
-0
lines changed
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package com.twitter.finagle.stats
2+
3+
import com.twitter.conversions.DurationOps._
4+
import com.twitter.util.Closable
5+
import com.twitter.util.Duration
6+
import com.twitter.util.Future
7+
import com.twitter.util.Time
8+
import com.twitter.util.Timer
9+
import java.util.concurrent.ConcurrentHashMap
10+
import java.util.concurrent.atomic.LongAdder
11+
import scala.collection.JavaConverters._
12+
13+
private[twitter] sealed abstract class StatsFrequency(val frequency: Duration) {
14+
def suffix: String
15+
}
16+
17+
private[twitter] object StatsFrequency {
18+
case object HundredMilliSecondly extends StatsFrequency(100.millis) {
19+
override def suffix = "hundredMilliSecondly"
20+
}
21+
}
22+
23+
/**
24+
* Class for creating [[HistogramCounter]]s. It is expected that there be one [[HistogramCounterFactory]]
25+
* per process -- otherwise we will schedule multiple timer tasks for aggregating the counter into
26+
* a stat, and there can be multiple aggregations for a single stat which may produce unexpected
27+
* results.
28+
*/
29+
private[twitter] class HistogramCounterFactory(timer: Timer, nowMs: () => Long) extends Closable {
30+
31+
@volatile private[this] var closed = false
32+
33+
private[this] val frequencyToStats: Map[
34+
StatsFrequency,
35+
ConcurrentHashMap[Stat, HistogramCounter]
36+
] = Map(
37+
StatsFrequency.HundredMilliSecondly -> new ConcurrentHashMap[Stat, HistogramCounter]
38+
)
39+
40+
frequencyToStats.map {
41+
case (statsFrequency, statToCounter) =>
42+
timer.doLater(statsFrequency.frequency)(recordStatsForCounters(statsFrequency, statToCounter))
43+
}
44+
45+
def apply(
46+
name: Seq[String],
47+
frequency: StatsFrequency,
48+
statsReceiver: StatsReceiver
49+
): HistogramCounter = {
50+
val stat = statsReceiver.stat(normalizeName(name) :+ frequency.suffix: _*)
51+
val histogramCounter = new HistogramCounter(stat, nowMs, frequency.frequency.inMillis)
52+
val existing = frequencyToStats(frequency).putIfAbsent(stat, histogramCounter)
53+
if (existing == null) {
54+
histogramCounter
55+
} else {
56+
existing
57+
}
58+
}
59+
60+
override def close(deadline: Time): Future[Unit] = {
61+
closed = true
62+
Future.Done
63+
}
64+
65+
private[this] def recordStatsForCounters(
66+
statsFrequency: StatsFrequency,
67+
statToCounter: ConcurrentHashMap[Stat, HistogramCounter]
68+
): Unit = {
69+
statToCounter.values().asScala.foreach { counter =>
70+
counter.recordAndReset()
71+
}
72+
if (!closed) {
73+
timer.doLater(statsFrequency.frequency)(recordStatsForCounters(statsFrequency, statToCounter))
74+
}
75+
}
76+
77+
private[this] def normalizeName(name: Seq[String]): Seq[String] = {
78+
if (name.forall(!_.contains("/"))) {
79+
name
80+
} else {
81+
name.map(_.split("/")).flatten
82+
}
83+
}
84+
}
85+
86+
private[stats] class HistogramCounter(stat: Stat, nowMs: () => Long, windowSizeMs: Long) {
87+
private[this] val counter: LongAdder = new LongAdder
88+
@volatile private[this] var lastRecordAndResetMs = nowMs()
89+
90+
private[stats] def recordAndReset(): Unit = {
91+
val count = counter.sumThenReset()
92+
val now = nowMs()
93+
val elapsed = Math.max(0, now - lastRecordAndResetMs)
94+
val elapsedWindows = elapsed.toFloat / windowSizeMs
95+
stat.add(count / elapsedWindows)
96+
lastRecordAndResetMs = now
97+
}
98+
99+
def incr(delta: Long): Unit = {
100+
counter.add(delta)
101+
}
102+
103+
def incr(): Unit = {
104+
counter.increment()
105+
}
106+
}

0 commit comments

Comments
 (0)