[BUG] S3 source does not stop on pipeline shutdown #3341

@asifsmohammed

Describe the bug
The S3 source does not stop after pipeline shutdown is called. It keeps polling SQS, so SQS messages can be lost if end-to-end acknowledgements are not enabled.

To Reproduce
Steps to reproduce the behavior:

  1. Create an S3 pipeline
  2. Send SQS notifications to the pipeline
  3. Make the pipeline fail by not granting it permission to write to the sink
  4. After pipeline shutdown is called, the S3 source keeps polling SQS notifications (check the logs), even after the processors and sink have shut down.

Expected behavior
The S3 source should stop before the processors and sink shut down.
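
To make the expected ordering concrete, here is a minimal sketch of stopping the source first and waiting for it to finish before stopping the processors. This is not Data Prepper's Pipeline code: the class OrderedShutdown, the helper pollingTask, and the use of plain ExecutorService instances for each stage are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of shutdown ordering; not Data Prepper's Pipeline class.
class OrderedShutdown {

    // Hypothetical long-running stage: loops until interrupted, then records that it stopped.
    static Runnable pollingTask(String name, List<String> stopOrder) {
        return () -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    Thread.sleep(20); // placeholder for polling or processing work
                }
            } catch (InterruptedException e) {
                // sleep() cleared the interrupt status; restore it before leaving.
                Thread.currentThread().interrupt();
            } finally {
                stopOrder.add(name);
            }
        };
    }

    static List<String> run() throws InterruptedException {
        List<String> stopOrder = Collections.synchronizedList(new ArrayList<>());
        ExecutorService source = Executors.newSingleThreadExecutor();
        ExecutorService processors = Executors.newSingleThreadExecutor();

        source.submit(pollingTask("source", stopOrder));
        processors.submit(pollingTask("processors", stopOrder));

        // 1. Interrupt the source and wait until it has really stopped.
        source.shutdownNow();
        source.awaitTermination(5, TimeUnit.SECONDS);

        // 2. Only then stop the downstream stage.
        processors.shutdownNow();
        processors.awaitTermination(5, TimeUnit.SECONDS);

        return stopOrder;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("stop order: " + run());
    }
}
```

The important part is the awaitTermination call between the two shutdownNow calls: the downstream stage is not touched until the source has confirmed that it exited.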

Environment:

  • OS: not specified
  • Version: 2.3.2 (per s3-source-2.3.2.jar in the stack trace below)

Additional context
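When Thread.sleep throws InterruptedException, the thread's interrupt status is cleared, so the worker no longer sees that it was interrupted and keeps running. We should handle the exception gracefully, for example by restoring the interrupt status or exiting the loop, so the source is not active after shutdown. The logs below show the processors shutting down and the SqsWorker still logging a backoff afterwards: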

2023-08-04T18:56:40.761 [s3-log-pipeline-sink-worker-2-thread-1] INFO  org.opensearch.dataprepper.pipeline.Pipeline - Pipeline [s3-log-pipeline] - Shutting down processor process workers.
2023-08-04T18:56:40.761 [s3-log-pipeline-processor-worker-1-thread-2] INFO  org.opensearch.dataprepper.pipeline.ProcessWorker - Processor shutdown phase 1 complete.
2023-08-04T18:56:40.761 [s3-log-pipeline-processor-worker-1-thread-2] INFO  org.opensearch.dataprepper.pipeline.ProcessWorker - Beginning processor shutdown phase 2, iterating until buffers empty.
2023-08-04T18:56:40.761 [s3-log-pipeline-processor-worker-1-thread-2] INFO  org.opensearch.dataprepper.pipeline.ProcessWorker - Processor shutdown phase 2 complete.
2023-08-04T18:56:40.761 [s3-log-pipeline-processor-worker-1-thread-2] INFO  org.opensearch.dataprepper.pipeline.ProcessWorker - Beginning processor shutdown phase 3, iterating until 1691175580761.
2023-08-04T18:56:40.761 [s3-log-pipeline-processor-worker-1-thread-1] INFO  org.opensearch.dataprepper.pipeline.ProcessWorker - Processor shutdown phase 1 complete.
2023-08-04T18:56:40.761 [s3-log-pipeline-processor-worker-1-thread-1] INFO  org.opensearch.dataprepper.pipeline.ProcessWorker - Beginning processor shutdown phase 2, iterating until buffers empty.
2023-08-04T18:56:40.761 [s3-log-pipeline-processor-worker-1-thread-1] INFO  org.opensearch.dataprepper.pipeline.ProcessWorker - Processor shutdown phase 2 complete.
2023-08-04T18:56:40.761 [s3-log-pipeline-processor-worker-1-thread-1] INFO  org.opensearch.dataprepper.pipeline.ProcessWorker - Beginning processor shutdown phase 3, iterating until 1691175580761.
2023-08-04T18:57:00.636 [Thread-10] ERROR org.opensearch.dataprepper.plugins.source.SqsWorker - Unable to process SQS messages. Processing error due to: Thread was interrupted
2023-08-04T18:57:00.637 [Thread-10] INFO  org.opensearch.dataprepper.plugins.source.SqsWorker - Pausing SQS processing for 16.212 seconds due to an error in processing.
2023-08-04T18:57:00.637 [Thread-10] ERROR org.opensearch.dataprepper.plugins.source.SqsWorker - Thread is interrupted while polling SQS with retry.
java.lang.InterruptedException: sleep interrupted
    at java.lang.Thread.sleep(Native Method) ~[?:?]
    at org.opensearch.dataprepper.plugins.source.SqsWorker.applyBackoff(SqsWorker.java:165) ~[s3-source-2.3.2.jar:?]
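
One possible way to handle the interrupt is sketched below. This is not the actual SqsWorker code: the class InterruptAwareWorker, its fields, and the body of applyBackoff are hypothetical stand-ins, and the poll itself is reduced to a counter. The idea is that the backoff propagates InterruptedException, the caller restores the interrupt status, and the loop condition checks that status so the worker exits.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a polling worker; not the actual SqsWorker code.
class InterruptAwareWorker implements Runnable {
    private final AtomicInteger polls = new AtomicInteger();
    private volatile boolean stopped = false;

    @Override
    public void run() {
        // Exit as soon as the thread is interrupted or stop() is called.
        while (!stopped && !Thread.currentThread().isInterrupted()) {
            polls.incrementAndGet(); // placeholder for "poll SQS and process messages"
            try {
                applyBackoff(50);
            } catch (InterruptedException e) {
                // Thread.sleep cleared the interrupt status when it threw.
                // Restore it so the loop condition above can see it.
                Thread.currentThread().interrupt();
            }
        }
    }

    // Hypothetical backoff helper: propagates the interrupt instead of swallowing it.
    private void applyBackoff(long millis) throws InterruptedException {
        Thread.sleep(millis);
    }

    public void stop() {
        stopped = true;
    }

    public int pollCount() {
        return polls.get();
    }

    public static void main(String[] args) throws InterruptedException {
        InterruptAwareWorker worker = new InterruptAwareWorker();
        Thread thread = new Thread(worker);
        thread.start();
        Thread.sleep(200);
        thread.interrupt(); // stands in for what a pipeline shutdown would do
        thread.join(1000);
        System.out.println("worker alive after interrupt: " + thread.isAlive());
    }
}
```

The explicit stop() flag is an extra safeguard for cases where some other code on the thread swallows the interrupt; either signal alone is enough to end the loop in this sketch.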

Metadata

Labels: bug (Something isn't working)
Status: Done