1
1
package io .quarkus .smallrye .health .runtime .dev .ui ;
2
2
3
3
import java .time .Duration ;
4
- import java .util .Objects ;
4
+ import java .util .Set ;
5
+ import java .util .concurrent .ConcurrentHashMap ;
6
+ import java .util .concurrent .atomic .AtomicInteger ;
5
7
import java .util .concurrent .atomic .AtomicReference ;
6
8
7
- import jakarta .annotation .PostConstruct ;
8
9
import jakarta .inject .Inject ;
9
10
import jakarta .json .Json ;
10
11
import jakarta .json .JsonObject ;
13
14
import io .smallrye .health .SmallRyeHealthReporter ;
14
15
import io .smallrye .mutiny .Multi ;
15
16
import io .smallrye .mutiny .Uni ;
16
- import io .smallrye .mutiny .operators .multi .processors .BroadcastProcessor ;
17
+ import io .smallrye .mutiny .subscription .BackPressureStrategy ;
18
+ import io .smallrye .mutiny .subscription .Cancellable ;
19
+ import io .smallrye .mutiny .subscription .MultiEmitter ;
17
20
18
21
public class HealthJsonRPCService {
19
22
20
23
@ Inject
21
24
SmallRyeHealthReporter smallRyeHealthReporter ;
22
25
23
- private final BroadcastProcessor <SmallRyeHealth > healthStream = BroadcastProcessor .create ();
24
- private final BroadcastProcessor <String > statusStream = BroadcastProcessor .create ();
25
- private final AtomicReference <String > lastPayload = new AtomicReference <>("" );
26
-
27
- @ PostConstruct
28
- void startPolling () {
29
- Multi .createFrom ().ticks ().every (Duration .ofSeconds (3 ))
30
- .onItem ().transformToUniAndMerge (tick -> smallRyeHealthReporter .getHealthAsync ())
31
- .subscribe ().with (smallRyeHealth -> {
32
- String jsonStr = smallRyeHealth .getPayload ().toString ();
33
- if (!Objects .equals (lastPayload .getAndSet (jsonStr ), jsonStr )) {
34
- if (smallRyeHealth != null ) {
35
- healthStream .onNext (smallRyeHealth );
36
- statusStream .onNext (getStatusIcon (smallRyeHealth ));
26
+ private final Set <MultiEmitter <? super SmallRyeHealth >> healthEmitters = ConcurrentHashMap .newKeySet ();
27
+ private final Set <MultiEmitter <? super String >> statusEmitters = ConcurrentHashMap .newKeySet ();
28
+
29
+ private final AtomicInteger activeSubscribers = new AtomicInteger (0 );
30
+ private final AtomicReference <SmallRyeHealth > latest = new AtomicReference <>();
31
+ private volatile Cancellable pollingCancellable ;
32
+
33
+ private synchronized void startPollingIfNeeded (int interval ) {
34
+ if (pollingCancellable == null ) {
35
+ pollingCancellable = Multi .createFrom ().ticks ().every (Duration .ofSeconds (interval ))
36
+ .onItem ().transformToUniAndMerge (tick -> smallRyeHealthReporter .getHealthAsync ())
37
+ .subscribe ().with (smallRyeHealth -> {
38
+ latest .set (smallRyeHealth );
39
+ for (var emitter : healthEmitters ) {
40
+ emitter .emit (smallRyeHealth );
41
+ }
42
+ for (var emitter : statusEmitters ) {
43
+ emitter .emit (getStatusIcon (smallRyeHealth ));
44
+ }
45
+ }, failure -> {
46
+ JsonObject errorPayload = Json .createObjectBuilder ()
47
+ .add ("status" , "DOWN" )
48
+ .add ("checks" , Json .createArrayBuilder ()
49
+ .add (Json .createObjectBuilder ()
50
+ .add ("name" , "Smallrye Health stream" )
51
+ .add ("status" , "DOWN" )
52
+ .add ("data" , Json .createObjectBuilder ()
53
+ .add ("reason" , failure .getMessage ()))))
54
+ .build ();
55
+ SmallRyeHealth errorHealth = new SmallRyeHealth (errorPayload );
56
+ latest .set (errorHealth );
57
+ for (var emitter : healthEmitters ) {
58
+ emitter .emit (errorHealth );
37
59
}
38
- }
39
- }, failure -> {
40
- JsonObject errorPayload = Json .createObjectBuilder ()
41
- .add ("status" , "DOWN" )
42
- .add ("checks" , Json .createArrayBuilder ()
43
- .add (Json .createObjectBuilder ()
44
- .add ("name" , "Smallrye Health stream" )
45
- .add ("status" , "DOWN" )
46
- .add ("data" , Json .createObjectBuilder ()
47
- .add ("reason" , failure .getMessage ()))))
48
- .build ();
49
- healthStream .onNext (new SmallRyeHealth (errorPayload ));
50
- statusStream .onNext (DOWN_ICON );
51
- });
60
+ for (var emitter : statusEmitters ) {
61
+ emitter .emit (getStatusIcon (errorHealth ));
62
+ }
63
+ });
64
+ }
65
+ }
66
+
67
+ private synchronized void stopPolling () {
68
+ if (pollingCancellable != null ) {
69
+ pollingCancellable .cancel ();
70
+ pollingCancellable = null ;
71
+ latest .set (null );
72
+ }
73
+ }
74
+
75
+ private synchronized void restartPolling (int interval ) {
76
+ stopPolling ();
77
+ if (interval > 0 ) {
78
+ startPollingIfNeeded (interval );
79
+ }
52
80
}
53
81
54
82
public Uni <SmallRyeHealth > getHealth () {
55
83
return smallRyeHealthReporter .getHealthAsync ();
56
84
}
57
85
58
- public Multi <SmallRyeHealth > streamHealth () {
59
- return healthStream ;
86
+ public Multi <SmallRyeHealth > streamHealth (String interval ) {
87
+ int iv = getIntervalValue (interval );
88
+
89
+ return Multi .createFrom ().emitter (emitter -> {
90
+ activeSubscribers .incrementAndGet ();
91
+ healthEmitters .add (emitter );
92
+
93
+ SmallRyeHealth current = latest .get ();
94
+ if (current != null ) {
95
+ emitter .emit (current );
96
+ }
97
+
98
+ restartPolling (iv );
99
+
100
+ emitter .onTermination (() -> {
101
+ healthEmitters .remove (emitter );
102
+ if (activeSubscribers .decrementAndGet () == 0 ) {
103
+ stopPolling ();
104
+ }
105
+ });
106
+ }, BackPressureStrategy .LATEST );
60
107
}
61
108
62
109
public String getStatus () {
63
110
return getStatusIcon (smallRyeHealthReporter .getHealth ());
64
111
}
65
112
66
- public Multi <String > streamStatus () {
67
- return statusStream ;
113
+ public Multi <String > streamStatus (String interval ) {
114
+ int iv = getIntervalValue (interval );
115
+
116
+ return Multi .createFrom ().emitter (emitter -> {
117
+ activeSubscribers .incrementAndGet ();
118
+ statusEmitters .add (emitter );
119
+
120
+ SmallRyeHealth current = latest .get ();
121
+ if (current != null ) {
122
+ emitter .emit (getStatusIcon (current ));
123
+ }
124
+
125
+ restartPolling (iv );
126
+
127
+ emitter .onTermination (() -> {
128
+ statusEmitters .remove (emitter );
129
+ if (activeSubscribers .decrementAndGet () == 0 ) {
130
+ stopPolling ();
131
+ }
132
+ });
133
+ }, BackPressureStrategy .LATEST );
68
134
}
69
135
70
136
private String getStatusIcon (SmallRyeHealth smallRyeHealth ) {
@@ -77,6 +143,17 @@ private String getStatusIcon(SmallRyeHealth smallRyeHealth) {
77
143
return DOWN_ICON ;
78
144
}
79
145
146
+ private int getIntervalValue (String interval ) {
147
+ if (interval == null || interval .isBlank ()) {
148
+ interval = "10s" ; //default
149
+ }
150
+ if (interval .equalsIgnoreCase ("Off" )) {
151
+ return -1 ;
152
+ }
153
+
154
+ return Integer .parseInt (interval .substring (0 , interval .length () - 1 ));
155
+ }
156
+
80
157
private static final String UP_ICON = "<vaadin-icon style='color:var(--lumo-success-text-color);' icon='font-awesome-solid:thumbs-up'></vaadin-icon>" ;
81
158
private static final String DOWN_ICON = "<vaadin-icon style='color:var(--lumo-error-text-color);' icon='font-awesome-solid:thumbs-down'></vaadin-icon>" ;
82
159
}
0 commit comments