Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
INSERT INTO event_type (id, application_id, name, display_name)
SELECT gen_random_uuid(), a.id , 'integration-disabled', 'Integration disabled'
FROM applications a, bundles b
WHERE a.bundle_id = b.id AND a.name = 'notifications' AND b.name = 'console'
ON CONFLICT DO NOTHING;
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@
import io.micrometer.core.instrument.MeterRegistry;
import io.quarkus.logging.Log;
import io.smallrye.reactive.messaging.annotations.Blocking;
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
import io.vertx.core.json.Json;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
Expand All @@ -32,9 +30,6 @@
import java.util.Map;
import java.util.UUID;

import static com.redhat.cloud.notifications.events.KafkaMessageDeduplicator.MESSAGE_ID_HEADER;
import static java.nio.charset.StandardCharsets.UTF_8;

/**
* We sent data via Camel. Now Camel informs us about the outcome,
* which we need to put into the notifications history.
Expand All @@ -46,6 +41,7 @@ public class FromCamelHistoryFiller {
public static final String MESSAGES_ERROR_COUNTER_NAME = "camel.messages.error";
public static final String MESSAGES_PROCESSED_COUNTER_NAME = "camel.messages.processed";
public static final String EGRESS_CHANNEL = "egress";
public static final String INTEGRATION_FAILED_EVENT_TYPE = "integration-failed";

@Inject
NotificationHistoryRepository notificationHistoryRepository;
Expand Down Expand Up @@ -122,7 +118,7 @@ private void reinjectIfNeeded(Map<String, Object> payloadMap) {
.withId(UUID.randomUUID())
.withBundle("console")
.withApplication("notifications")
.withEventType("integration-failed")
.withEventType(INTEGRATION_FAILED_EVENT_TYPE)
.withAccountId(ep != null ? ep.getAccountId() : "")
.withOrgId(ep != null && ep.getOrgId() != null ? ep.getOrgId() : "")
.withContext(contextBuilder.build())
Expand All @@ -137,22 +133,10 @@ private void reinjectIfNeeded(Map<String, Object> payloadMap) {

String ser = Parser.encode(action);
// Add the message id in Kafka header for the de-duplicator
Message<String> message = buildMessageWithId(ser);
Message<String> message = KafkaMessageWithIdBuilder.build(ser);
emitter.send(message);
}


// Blindly copied from -gw. Perhaps put this into Schema project
private static Message buildMessageWithId(String payload) {
byte[] messageId = UUID.randomUUID().toString().getBytes(UTF_8);
OutgoingKafkaRecordMetadata metadata = OutgoingKafkaRecordMetadata.builder()
.withHeaders(new RecordHeaders().add(MESSAGE_ID_HEADER, messageId))
.build();
return Message.of(payload).addMetadata(metadata);
}



private Map<String, Object> decodeItem(String s) {

// 1st step CloudEvent as String -> map
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package com.redhat.cloud.notifications.events;

import com.redhat.cloud.notifications.ingress.Action;
import com.redhat.cloud.notifications.ingress.Event;
import com.redhat.cloud.notifications.ingress.Parser;
import com.redhat.cloud.notifications.ingress.Payload;
import com.redhat.cloud.notifications.ingress.Recipient;
import com.redhat.cloud.notifications.models.Endpoint;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Message;

import javax.enterprise.context.ApplicationScoped;
import java.time.LocalDateTime;
import java.util.List;
import java.util.UUID;

import static com.redhat.cloud.notifications.events.FromCamelHistoryFiller.EGRESS_CHANNEL;
import static java.time.ZoneOffset.UTC;

@ApplicationScoped
public class IntegrationDisabledNotifier {

public static final String CLIENT_ERROR_TYPE = "client";
public static final String SERVER_ERROR_TYPE = "server";
public static final String CONSOLE_BUNDLE = "console";
public static final String NOTIFICATIONS_APP = "notifications";
public static final String INTEGRATION_DISABLED_EVENT_TYPE = "integration-disabled";
public static final String ERROR_TYPE_PROPERTY = "error_type";
public static final String ENDPOINT_ID_PROPERTY = "endpoint_id";
public static final String ENDPOINT_NAME_PROPERTY = "endpoint_name";
public static final String STATUS_CODE_PROPERTY = "status_code";
public static final String ERRORS_COUNT_PROPERTY = "errors_count";

@Channel(EGRESS_CHANNEL)
Emitter<String> emitter;

public void clientError(Endpoint endpoint, int statusCode) {
notify(endpoint, CLIENT_ERROR_TYPE, statusCode, 1);
}

public void tooManyServerErrors(Endpoint endpoint, int errorsCount) {
notify(endpoint, SERVER_ERROR_TYPE, -1, errorsCount);
}

private void notify(Endpoint endpoint, String errorType, int statusCode, int errorsCount) {
Payload.PayloadBuilderBase payloadBuilder = new Payload.PayloadBuilder()
.withAdditionalProperty(ERROR_TYPE_PROPERTY, errorType)
.withAdditionalProperty(ENDPOINT_ID_PROPERTY, endpoint.getId())
.withAdditionalProperty(ENDPOINT_NAME_PROPERTY, endpoint.getName())
.withAdditionalProperty(ERRORS_COUNT_PROPERTY, errorsCount);

if (statusCode > 0) {
payloadBuilder.withAdditionalProperty(STATUS_CODE_PROPERTY, statusCode);
}

Event event = new Event.EventBuilder()
.withPayload(payloadBuilder.build())
.build();

Recipient recipients = new Recipient.RecipientBuilder()
.withOnlyAdmins(true)
.withIgnoreUserPreferences(true)
.build();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be removed in the future, once we have something to have defaults (e.g. all admins subscribed for these notifications and a default behavior group on the account that can be removed) so they can fine-tune this setting (maybe a particular RBAC group would be the one handling these errors and no the admin itself.


Action action = new Action.ActionBuilder()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a side note, the suggested builder syntax has been merged, but not deployed yet!

.withId(UUID.randomUUID())
.withBundle(CONSOLE_BUNDLE)
.withApplication(NOTIFICATIONS_APP)
.withEventType(INTEGRATION_DISABLED_EVENT_TYPE)
.withOrgId(endpoint.getOrgId())
.withTimestamp(LocalDateTime.now(UTC))
.withEvents(List.of(event))
.withRecipients(List.of(recipients))
.build();
String encodedAction = Parser.encode(action);

Message<String> message = KafkaMessageWithIdBuilder.build(encodedAction);
emitter.send(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.redhat.cloud.notifications.events;

import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.eclipse.microprofile.reactive.messaging.Message;

import java.util.UUID;

import static com.redhat.cloud.notifications.events.KafkaMessageDeduplicator.MESSAGE_ID_HEADER;
import static java.nio.charset.StandardCharsets.UTF_8;

public class KafkaMessageWithIdBuilder {

public static Message build(String payload) {
byte[] messageId = UUID.randomUUID().toString().getBytes(UTF_8);
OutgoingKafkaRecordMetadata metadata = OutgoingKafkaRecordMetadata.builder()
.withHeaders(new RecordHeaders().add(MESSAGE_ID_HEADER, messageId))
.build();
return Message.of(payload).addMetadata(metadata);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.redhat.cloud.notifications.config.FeatureFlipper;
import com.redhat.cloud.notifications.db.repositories.EndpointRepository;
import com.redhat.cloud.notifications.events.IntegrationDisabledNotifier;
import com.redhat.cloud.notifications.models.Endpoint;
import com.redhat.cloud.notifications.models.Event;
import com.redhat.cloud.notifications.models.Notification;
Expand Down Expand Up @@ -42,7 +43,7 @@
@ApplicationScoped
public class WebhookTypeProcessor implements EndpointTypeProcessor {

public static final String DISABLED_ENDPOINTS_COUNTER = "processor.webhook.disabled.endpoints";
public static final String DISABLED_WEBHOOKS_COUNTER = "processor.webhook.disabled.endpoints";
public static final String ERROR_TYPE_TAG_KEY = "error_type";
public static final String CLIENT_TAG_VALUE = "client";
public static final String SERVER_TAG_VALUE = "server";
Expand Down Expand Up @@ -82,19 +83,22 @@ public class WebhookTypeProcessor implements EndpointTypeProcessor {
@Inject
EndpointRepository endpointRepository;

@Inject
IntegrationDisabledNotifier integrationDisabledNotifier;

@Inject
MeterRegistry registry;

private Counter processedCount;
private Counter disabledEndpointsClientErrorCount;
private Counter disabledEndpointsServerErrorCount;
private Counter disabledWebhooksClientErrorCount;
private Counter disabledWebhooksServerErrorCount;
private RetryPolicy<Object> retryPolicy;

@PostConstruct
void postConstruct() {
processedCount = registry.counter("processor.webhook.processed");
disabledEndpointsClientErrorCount = registry.counter(DISABLED_ENDPOINTS_COUNTER, ERROR_TYPE_TAG_KEY, CLIENT_TAG_VALUE);
disabledEndpointsServerErrorCount = registry.counter(DISABLED_ENDPOINTS_COUNTER, ERROR_TYPE_TAG_KEY, SERVER_TAG_VALUE);
disabledWebhooksClientErrorCount = registry.counter(DISABLED_WEBHOOKS_COUNTER, ERROR_TYPE_TAG_KEY, CLIENT_TAG_VALUE);
disabledWebhooksServerErrorCount = registry.counter(DISABLED_WEBHOOKS_COUNTER, ERROR_TYPE_TAG_KEY, SERVER_TAG_VALUE);
retryPolicy = RetryPolicy.builder()
.handleIf(this::shouldRetry)
.withBackoff(initialRetryBackOff, maxRetryBackOff)
Expand Down Expand Up @@ -176,9 +180,9 @@ public NotificationHistory doHttpRequest(Notification item, HttpRequest<Buffer>
*/
boolean disabled = endpointRepository.incrementEndpointServerErrors(item.getEndpoint().getId(), maxServerErrors);
if (disabled) {
disabledEndpointsServerErrorCount.increment();
disabledWebhooksServerErrorCount.increment();
Log.infof("Endpoint %s was disabled because we received too many 5xx status while calling it", item.getEndpoint().getId());
// TODO NOTIF-512 Send a notification to the org admin explaining the situation.
integrationDisabledNotifier.tooManyServerErrors(item.getEndpoint(), maxServerErrors);
}
}
}
Expand All @@ -196,9 +200,9 @@ public NotificationHistory doHttpRequest(Notification item, HttpRequest<Buffer>
*/
boolean disabled = endpointRepository.disableEndpoint(item.getEndpoint().getId());
if (disabled) {
disabledEndpointsClientErrorCount.increment();
disabledWebhooksClientErrorCount.increment();
Log.infof("Endpoint %s was disabled because we received a 4xx status while calling it", item.getEndpoint().getId());
// TODO NOTIF-512 Send a notification to the org admin explaining the situation.
integrationDisabledNotifier.clientError(item.getEndpoint(), resp.statusCode());
}
} else {
/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,51 @@
import io.quarkus.qute.CheckedTemplate;
import io.quarkus.qute.TemplateInstance;

import static com.redhat.cloud.notifications.events.FromCamelHistoryFiller.INTEGRATION_FAILED_EVENT_TYPE;
import static com.redhat.cloud.notifications.events.IntegrationDisabledNotifier.INTEGRATION_DISABLED_EVENT_TYPE;
import static com.redhat.cloud.notifications.models.EmailSubscriptionType.INSTANT;

// Name needs to be "ConsoleNotifications" to read templates from resources/templates/ConsoleNotifications
public class ConsoleNotifications implements EmailTemplate {

private static final String NO_TITLE_FOUND_MSG = "No email title template found for ConsoleNotifications event_type: %s";
private static final String NO_BODY_FOUND_MSG = "No email body template found for ConsoleNotifications event_type: %s";

@Override
public TemplateInstance getTitle(String eventType, EmailSubscriptionType type) {
return Templates.failedIntegrationTitle();
if (type == INSTANT) {
switch (eventType) {
case INTEGRATION_FAILED_EVENT_TYPE:
return Templates.failedIntegrationTitle();
case INTEGRATION_DISABLED_EVENT_TYPE:
return Templates.integrationDisabledTitle();
default:
// Do nothing.
break;
}
}
throw new UnsupportedOperationException(String.format(NO_TITLE_FOUND_MSG, eventType));
}

@Override
public TemplateInstance getBody(String eventType, EmailSubscriptionType type) {
return Templates.failedIntegrationBody();
if (type == INSTANT) {
switch (eventType) {
case INTEGRATION_FAILED_EVENT_TYPE:
return Templates.failedIntegrationBody();
case INTEGRATION_DISABLED_EVENT_TYPE:
return Templates.integrationDisabledBody();
default:
// Do nothing.
break;
}
}
throw new UnsupportedOperationException(String.format(NO_BODY_FOUND_MSG, eventType));
}

@Override
public boolean isSupported(String eventType, EmailSubscriptionType type) {
return type.equals(EmailSubscriptionType.INSTANT);
return type.equals(INSTANT);
}

@Override
Expand All @@ -33,5 +63,8 @@ public static class Templates {

public static native TemplateInstance failedIntegrationBody();

public static native TemplateInstance integrationDisabledTitle();

public static native TemplateInstance integrationDisabledBody();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.Optional;

import static com.redhat.cloud.notifications.Constants.API_INTERNAL;
import static com.redhat.cloud.notifications.events.FromCamelHistoryFiller.INTEGRATION_FAILED_EVENT_TYPE;
import static com.redhat.cloud.notifications.events.IntegrationDisabledNotifier.INTEGRATION_DISABLED_EVENT_TYPE;
import static com.redhat.cloud.notifications.models.EmailSubscriptionType.DAILY;
import static java.nio.charset.StandardCharsets.UTF_8;
import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
Expand Down Expand Up @@ -119,11 +121,17 @@ public List<String> migrate() {
/*
* Former src/main/resources/templates/ConsoleNotifications folder.
*/
getOrCreateTemplate(warnings, "ConsoleNotifications/insightsEmailBody", "html", "Notifications Insights email body");
createInstantEmailTemplate(
warnings, "console", "notifications", List.of("failed-integration"),
warnings, "console", "notifications", List.of(INTEGRATION_FAILED_EVENT_TYPE),
"ConsoleNotifications/failedIntegrationTitle", "txt", "Notifications failed integration email title",
"ConsoleNotifications/failedIntegrationBody", "txt", "Notifications failed integration email body"
);
createInstantEmailTemplate(
warnings, "console", "notifications", List.of(INTEGRATION_DISABLED_EVENT_TYPE),
"ConsoleNotifications/integrationDisabledTitle", "txt", "Notifications disabled integration email title",
"ConsoleNotifications/integrationDisabledBody", "html", "Notifications disabled integration email body"
);

/*
* Former src/main/resources/templates/CostManagement folder.
Expand Down
2 changes: 2 additions & 0 deletions engine/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ mp.messaging.outgoing.egress.topic=platform.notifications.ingress
mp.messaging.outgoing.egress.group.id=integrations
mp.messaging.outgoing.egress.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.egress.value.serializer=org.apache.kafka.common.serialization.StringSerializer
# Messages can be emitted on this topic from multiple emitters in our app
mp.messaging.outgoing.egress.merge=true

# Output queue to Camel (notifications-sender)
mp.messaging.outgoing.toCamel.connector=smallrye-kafka
Expand Down
Loading