
Conversation

@gkamat commented Aug 16, 2025

Description

Initial set of Data Reader changes for Streaming Ingestion. This is an interim check-in to make the feature available in OSB 2.0 as an alpha capability. Several enhancements remain; they will be added progressively after the release, as will integration tests.

For now, the focus has been on ensuring that existing OSB functionality is unaffected when Streaming Ingestion is not enabled.

Usage

To operate in Streaming Ingestion mode, the streaming-ingestion tag must be attached to the corpus in the workload specification. Currently, only aws is supported, and a single corpus must be accessible on S3 with the appropriate credentials configured. For example, with the http_logs workload (a minimal corpus entry follows the diff below):

index 09771c4..f608c41 100644
--- a/http_logs/workload.json
+++ b/http_logs/workload.json
@@ -8,7 +8,12 @@
   "description": "HTTP server log data",
   "#TODO": "Replace index definitions with a template after setting the workload version to 2. Explicit index definitions are not necessary anymore.",
   "indices": [
-    {%- if generated_corpus is defined %}
+    {%- if true %}
+      {
+       "name": "logs-streamed",
+       "body": "{{ index_body }}"
+      }
+    {%- elif generated_corpus is defined %}
       {{ benchmark.collect(parts="gen-idx-*.json") }}
     {%- else %}
       {
@@ -46,7 +51,18 @@
     {%- endif %}
   ],
   "corpora": [
-      {%- if generated_corpus is defined %}
+      {%- if true %}
+       {
+         "name": "http_logs",
+         "base-url": "s3://regenerated-corpora",
+         "streaming-ingestion": "aws",
+         "documents": [
+           {
+             "source-file": "documents-hlogs.json"
+           }
+         ]
+       }
+      {%- elif generated_corpus is defined %}
         {
           "name": "http_logs",
           "documents": [

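Stripped of the Jinja conditionals, the corpus entry that enables the feature reduces to the following (values taken directly from the diff above):

{
  "corpora": [
    {
      "name": "http_logs",
      "base-url": "s3://regenerated-corpora",
      "streaming-ingestion": "aws",
      "documents": [
        { "source-file": "documents-hlogs.json" }
      ]
    }
  ]
}

Credentials are picked up from the environment (e.g. AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY), per the boto3 default credential chain noted in the reviewed code below.
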
Issues Addressed

#918

Testing

  • Unit and integration tests



By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@IanHoang left a comment

LGTM. @gkamat Could we sync to see how this works E2E?

# For testing. Set credentials with environment variables.
self.s3_client = client('s3')
self.chunk_size = IngestionManager.chunk_size * 1024**2
self.num_workers = os.cpu_count() * 2
Collaborator

Will we always want 2x the number of CPU cores, or will we ever want to use fewer?

Collaborator Author

That is a good question, given that the cores are used for the bulk operations as well. This is one of the items to be evaluated during the scaling experiments (#929), but for now the feature seems to perform generally well with this number.
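
For illustration, one way this could be made tunable later while keeping the current default; the ingestion_workers option name and the clamping are hypothetical, not part of this PR:

import os

def resolve_worker_count(client_options):
    # Hypothetical override; defaults to the PR's 2x-cores heuristic and
    # ensures at least one worker is used.
    default = (os.cpu_count() or 1) * 2
    requested = int(client_options.get("ingestion_workers", default))
    return max(1, requested)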

@@ -412,6 +415,8 @@ def __init__(self, cfg):

 @staticmethod
 def prepare_docs(cfg, workload, corpus, preparator):
+    if corpus.streaming_ingestion:
+        return
Collaborator

Nit: a logging statement here would be useful. It could mention that the workload loader detected that the user is using streaming ingestion, and which cloud provider (only aws for now).

Collaborator Author

The change needs logging (and error-checking) added throughout, but I will include this one for clarity.
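
A sketch of what that logging could look like at the early return (the logger setup is assumed; corpus.name and corpus.streaming_ingestion follow the diff above):

import logging

logger = logging.getLogger(__name__)

def prepare_docs(cfg, workload, corpus, preparator):
    if corpus.streaming_ingestion:
        # streaming_ingestion names the cloud provider ("aws" for now).
        logger.info("Corpus [%s] uses streaming ingestion via provider [%s]; "
                    "skipping local document preparation.",
                    corpus.name, corpus.streaming_ingestion)
        return
    # ... existing preparation path ...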

@@ -776,8 +777,7 @@ def __init__(self, corpora, batch_size, bulk_size, ingest_percentage, id_conflic
 # use a value > 0 so percent_completed returns a sensible value
 self.total_bulks = 1
 self.infinite = False
-# TBD: obtain value from workload spec.
-self.streaming_ingestion = False
+self.streaming_ingestion = corpora[0].streaming_ingestion
Collaborator

With the index 0, will streaming ingestion always have only one corpus?

Collaborator Author

Yes. Splitting the bandwidth allocated to the cloud-storage download across multiple document sources would not help, compared to optimizing the speed for a single corpus. We could look into this as a later improvement, perhaps.
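
To make that single-corpus assumption explicit, a guard along these lines could reject multi-corpus workloads up front (a sketch; the exception type is arbitrary and the attribute name follows the diff above):

def resolve_streaming_ingestion(corpora):
    # streaming_ingestion is falsy when the feature is off and names the
    # cloud provider ("aws") when it is on.
    streaming = [c for c in corpora if getattr(c, "streaming_ingestion", None)]
    if streaming and len(corpora) > 1:
        raise ValueError("Streaming ingestion currently supports exactly one corpus.")
    return streaming[0].streaming_ingestion if streaming else False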

# pylint: disable = import-outside-toplevel
from osbenchmark.cloud_provider.vendors.s3_data_producer import S3DataProducer
producer = S3DataProducer("big5-corpus", keys, client_options, Slice.data_dir)
bucket = re.sub('^s3://', "", Slice.base_url)
Collaborator

I understand that this is still the initial set of changes for the MVP, but what are your thoughts on moving the instantiation of the producer outside of _start_producer()? We could have a separate method dedicated to setting up the producer -- such as _setup_data_producer() -- based on the self.streaming_ingestion value detected in the workload. The producer could then be passed into _start_producer(). That way, the main workflow is preserved and we can support other types of data producers later (e.g. azure, gcp, etc.).

Collaborator Author

That would certainly be the right way to modularize this. For now, the focus was on getting the feature in end-to-end prior to the 2.0 release deadline.
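
As a rough shape for that later refactor (the method name comes from the suggestion above; the attribute names and the S3DataProducer signature are assumed from the quoted snippet):

class Slice:
    # Assumed attributes: streaming_ingestion, corpus_name, keys,
    # client_options, data_dir.

    def _setup_data_producer(self):
        # pylint: disable = import-outside-toplevel
        if self.streaming_ingestion == "aws":
            from osbenchmark.cloud_provider.vendors.s3_data_producer import S3DataProducer
            return S3DataProducer(self.corpus_name, self.keys,
                                  self.client_options, self.data_dir)
        raise ValueError(
            f"Unsupported streaming ingestion provider: {self.streaming_ingestion}")

    def _start_producer(self, producer):
        # The main workflow consumes a ready-made producer, keeping
        # provider-specific setup out of the start path.
        ...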

@gkamat commented Aug 18, 2025

> LGTM. @gkamat Could we sync to see how this works E2E?

Yes, let's do that later today.

@IanHoang IanHoang changed the base branch from 2.0-develop to 2.0-beta August 18, 2025 18:09
@IanHoang IanHoang changed the base branch from 2.0-beta to 2.0-develop August 18, 2025 18:10
@IanHoang IanHoang merged commit 11d0108 into opensearch-project:2.0-develop Aug 18, 2025
10 checks passed
gkamat added a commit that referenced this pull request Aug 19, 2025
IanHoang pushed a commit that referenced this pull request Aug 20, 2025