Skip to content

Commit fa369fa

Browse files
committed
NOTIF-512 Notify org admins when an endpoint is disabled
1 parent 2d9fa3c commit fa369fa

File tree

12 files changed

+718
-38
lines changed

12 files changed

+718
-38
lines changed
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package com.redhat.cloud.notifications.events;
2+
3+
import com.redhat.cloud.notifications.ingress.Action;
4+
import com.redhat.cloud.notifications.ingress.Event;
5+
import com.redhat.cloud.notifications.ingress.Parser;
6+
import com.redhat.cloud.notifications.ingress.Payload;
7+
import com.redhat.cloud.notifications.ingress.Recipient;
8+
import com.redhat.cloud.notifications.models.Endpoint;
9+
import org.eclipse.microprofile.reactive.messaging.Channel;
10+
import org.eclipse.microprofile.reactive.messaging.Emitter;
11+
import org.eclipse.microprofile.reactive.messaging.Message;
12+
13+
import javax.enterprise.context.ApplicationScoped;
14+
import java.time.LocalDateTime;
15+
import java.util.List;
16+
import java.util.UUID;
17+
18+
import static com.redhat.cloud.notifications.events.FromCamelHistoryFiller.EGRESS_CHANNEL;
19+
import static java.time.ZoneOffset.UTC;
20+
21+
@ApplicationScoped
22+
public class DisabledWebhookNotifier {
23+
24+
public static final String DISABLED_WEBHOOK_EVENT_TYPE = "disabled-webhook";
25+
public static final String CLIENT_ERROR_TYPE = "client";
26+
public static final String SERVER_ERROR_TYPE = "server";
27+
28+
@Channel(EGRESS_CHANNEL)
29+
Emitter<String> emitter;
30+
31+
public void clientError(Endpoint endpoint, int statusCode) {
32+
notifyEndpointDisabled(endpoint, CLIENT_ERROR_TYPE, statusCode, 1);
33+
}
34+
35+
public void tooManyServerErrors(Endpoint endpoint, int errorsCount) {
36+
notifyEndpointDisabled(endpoint, SERVER_ERROR_TYPE, -1, errorsCount);
37+
}
38+
39+
private void notifyEndpointDisabled(Endpoint endpoint, String errorType, int statusCode, int errorsCount) {
40+
Payload payload = new Payload.PayloadBuilder()
41+
.withAdditionalProperty("endpoint-id", endpoint.getId())
42+
.withAdditionalProperty("endpoint-description", endpoint.getDescription())
43+
.withAdditionalProperty("error-type", errorType)
44+
.withAdditionalProperty("errors-count", errorsCount)
45+
.build();
46+
47+
Event event = new Event.EventBuilder()
48+
.withPayload(payload)
49+
.build();
50+
51+
Recipient recipients = new Recipient.RecipientBuilder()
52+
.withOnlyAdmins(true)
53+
.withIgnoreUserPreferences(true)
54+
.build();
55+
56+
Action action = new Action.ActionBuilder()
57+
.withId(UUID.randomUUID())
58+
.withBundle("console")
59+
.withApplication("notifications")
60+
.withEventType(DISABLED_WEBHOOK_EVENT_TYPE)
61+
.withOrgId(endpoint.getOrgId())
62+
.withTimestamp(LocalDateTime.now(UTC))
63+
.withEvents(List.of(event))
64+
.withRecipients(List.of(recipients))
65+
.build();
66+
String encodedAction = Parser.encode(action);
67+
68+
Message<String> message = KafkaMessageWithIdBuilder.build(encodedAction);
69+
emitter.send(message);
70+
}
71+
}

engine/src/main/java/com/redhat/cloud/notifications/events/FromCamelHistoryFiller.java

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,7 @@
1414
import io.micrometer.core.instrument.MeterRegistry;
1515
import io.quarkus.logging.Log;
1616
import io.smallrye.reactive.messaging.annotations.Blocking;
17-
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
1817
import io.vertx.core.json.Json;
19-
import org.apache.kafka.common.header.internals.RecordHeaders;
2018
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
2119
import org.eclipse.microprofile.reactive.messaging.Channel;
2220
import org.eclipse.microprofile.reactive.messaging.Emitter;
@@ -32,9 +30,6 @@
3230
import java.util.Map;
3331
import java.util.UUID;
3432

