Skip to content

Commit c673b66

Browse files
committed
Convert null message to empty string in DLQ to avoid NPE
Signed-off-by: Jonah Calvo <[email protected]>
1 parent d040181 commit c673b66

File tree

2 files changed

+45
-1
lines changed

2 files changed

+45
-1
lines changed

data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -679,7 +679,7 @@ private DlqObject createDlqObjectFromEvent(final Event event,
679679
.withFailedData(FailedDlqData.builder()
680680
.withDocument(event.toJsonString())
681681
.withIndex(index)
682-
.withMessage(message)
682+
.withMessage(message != null ? message : "")
683683
.build())
684684
.withPluginName(PLUGIN_NAME)
685685
.withPipelineName(pipeline)

data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,4 +346,48 @@ void doOutput_with_invalid_version_expression_result_catches_RuntimeException_an
346346

347347
verify(dynamicDocumentVersionDroppedEvents).increment();
348348
}
349+
350+
@Test
351+
void createDlqObjectFromEvent_with_null_message_uses_default_message() throws IOException {
352+
when(pluginSetting.getName()).thenReturn("opensearch");
353+
354+
final Event event = mock(JacksonEvent.class);
355+
final String document = UUID.randomUUID().toString();
356+
when(event.toJsonString()).thenReturn(document);
357+
final EventHandle eventHandle = mock(EventHandle.class);
358+
when(event.getEventHandle()).thenReturn(eventHandle);
359+
final String index = UUID.randomUUID().toString();
360+
361+
final OpenSearchSink objectUnderTest = createObjectUnderTest();
362+
when(indexManagerFactory.getIndexManager(any(IndexType.class), eq(openSearchClient), any(RestHighLevelClient.class), eq(openSearchSinkConfiguration), any(TemplateStrategy.class), any()))
363+
.thenReturn(indexManager);
364+
doNothing().when(indexManager).setupIndex();
365+
objectUnderTest.initialize();
366+
367+
final DlqObject.Builder dlqObjectBuilder = mock(DlqObject.Builder.class);
368+
final ArgumentCaptor<FailedDlqData> failedDlqData = ArgumentCaptor.forClass(FailedDlqData.class);
369+
when(dlqObjectBuilder.withEventHandle(eventHandle)).thenReturn(dlqObjectBuilder);
370+
when(dlqObjectBuilder.withFailedData(failedDlqData.capture())).thenReturn(dlqObjectBuilder);
371+
when(dlqObjectBuilder.withPluginName(pluginSetting.getName())).thenReturn(dlqObjectBuilder);
372+
when(dlqObjectBuilder.withPluginId(pluginSetting.getName())).thenReturn(dlqObjectBuilder);
373+
when(dlqObjectBuilder.withPipelineName(pipelineDescription.getPipelineName())).thenReturn(dlqObjectBuilder);
374+
when(dlqObjectBuilder.build()).thenReturn(mock(DlqObject.class));
375+
376+
try (final MockedStatic<DlqObject> dlqObjectMockedStatic = mockStatic(DlqObject.class)) {
377+
dlqObjectMockedStatic.when(DlqObject::builder).thenReturn(dlqObjectBuilder);
378+
379+
// Use reflection to call the private method with null message
380+
java.lang.reflect.Method method = OpenSearchSink.class.getDeclaredMethod("createDlqObjectFromEvent", Event.class, String.class, String.class);
381+
method.setAccessible(true);
382+
method.invoke(objectUnderTest, event, index, null);
383+
} catch (Exception e) {
384+
throw new RuntimeException(e);
385+
}
386+
387+
final FailedDlqData failedDlqDataResult = failedDlqData.getValue();
388+
assertThat(failedDlqDataResult, notNullValue());
389+
assertThat(failedDlqDataResult.getDocument(), equalTo(document));
390+
assertThat(failedDlqDataResult.getIndex(), equalTo(index));
391+
assertThat(failedDlqDataResult.getMessage(), equalTo(""));
392+
}
349393
}

0 commit comments

Comments
 (0)