Skip to content

Commit f68b9e6

Browse files
committed
NOTIF-512 Notify org admins when a webhook is disabled
1 parent 2d9fa3c commit f68b9e6

File tree

15 files changed

+855
-38
lines changed

15 files changed

+855
-38
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
INSERT INTO event_type (id, application_id, name, display_name)
2+
SELECT gen_random_uuid(), a.id , 'integration-disabled', 'Integration disabled'
3+
FROM applications a, bundles b
4+
WHERE a.bundle_id = b.id AND a.name = 'notifications' AND b.name = 'console'
5+
ON CONFLICT DO NOTHING;

engine/src/main/java/com/redhat/cloud/notifications/events/FromCamelHistoryFiller.java

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,7 @@
1414
import io.micrometer.core.instrument.MeterRegistry;
1515
import io.quarkus.logging.Log;
1616
import io.smallrye.reactive.messaging.annotations.Blocking;
17-
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
1817
import io.vertx.core.json.Json;
19-
import org.apache.kafka.common.header.internals.RecordHeaders;
2018
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
2119
import org.eclipse.microprofile.reactive.messaging.Channel;
2220
import org.eclipse.microprofile.reactive.messaging.Emitter;
@@ -32,9 +30,6 @@
3230
import java.util.Map;
3331
import java.util.UUID;
3432