35-
import static com.redhat.cloud.notifications.events.KafkaMessageDeduplicator.MESSAGE_ID_HEADER;
36-
import static java.nio.charset.StandardCharsets.UTF_8;
37-
3833
/**
3934
* We sent data via Camel. Now Camel informs us about the outcome,
4035
* which we need to put into the notifications history.
@@ -46,6 +41,7 @@ public class FromCamelHistoryFiller {
4641
public static final String MESSAGES_ERROR_COUNTER_NAME = "camel.messages.error";
4742
public static final String MESSAGES_PROCESSED_COUNTER_NAME = "camel.messages.processed";
4843
public static final String EGRESS_CHANNEL = "egress";
44+
public static final String INTEGRATION_FAILED_EVENT_TYPE = "integration-failed";
4945

5046
@Inject
5147
NotificationHistoryRepository notificationHistoryRepository;
@@ -122,7 +118,7 @@ private void reinjectIfNeeded(Map<String, Object> payloadMap) {
122118
.withId(UUID.randomUUID())
123119
.withBundle("console")
124120
.withApplication("notifications")
125-
.withEventType("integration-failed")
121+
.withEventType(INTEGRATION_FAILED_EVENT_TYPE)
126122
.withAccountId(ep != null ? ep.getAccountId() : "")
127123
.withOrgId(ep != null && ep.getOrgId() != null ? ep.getOrgId() : "")
128124
.withContext(contextBuilder.build())
@@ -137,22 +133,10 @@ private void reinjectIfNeeded(Map<String, Object> payloadMap) {
137133

138134
String ser = Parser.encode(action);
139135
// Add the message id in Kafka header for the de-duplicator
140-
Message<String> message = buildMessageWithId(ser);
136+
Message<String> message = KafkaMessageWithIdBuilder.build(ser);
141137
emitter.send(message);
142138
}
143139

144-
145-
// Blindly copied from -gw. Perhaps put this into Schema project
146-
private static Message buildMessageWithId(String payload) {
147-
byte[] messageId = UUID.randomUUID().toString().getBytes(UTF_8);
148-
OutgoingKafkaRecordMetadata metadata = OutgoingKafkaRecordMetadata.builder()
149-
.withHeaders(new RecordHeaders().add(MESSAGE_ID_HEADER, messageId))
150-
.build();
151-
return Message.of(payload).addMetadata(metadata);
152-
}
153-
154-
155-
156140
private Map<String, Object> decodeItem(String s) {
157141

158142
// 1st step CloudEvent as String -> map
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package com.redhat.cloud.notifications.events;
2+
3+
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
4+
import org.apache.kafka.common.header.internals.RecordHeaders;
5+
import org.eclipse.microprofile.reactive.messaging.Message;
6+
7+
import java.util.UUID;
8+
9+
import static com.redhat.cloud.notifications.events.KafkaMessageDeduplicator.MESSAGE_ID_HEADER;
10+
import static java.nio.charset.StandardCharsets.UTF_8;
11+
12+
public class KafkaMessageWithIdBuilder {
13+
14+
public static Message build(String payload) {
15+
byte[] messageId = UUID.randomUUID().toString().getBytes(UTF_8);
16+
OutgoingKafkaRecordMetadata metadata = OutgoingKafkaRecordMetadata.builder()
17+
.withHeaders(new RecordHeaders().add(MESSAGE_ID_HEADER, messageId))
18+
.build();
19+
return Message.of(payload).addMetadata(metadata);
20+
}
21+
}

engine/src/main/java/com/redhat/cloud/notifications/processors/webhooks/WebhookTypeProcessor.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.redhat.cloud.notifications.config.FeatureFlipper;
44
import com.redhat.cloud.notifications.db.repositories.EndpointRepository;
5+
import com.redhat.cloud.notifications.events.DisabledWebhookNotifier;
56
import com.redhat.cloud.notifications.models.Endpoint;
67
import com.redhat.cloud.notifications.models.Event;
78
import com.redhat.cloud.notifications.models.Notification;
@@ -42,7 +43,7 @@
4243
@ApplicationScoped
4344
public class WebhookTypeProcessor implements EndpointTypeProcessor {
4445

45-
public static final String DISABLED_ENDPOINTS_COUNTER = "processor.webhook.disabled.endpoints";
46+
public static final String DISABLED_WEBHOOKS_COUNTER = "processor.webhook.disabled.endpoints";
4647
public static final String ERROR_TYPE_TAG_KEY = "error_type";
4748
public static final String CLIENT_TAG_VALUE = "client";
4849
public static final String SERVER_TAG_VALUE = "server";
@@ -82,19 +83,22 @@ public class WebhookTypeProcessor implements EndpointTypeProcessor {
8283
@Inject
8384
EndpointRepository endpointRepository;
8485

86+
@Inject
87+
DisabledWebhookNotifier disabledWebhookNotifier;
88+
8589
@Inject
8690
MeterRegistry registry;
8791

8892
private Counter processedCount;
89-
private Counter disabledEndpointsClientErrorCount;
90-
private Counter disabledEndpointsServerErrorCount;
93+
private Counter disabledWebhooksClientErrorCount;
94+
private Counter disabledWebhooksServerErrorCount;
9195
private RetryPolicy<Object> retryPolicy;
9296

9397
@PostConstruct
9498
void postConstruct() {
9599
processedCount = registry.counter("processor.webhook.processed");
96-
disabledEndpointsClientErrorCount = registry.counter(DISABLED_ENDPOINTS_COUNTER, ERROR_TYPE_TAG_KEY, CLIENT_TAG_VALUE);
97-
disabledEndpointsServerErrorCount = registry.counter(DISABLED_ENDPOINTS_COUNTER, ERROR_TYPE_TAG_KEY, SERVER_TAG_VALUE);
100+
disabledWebhooksClientErrorCount = registry.counter(DISABLED_WEBHOOKS_COUNTER, ERROR_TYPE_TAG_KEY, CLIENT_TAG_VALUE);
101+
disabledWebhooksServerErrorCount = registry.counter(DISABLED_WEBHOOKS_COUNTER, ERROR_TYPE_TAG_KEY, SERVER_TAG_VALUE);
98102
retryPolicy = RetryPolicy.builder()
99103
.handleIf(this::shouldRetry)
100104
.withBackoff(initialRetryBackOff, maxRetryBackOff)
@@ -176,9 +180,9 @@ public NotificationHistory doHttpRequest(Notification item, HttpRequest<Buffer>
176180
*/
177181
boolean disabled = endpointRepository.incrementEndpointServerErrors(item.getEndpoint().getId(), maxServerErrors);
178182
if (disabled) {
179-
disabledEndpointsServerErrorCount.increment();
183+
disabledWebhooksServerErrorCount.increment();
180184
Log.infof("Endpoint %s was disabled because we received too many 5xx status while calling it", item.getEndpoint().getId());
181-
// TODO NOTIF-512 Send a notification to the org admin explaining the situation.
185+
disabledWebhookNotifier.tooManyServerErrors(item.getEndpoint(), maxServerErrors);
182186
}
183187
}
184188
}
@@ -196,9 +200,9 @@ public NotificationHistory doHttpRequest(Notification item, HttpRequest<Buffer>
196200
*/
197201
boolean disabled = endpointRepository.disableEndpoint(item.getEndpoint().getId());
198202
if (disabled) {
199-
disabledEndpointsClientErrorCount.increment();
203+
disabledWebhooksClientErrorCount.increment();
200204
Log.infof("Endpoint %s was disabled because we received a 4xx status while calling it", item.getEndpoint().getId());
201-
// TODO NOTIF-512 Send a notification to the org admin explaining the situation.
205+
disabledWebhookNotifier.clientError(item.getEndpoint(), resp.statusCode());
202206
}
203207
} else {
204208
/*

engine/src/main/java/com/redhat/cloud/notifications/templates/ConsoleNotifications.java

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,51 @@
44
import io.quarkus.qute.CheckedTemplate;
55
import io.quarkus.qute.TemplateInstance;
66

7+
import static com.redhat.cloud.notifications.events.DisabledWebhookNotifier.DISABLED_WEBHOOK_EVENT_TYPE;
8+
import static com.redhat.cloud.notifications.events.FromCamelHistoryFiller.INTEGRATION_FAILED_EVENT_TYPE;
9+
import static com.redhat.cloud.notifications.models.EmailSubscriptionType.INSTANT;
10+
711
// Class name is the folder name in resources/templates/
812
public class ConsoleNotifications implements EmailTemplate {
13+
14+
private static final String NO_TITLE_FOUND_MSG = "No email title template found for ConsoleNotifications event_type: %s";
15+
private static final String NO_BODY_FOUND_MSG = "No email body template found for ConsoleNotifications event_type: %s";
16+
917
@Override
1018
public TemplateInstance getTitle(String eventType, EmailSubscriptionType type) {
11-
return Templates.failedIntegrationTitle();
19+
if (type == INSTANT) {
20+
switch (eventType) {
21+
case INTEGRATION_FAILED_EVENT_TYPE:
22+
return Templates.failedIntegrationTitle();
23+
case DISABLED_WEBHOOK_EVENT_TYPE:
24+
return Templates.disabledWebhookTitle();
25+
default:
26+
// Do nothing.
27+
break;
28+
}
29+
}
30+
throw new UnsupportedOperationException(String.format(NO_TITLE_FOUND_MSG, eventType));
1231
}
1332

1433
@Override
1534
public TemplateInstance getBody(String eventType, EmailSubscriptionType type) {
16-
return Templates.failedIntegrationBody();
35+
if (type == INSTANT) {
36+
switch (eventType) {
37+
case INTEGRATION_FAILED_EVENT_TYPE:
38+
return Templates.failedIntegrationBody();
39+
case DISABLED_WEBHOOK_EVENT_TYPE:
40+
return Templates.disabledWebhookBody();
41+
default:
42+
// Do nothing.
43+
break;
44+
}
45+
}
46+
throw new UnsupportedOperationException(String.format(NO_BODY_FOUND_MSG, eventType));
1747
}
1848

1949
@Override
2050
public boolean isSupported(String eventType, EmailSubscriptionType type) {
21-
return type.equals(EmailSubscriptionType.INSTANT);
51+
return type.equals(INSTANT);
2252
}
2353

2454
@Override
@@ -33,5 +63,8 @@ public static class Templates {
3363

3464
public static native TemplateInstance failedIntegrationBody();
3565

66+
public static native TemplateInstance disabledWebhookTitle();
67+
68+
public static native TemplateInstance disabledWebhookBody();
3669
}
3770
}

engine/src/main/java/com/redhat/cloud/notifications/templates/EmailTemplateMigrationService.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import java.util.Optional;
2525

2626
import static com.redhat.cloud.notifications.Constants.API_INTERNAL;
27+
import static com.redhat.cloud.notifications.events.DisabledWebhookNotifier.DISABLED_WEBHOOK_EVENT_TYPE;
28+
import static com.redhat.cloud.notifications.events.FromCamelHistoryFiller.INTEGRATION_FAILED_EVENT_TYPE;
2729
import static com.redhat.cloud.notifications.models.EmailSubscriptionType.DAILY;
2830
import static java.nio.charset.StandardCharsets.UTF_8;
2931
import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
@@ -119,11 +121,17 @@ public List<String> migrate() {
119121
/*
120122
* Former src/main/resources/templates/ConsoleNotifications folder.
121123
*/
124+
getOrCreateTemplate(warnings, "ConsoleNotifications/insightsEmailBody", "html", "Notifications Insights email body");
122125
createInstantEmailTemplate(
123-
warnings, "console", "notifications", List.of("failed-integration"),
126+
warnings, "console", "notifications", List.of(INTEGRATION_FAILED_EVENT_TYPE),
124127
"ConsoleNotifications/failedIntegrationTitle", "txt", "Notifications failed integration email title",
125128
"ConsoleNotifications/failedIntegrationBody", "txt", "Notifications failed integration email body"
126129
);
130+
createInstantEmailTemplate(
131+
warnings, "console", "notifications", List.of(DISABLED_WEBHOOK_EVENT_TYPE),
132+
"ConsoleNotifications/disabledWebhookTitle", "txt", "Notifications disabled webhook email title",
133+
"ConsoleNotifications/disabledWebhookBody", "html", "Notifications disabled webhook email body"
134+
);
127135

128136
/*
129137
* Former src/main/resources/templates/CostManagement folder.

engine/src/main/resources/application.properties

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ mp.messaging.outgoing.egress.topic=platform.notifications.ingress
2626
mp.messaging.outgoing.egress.group.id=integrations
2727
mp.messaging.outgoing.egress.key.serializer=org.apache.kafka.common.serialization.StringSerializer
2828
mp.messaging.outgoing.egress.value.serializer=org.apache.kafka.common.serialization.StringSerializer
29+
# Messages can be emitted on this topic from multiple emitters in our app
30+
mp.messaging.outgoing.egress.merge=true
2931

3032
# Output queue to Camel (notifications-sender)
3133
mp.messaging.outgoing.toCamel.connector=smallrye-kafka
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
{#include ConsoleNotifications/insightsEmailBody}
2+
{#content-title}Integration '{action.payload.endpoint-description}' was disabled{/content-title}
3+
{#content-body}
4+
<tr>
5+
<td class="rh-content__block">
6+
<p>
7+
Integration <a target="_blank" href="{environment.url}/settings/integrations?name={action.payload.endpoint-description}">{action.payload.endpoint-description}</a> was disabled because
8+
{#if action.payload.error_type == "client"}
9+
the remote endpoint responded with an HTTP status code {action.payload.status-code}.
10+
Please review the configuration by clicking on the button below.
11+
{#else}
12+
the remote endpoint responded {action.payload.errors-count} times with a server error (HTTP status code 5xx).
13+
Please re-enable the integration by clicking on the button below if the remote endpoint is ready to receive webhook calls.
14+
{/if}
15+
</p>
16+
</td>
17+
</tr>
18+
<tr>
19+
<td class="rh-content__block">
20+
<table align="center">
21+
<tr>
22+
<td class="rh-cta-link" align="center">
23+
<a target="_blank" href="{environment.url}/settings/integrations?name={action.payload.endpoint-description}">
24+
<span>
25+
Open Integrations
26+
</span>
27+
</a>
28+
</td>
29+
</tr>
30+
</table>
31+
</td>
32+
</tr>
33+
{/content-body}
34+
{/include}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{action.context.system_check_in.toUtcFormat()} - Integration '{action.payload.endpoint-description}' was automatically disabled

0 commit comments

Comments
 (0)