Description
Component(s)
exporter/exporterhelper
Describe the issue you're reporting
Current Status
Currently the exporter batcher does not support "metadata" batching (which the batch processor supports), and it maintains only one active batch at any moment.
Supporting only one active batch causes throughput issues: constructing a batch requires an exclusive lock, so only one goroutine can "batch" at any point and the collector cannot scale linearly.
To solve the linear-scaling problem, we need to be able to consume multiple requests from the "queue" in parallel while constructing multiple batches at the same time. This happens naturally with metadata batching, but even that can suffer from hot partitions that do not allow multiple concurrent updates.
In the current design we always have a queue (even sync requests that wait for the response go through a logical queue), so all implementations suffer from this problem: batching is "single threaded".
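To make the bottleneck concrete, here is a minimal sketch of the single-active-batch design described above (all type and field names are hypothetical, not the actual exporterhelper code): every producer goroutine serializes on one mutex guarding one batch.

```go
package main

import (
	"fmt"
	"sync"
)

// request is a placeholder for an exporter request (hypothetical).
type request struct{ items int }

// singleBatcher sketches the current design: one active batch guarded by
// one mutex, so only one goroutine can batch at any moment.
type singleBatcher struct {
	mu       sync.Mutex
	active   []request
	minItems int
	flushed  int // number of batches "sent", for illustration
}

func (b *singleBatcher) add(r request) {
	b.mu.Lock() // every producer serializes here: the throughput bottleneck
	defer b.mu.Unlock()
	b.active = append(b.active, r)
	total := 0
	for _, q := range b.active {
		total += q.items
	}
	if total >= b.minItems {
		b.active = nil // stand-in for sending the batch downstream
		b.flushed++
	}
}

func main() {
	b := &singleBatcher{minItems: 10}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			b.add(request{items: 1})
		}()
	}
	wg.Wait()
	fmt.Println("batches flushed:", b.flushed) // 100 one-item requests, flush at 10 items
}
```

No matter how many goroutines call `add`, the critical section admits one at a time, which is exactly why throughput does not scale with cores.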
Proposal
In this proposal, we will use the following terminologies:
- A partition represents the logical separation of the data based on the input (e.g. a key in the context or an entry in the resource attributes). Even when no partitioning is configured (the current state), we can consider that we always have one partition.
- A partition may be split into multiple shards, so that multiple batches can be produced for the same partition when the incoming load is larger than a single goroutine can process.
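The terminology above can be sketched as a pair of types. This is an illustrative data model only, with hypothetical names; the proposal does not prescribe concrete structs:

```go
package main

import "fmt"

// shard holds one independently built batch for a partition (hypothetical).
type shard struct {
	pending []int // item counts of requests assigned to this shard
}

// partition is the logical separation of data by some input key (e.g. a
// context metadata key or a resource attribute). With no partitioning
// configured there is exactly one partition; under load a partition may be
// split into several shards so batches can be built for it concurrently.
type partition struct {
	key    string
	shards []*shard
}

func newPartition(key string, nShards int) *partition {
	p := &partition{key: key}
	for i := 0; i < nShards; i++ {
		p.shards = append(p.shards, &shard{})
	}
	return p
}

func main() {
	// The current state is equivalent to one partition with one shard.
	p := newPartition("", 1)
	fmt.Println(len(p.shards))
}
```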
Implementation details
In order to implement the sharding capability of a partition, the queue batcher needs to keep some statistics:
- Number of "blocked in the queue" requests.
- Number of requests processed per each partition.
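A minimal sketch of those two statistics, assuming hypothetical names (the proposal only requires that the counts exist, not this shape): a global blocked counter plus a per-partition processed count.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// batchStats sketches the statistics the queue batcher would keep so a
// sharder can decide when to split or merge partitions.
type batchStats struct {
	blocked   atomic.Int64 // requests currently blocked in the queue
	mu        sync.Mutex
	processed map[string]int64 // requests processed, per partition key
}

func newBatchStats() *batchStats {
	return &batchStats{processed: make(map[string]int64)}
}

// enqueue records a request entering the queue.
func (s *batchStats) enqueue() { s.blocked.Add(1) }

// dequeue records a request leaving the queue for the given partition.
func (s *batchStats) dequeue(partitionKey string) {
	s.blocked.Add(-1)
	s.mu.Lock()
	s.processed[partitionKey]++
	s.mu.Unlock()
}

func main() {
	s := newBatchStats()
	s.enqueue()
	s.enqueue()
	s.dequeue("tenant-a")
	fmt.Println(s.blocked.Load(), s.processed["tenant-a"])
}
```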
There will be a "sharder" task that will support dynamic sharding using two actions:
- Split a partition into multiple shards.
- Merge shards from a partition into fewer shards.
The sharder task will be executed periodically (e.g. every minute) and, based on the number of "blocked" requests and each partition's traffic pattern over the last N minutes, will trigger split and/or merge requests for the affected partitions.
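The split/merge decision could look like the following sketch. The thresholds and the doubling/halving policy are illustrative assumptions, not part of the proposal; in the real task this function would run on a timer (e.g. a `time.Ticker`) over the stats collected above:

```go
package main

import "fmt"

// Illustrative thresholds; the proposal does not specify values.
const (
	splitAbove = 100 // avg blocked requests per shard that triggers a split
	mergeBelow = 10  // avg blocked requests per shard that triggers a merge
)

// decideShards sketches one sharder decision: given the blocked requests
// observed for a partition over the last window and its current shard
// count, return the new shard count.
func decideShards(blocked, shards int) int {
	perShard := blocked / shards
	switch {
	case perShard > splitAbove:
		return shards * 2 // hot partition: split into more shards
	case perShard < mergeBelow && shards > 1:
		return shards / 2 // cold partition: merge into fewer shards
	default:
		return shards
	}
}

func main() {
	fmt.Println(decideShards(500, 2)) // hot: 250 blocked per shard
	fmt.Println(decideShards(5, 4))   // cold: ~1 blocked per shard
	fmt.Println(decideShards(50, 2))  // within range: unchanged
}
```

Hysteresis between the two thresholds avoids oscillating between split and merge on every tick.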
The consumer goroutines that consume requests from the "queue" (which, as explained, always exists, whether or not the caller waits for the response) will:
- Pick one request and determine the right partition for that request based on the configuration (using metadata from context, or any other supported mechanism).
- If the determined partition has multiple shards, use a round-robin mechanism to select the shard the incoming request is assigned to.
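The two consumer steps above can be sketched as a small router. The types and the way the partition key reaches the router are assumptions for illustration (in practice the key would come from request context metadata or another configured mechanism); the round-robin counter is a per-partition atomic:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// router sketches the consumer-side routing: a request maps to a partition
// by its key, and when that partition has multiple shards a round-robin
// counter picks the shard (all names hypothetical).
type router struct {
	shardsPerPartition map[string]int
	rr                 map[string]*atomic.Uint64 // per-partition round-robin counters
}

func newRouter(shards map[string]int) *router {
	r := &router{shardsPerPartition: shards, rr: make(map[string]*atomic.Uint64)}
	for k := range shards {
		r.rr[k] = &atomic.Uint64{}
	}
	return r
}

// route returns the shard index for a request carrying the given partition
// key. Single-shard partitions always route to shard 0.
func (r *router) route(partitionKey string) int {
	n := r.shardsPerPartition[partitionKey]
	if n <= 1 {
		return 0
	}
	return int(r.rr[partitionKey].Add(1)-1) % int(uint64(n))
}

func main() {
	r := newRouter(map[string]int{"tenant-a": 3, "tenant-b": 1})
	for i := 0; i < 4; i++ {
		fmt.Print(r.route("tenant-a"), " ") // cycles 0 1 2 0
	}
	fmt.Println(r.route("tenant-b")) // single shard: always 0
}
```

Using an atomic counter per partition keeps shard selection lock-free, so consumers feeding different shards never contend with each other.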