35-
import static com.redhat.cloud.notifications.events.KafkaMessageDeduplicator.MESSAGE_ID_HEADER;
36-
import static java.nio.charset.StandardCharsets.UTF_8;
37-
3833
/**
3934
* We sent data via Camel. Now Camel informs us about the outcome,
4035
* which we need to put into the notifications history.
@@ -46,6 +41,7 @@ public class FromCamelHistoryFiller {
4641
public static final String MESSAGES_ERROR_COUNTER_NAME = "camel.messages.error";
4742
public static final String MESSAGES_PROCESSED_COUNTER_NAME = "camel.messages.processed";
4843
public static final String EGRESS_CHANNEL = "egress";
44+
public static final String INTEGRATION_FAILED_EVENT_TYPE = "integration-failed";
4945

5046
@Inject
5147
NotificationHistoryRepository notificationHistoryRepository;
@@ -122,7 +118,7 @@ private void reinjectIfNeeded(Map<String, Object> payloadMap) {
122118
.withId(UUID.randomUUID())
123119
.withBundle("console")
124120
.withApplication("notifications")
125-
.withEventType("integration-failed")
121+
.withEventType(INTEGRATION_FAILED_EVENT_TYPE)
126122
.withAccountId(ep != null ? ep.getAccountId() : "")
127123
.withOrgId(ep != null && ep.getOrgId() != null ? ep.getOrgId() : "")
128124
.withContext(contextBuilder.build())
@@ -137,22 +133,10 @@ private void reinjectIfNeeded(Map<String, Object> payloadMap) {
137133

138134
String ser = Parser.encode(action);
139135
// Add the message id in Kafka header for the de-duplicator
140-
Message<String> message = buildMessageWithId(ser);
136+
Message<String> message = KafkaMessageWithIdBuilder.build(ser);
141137
emitter.send(message);
142138
}
143139

144-
145-
// Blindly copied from -gw. Perhaps put this into Schema project
146-
private static Message buildMessageWithId(String payload) {
147-
byte[] messageId = UUID.randomUUID().toString().getBytes(UTF_8);
148-
OutgoingKafkaRecordMetadata metadata = OutgoingKafkaRecordMetadata.builder()
149-
.withHeaders(new RecordHeaders().add(MESSAGE_ID_HEADER, messageId))
150-
.build();
151-
return Message.of(payload).addMetadata(metadata);
152-
}
153-
154-
155-
156140
private Map<String, Object> decodeItem(String s) {
157141

158142
// 1st step CloudEvent as String -> map
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package com.redhat.cloud.notifications.events;
2+
3+
import com.redhat.cloud.notifications.ingress.Action;
4+
import com.redhat.cloud.notifications.ingress.Event;
5+
import com.redhat.cloud.notifications.ingress.Parser;
6+
import com.redhat.cloud.notifications.ingress.Payload;
7+
import com.redhat.cloud.notifications.ingress.Recipient;
8+
import com.redhat.cloud.notifications.models.Endpoint;
9+
import org.eclipse.microprofile.reactive.messaging.Channel;
10+
import org.eclipse.microprofile.reactive.messaging.Emitter;
11+
import org.eclipse.microprofile.reactive.messaging.Message;
12+
13+
import javax.enterprise.context.ApplicationScoped;
14+
import java.time.LocalDateTime;
15+
import java.util.List;
16+
import java.util.UUID;
17+
18+
import static com.redhat.cloud.notifications.events.FromCamelHistoryFiller.EGRESS_CHANNEL;
19+
import static java.time.ZoneOffset.UTC;
20+
21+
@ApplicationScoped
22+
public class IntegrationDisabledNotifier {
23+
24+
public static final String CLIENT_ERROR_TYPE = "client";
25+
public static final String SERVER_ERROR_TYPE = "server";
26+
public static final String CONSOLE_BUNDLE = "console";
27+
public static final String NOTIFICATIONS_APP = "notifications";
28+
public static final String INTEGRATION_DISABLED_EVENT_TYPE = "integration-disabled";
29+
public static final String ERROR_TYPE_PROPERTY = "error_type";
30+
public static final String ENDPOINT_ID_PROPERTY = "endpoint_id";
31+
public static final String ENDPOINT_NAME_PROPERTY = "endpoint_name";
32+
public static final String STATUS_CODE_PROPERTY = "status_code";
33+
public static final String ERRORS_COUNT_PROPERTY = "errors_count";
34+
35+
@Channel(EGRESS_CHANNEL)
36+
Emitter<String> emitter;
37+
38+
public void clientError(Endpoint endpoint, int statusCode) {
39+
notify(endpoint, CLIENT_ERROR_TYPE, statusCode, 1);
40+
}
41+
42+
public void tooManyServerErrors(Endpoint endpoint, int errorsCount) {
43+
notify(endpoint, SERVER_ERROR_TYPE, -1, errorsCount);
44+
}
45+
46+
private void notify(Endpoint endpoint, String errorType, int statusCode, int errorsCount) {
47+
Payload.PayloadBuilderBase payloadBuilder = new Payload.PayloadBuilder()
48+
.withAdditionalProperty(ERROR_TYPE_PROPERTY, errorType)
49+
.withAdditionalProperty(ENDPOINT_ID_PROPERTY, endpoint.getId())
50+
.withAdditionalProperty(ENDPOINT_NAME_PROPERTY, endpoint.getName())
51+
.withAdditionalProperty(ERRORS_COUNT_PROPERTY, errorsCount);
52+
53+
if (statusCode > 0) {
54+
payloadBuilder.withAdditionalProperty(STATUS_CODE_PROPERTY, statusCode);
55+
}
56+
57+
Event event = new Event.EventBuilder()
58+
.withPayload(payloadBuilder.build())
59+
.build();
60+
61+
Recipient recipients = new Recipient.RecipientBuilder()
62+
.withOnlyAdmins(true)
63+
.withIgnoreUserPreferences(true)
64+
.build();
65+
66+
Action action = new Action.ActionBuilder()
67+
.withId(UUID.randomUUID())
68+
.withBundle(CONSOLE_BUNDLE)
69+
.withApplication(NOTIFICATIONS_APP)
70+
.withEventType(INTEGRATION_DISABLED_EVENT_TYPE)
71+
.withOrgId(endpoint.getOrgId())
72+
.withTimestamp(LocalDateTime.now(UTC))
73+
.withEvents(List.of(event))
74+
.withRecipients(List.of(recipients))
75+
.build();
76+
String encodedAction = Parser.encode(action);
77+
78+
Message<String> message = KafkaMessageWithIdBuilder.build(encodedAction);
79+
emitter.send(message);
80+
}
81+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package com.redhat.cloud.notifications.events;
2+
3+
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
4+
import org.apache.kafka.common.header.internals.RecordHeaders;
5+
import org.eclipse.microprofile.reactive.messaging.Message;
6+
7+
import java.util.UUID;
8+
9+
import static com.redhat.cloud.notifications.events.KafkaMessageDeduplicator.MESSAGE_ID_HEADER;
10+
import static java.nio.charset.StandardCharsets.UTF_8;
11+
12+
public class KafkaMessageWithIdBuilder {
13+
14+
public static Message build(String payload) {
15+
byte[] messageId = UUID.randomUUID().toString().getBytes(UTF_8);
16+
OutgoingKafkaRecordMetadata metadata = OutgoingKafkaRecordMetadata.builder()
17+
.withHeaders(new RecordHeaders().add(MESSAGE_ID_HEADER, messageId))
18+
.build();
19+
return Message.of(payload).addMetadata(metadata);
20+
}
21+
}

engine/src/main/java/com/redhat/cloud/notifications/processors/webhooks/WebhookTypeProcessor.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.redhat.cloud.notifications.config.FeatureFlipper;
44
import com.redhat.cloud.notifications.db.repositories.EndpointRepository;
5+
import com.redhat.cloud.notifications.events.IntegrationDisabledNotifier;
56
import com.redhat.cloud.notifications.models.Endpoint;
67
import com.redhat.cloud.notifications.models.Event;
78
import com.redhat.cloud.notifications.models.Notification;
@@ -42,7 +43,7 @@
4243
@ApplicationScoped
4344
public class WebhookTypeProcessor implements EndpointTypeProcessor {
4445

45-
public static final String DISABLED_ENDPOINTS_COUNTER = "processor.webhook.disabled.endpoints";
46+
public static final String DISABLED_WEBHOOKS_COUNTER = "processor.webhook.disabled.endpoints";
4647
public static final String ERROR_TYPE_TAG_KEY = "error_type";
4748
public static final String CLIENT_TAG_VALUE = "client";
4849
public static final String SERVER_TAG_VALUE = "server";
@@ -82,19 +83,22 @@ public class WebhookTypeProcessor implements EndpointTypeProcessor {
8283
@Inject
8384
EndpointRepository endpointRepository;
8485

86+
@Inject
87+
IntegrationDisabledNotifier integrationDisabledNotifier;
88+
8589
@Inject
8690
MeterRegistry registry;
8791

8892
private Counter processedCount;
89-
private Counter disabledEndpointsClientErrorCount;
90-
private Counter disabledEndpointsServerErrorCount;
93+
private Counter disabledWebhooksClientErrorCount;
94+
private Counter disabledWebhooksServerErrorCount;
9195
private RetryPolicy<Object> retryPolicy;
9296

9397
@PostConstruct
9498
void postConstruct() {
9599
processedCount = registry.counter("processor.webhook.processed");
96-
disabledEndpointsClientErrorCount = registry.counter(DISABLED_ENDPOINTS_COUNTER, ERROR_TYPE_TAG_KEY, CLIENT_TAG_VALUE);
97-
disabledEndpointsServerErrorCount = registry.counter(DISABLED_ENDPOINTS_COUNTER, ERROR_TYPE_TAG_KEY, SERVER_TAG_VALUE);
100+
disabledWebhooksClientErrorCount = registry.counter(DISABLED_WEBHOOKS_COUNTER, ERROR_TYPE_TAG_KEY, CLIENT_TAG_VALUE);
101+
disabledWebhooksServerErrorCount = registry.counter(DISABLED_WEBHOOKS_COUNTER, ERROR_TYPE_TAG_KEY, SERVER_TAG_VALUE);
98102
retryPolicy = RetryPolicy.builder()
99103
.handleIf(this::shouldRetry)
100104
.withBackoff(initialRetryBackOff, maxRetryBackOff)
@@ -176,9 +180,9 @@ public NotificationHistory doHttpRequest(Notification item, HttpRequest<Buffer>
176180
*/
177181
boolean disabled = endpointRepository.incrementEndpointServerErrors(item.getEndpoint().getId(), maxServerErrors);
178182
if (disabled) {
179-
disabledEndpointsServerErrorCount.increment();
183+
disabledWebhooksServerErrorCount.increment();
180184
Log.infof("Endpoint %s was disabled because we received too many 5xx status while calling it", item.getEndpoint().getId());
181-
// TODO NOTIF-512 Send a notification to the org admin explaining the situation.
185+
integrationDisabledNotifier.tooManyServerErrors(item.getEndpoint(), maxServerErrors);
182186
}
183187
}
184188
}
@@ -196,9 +200,9 @@ public NotificationHistory doHttpRequest(Notification item, HttpRequest<Buffer>
196200
*/
197201
boolean disabled = endpointRepository.disableEndpoint(item.getEndpoint().getId());
198202
if (disabled) {
199-
disabledEndpointsClientErrorCount.increment();
203+
disabledWebhooksClientErrorCount.increment();
200204
Log.infof("Endpoint %s was disabled because we received a 4xx status while calling it", item.getEndpoint().getId());
201-
// TODO NOTIF-512 Send a notification to the org admin explaining the situation.
205+
integrationDisabledNotifier.clientError(item.getEndpoint(), resp.statusCode());
202206
}
203207
} else {
204208
/*

engine/src/main/java/com/redhat/cloud/notifications/templates/ConsoleNotifications.java

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,51 @@
44
import io.quarkus.qute.CheckedTemplate;
55
import io.quarkus.qute.TemplateInstance;
66

7+
import static com.redhat.cloud.notifications.events.IntegrationDisabledNotifier.INTEGRATION_DISABLED_EVENT_TYPE;
8+
import static com.redhat.cloud.notifications.events.FromCamelHistoryFiller.INTEGRATION_FAILED_EVENT_TYPE;
9+
import static com.redhat.cloud.notifications.models.EmailSubscriptionType.INSTANT;
10+
711
// Class name is the folder name in resources/templates/
812
public class ConsoleNotifications implements EmailTemplate {
13+
14+
private static final String NO_TITLE_FOUND_MSG = "No email title template found for ConsoleNotifications event_type: %s";
15+
private static final String NO_BODY_FOUND_MSG = "No email body template found for ConsoleNotifications event_type: %s";
16+
917
@Override
1018
public TemplateInstance getTitle(String eventType, EmailSubscriptionType type) {
11-
return Templates.failedIntegrationTitle();
19+
if (type == INSTANT) {
20+
switch (eventType) {
21+
case INTEGRATION_FAILED_EVENT_TYPE:
22+
return Templates.failedIntegrationTitle();
23+
case INTEGRATION_DISABLED_EVENT_TYPE:
24+
return Templates.integrationDisabledTitle();
25+
default:
26+
// Do nothing.
27+
break;
28+
}
29+
}
30+
throw new UnsupportedOperationException(String.format(NO_TITLE_FOUND_MSG, eventType));
1231
}
1332

1433
@Override
1534
public TemplateInstance getBody(String eventType, EmailSubscriptionType type) {
16-
return Templates.failedIntegrationBody();
35+
if (type == INSTANT) {
36+
switch (eventType) {
37+
case INTEGRATION_FAILED_EVENT_TYPE:
38+
return Templates.failedIntegrationBody();
39+
case INTEGRATION_DISABLED_EVENT_TYPE:
40+
return Templates.integrationDisabledBody();
41+
default:
42+
// Do nothing.
43+
break;
44+
}
45+
}
46+
throw new UnsupportedOperationException(String.format(NO_BODY_FOUND_MSG, eventType));
1747
}
1848

1949
@Override
2050
public boolean isSupported(String eventType, EmailSubscriptionType type) {
21-
return type.equals(EmailSubscriptionType.INSTANT);
51+
return type.equals(INSTANT);
2252
}
2353

2454
@Override
@@ -33,5 +63,8 @@ public static class Templates {
3363

3464
public static native TemplateInstance failedIntegrationBody();
3565

66+
public static native TemplateInstance integrationDisabledTitle();
67+
68+
public static native TemplateInstance integrationDisabledBody();
3669
}
3770
}

engine/src/main/java/com/redhat/cloud/notifications/templates/EmailTemplateMigrationService.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import java.util.Optional;
2525

2626
import static com.redhat.cloud.notifications.Constants.API_INTERNAL;
27+
import static com.redhat.cloud.notifications.events.IntegrationDisabledNotifier.INTEGRATION_DISABLED_EVENT_TYPE;
28+
import static com.redhat.cloud.notifications.events.FromCamelHistoryFiller.INTEGRATION_FAILED_EVENT_TYPE;
2729
import static com.redhat.cloud.notifications.models.EmailSubscriptionType.DAILY;
2830
import static java.nio.charset.StandardCharsets.UTF_8;
2931
import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
@@ -119,11 +121,17 @@ public List<String> migrate() {
119121
/*
120122
* Former src/main/resources/templates/ConsoleNotifications folder.
121123
*/
124+
getOrCreateTemplate(warnings, "ConsoleNotifications/insightsEmailBody", "html", "Notifications Insights email body");
122125
createInstantEmailTemplate(
123-
warnings, "console", "notifications", List.of("failed-integration"),
126+
warnings, "console", "notifications", List.of(INTEGRATION_FAILED_EVENT_TYPE),
124127
"ConsoleNotifications/failedIntegrationTitle", "txt", "Notifications failed integration email title",
125128
"ConsoleNotifications/failedIntegrationBody", "txt", "Notifications failed integration email body"
126129
);
130+
createInstantEmailTemplate(
131+
warnings, "console", "notifications", List.of(INTEGRATION_DISABLED_EVENT_TYPE),
132+
"ConsoleNotifications/integrationDisabledTitle", "txt", "Notifications disabled integration email title",
133+
"ConsoleNotifications/integrationDisabledBody", "html", "Notifications disabled integration email body"
134+
);
127135

128136
/*
129137
* Former src/main/resources/templates/CostManagement folder.

engine/src/main/resources/application.properties

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ mp.messaging.outgoing.egress.topic=platform.notifications.ingress
2626
mp.messaging.outgoing.egress.group.id=integrations
2727
mp.messaging.outgoing.egress.key.serializer=org.apache.kafka.common.serialization.StringSerializer
2828
mp.messaging.outgoing.egress.value.serializer=org.apache.kafka.common.serialization.StringSerializer
29+
# Messages can be emitted on this topic from multiple emitters in our app
30+
mp.messaging.outgoing.egress.merge=true
2931

3032
# Output queue to Camel (notifications-sender)
3133
mp.messaging.outgoing.toCamel.connector=smallrye-kafka

0 commit comments

Comments
 (0)