Add catalog templates to power cascading compaction #18402

Open · wants to merge 14 commits into base: master

Conversation

@kfaraz (Contributor) commented Aug 14, 2025

💥 New features

  • Persist compaction templates in Druid catalog and reuse them across multiple compaction supervisors
  • Launch MSQ compaction jobs using templatized SQL
  • Run period-based cascading compaction with both MSQ SQL jobs and native compaction tasks
  • Decouple compaction job definition from the desired compaction state using a CompactionStateMatcher
  • Retrieve template definitions using SQL

❓ Open questions:

[A] Where should we store each indexing template in the Druid catalog?
a. As a table inside a new schema index_template (currently used in this PR)
b. OR as a table inside the druid schema: currently used for datasources only
c. OR as a single row inside sys.templates: probably not preferable, since the catalog models everything as tables and their properties, and this would be neither.

Note: In all of the above cases, the template is always physically stored as a single row in druid_tableDefs in the metadata store.

[B] How should we specify parameters in MSQ SQL template?
a. Extend SqlParameter for this use case. This currently uses positional params like
SELECT * FROM ? WHERE __time > ?
b. OR Named params (currently used in this PR) like
SELECT * FROM "${dataSource}" WHERE __time > '${startTimestamp}'
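To make option (b) concrete, here is a minimal, hypothetical sketch of named-parameter substitution. The class and method names are assumptions for illustration, not part of this PR:

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of named-parameter substitution for SQL templates.
// Not the PR's implementation; class and method names are illustrative.
public class TemplateSubstitution
{
  private static final Pattern PARAM = Pattern.compile("\\$\\{([a-zA-Z0-9_]+)}");

  public static String substitute(String template, Map<String, String> params)
  {
    final Matcher matcher = PARAM.matcher(template);
    final StringBuilder result = new StringBuilder();
    while (matcher.find()) {
      final String value = params.get(matcher.group(1));
      if (value == null) {
        throw new IllegalArgumentException("No value bound for parameter [" + matcher.group(1) + "]");
      }
      // quoteReplacement stops '$' or '\' in the value being read as group references
      matcher.appendReplacement(result, Matcher.quoteReplacement(value));
    }
    matcher.appendTail(result);
    return result.toString();
  }

  public static void main(String[] args)
  {
    // prints: SELECT * FROM "wiki" WHERE __time > '2025-08-01T00:00:00Z'
    System.out.println(substitute(
        "SELECT * FROM \"${dataSource}\" WHERE __time > '${startTimestamp}'",
        Map.of("dataSource", "wiki", "startTimestamp", "2025-08-01T00:00:00Z")
    ));
  }
}
```

Unlike positional SqlParameter binding, the same named variable can appear several times in the template (as ${dataSource} does in the REPLACE statement further below).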

Changes

Catalog templates

  • Add a new schema index_template (Should we just use the druid schema itself?)
  • Register a new table definition IndexingTemplateDefn which can currently contain only one property payload
  • Add IndexingTemplateSchema for SQL support to run queries like SELECT * FROM index_template.<template_id>

Compaction supervisor classes

  • Add BatchIndexingJob which may contain either a ClientTaskQuery or a ClientSqlQuery (for MSQ jobs).
  • Add BatchIndexingJobTemplate that can create jobs for a given source and destination
  • Update CompactionSupervisor to create jobs using templates
  • Add CompactionJobQueue to create and submit compaction jobs to the Overlord

Refactor for reuse

  • Move common code from CompactSegments to CompactionSlotManager, CompactionSnapshotBuilder
  • Update CompactionStatus, CompactionStatusTracker and DataSourceCompactibleSegmentIterator

MSQ Compaction

  • Use MSQCompactionJobTemplate to submit MSQ SQL jobs to the Broker
  • Add an SQL template with variables such as ${dataSource} and ${startTimestamp}
  • Do not use the existing SqlParameter since it represents positional params rather than named ones.
  • ❓ Please suggest if it would be better to extend SqlParameter for this purpose instead.
✏️ Sample templatized MSQ SQL
REPLACE INTO ${dataSource}
OVERWRITE WHERE __time >= TIMESTAMP '${startTimestamp}' AND __time < TIMESTAMP '${endTimestamp}'
SELECT * FROM ${dataSource}
WHERE __time BETWEEN '${startTimestamp}' AND '${endTimestamp}'
PARTITIONED BY DAY

Cascading compaction

  • Rules divide the segment timeline into compactible intervals
  • Each rule specifies an eligibility period relative to the current time
    • Rule 1: range = [now - p1, +inf)
    • Rule 2: range = [now - p2, now - p1)
    • Rule 3: range = [now - p3, now - p2)
      ...
    • Rule n: range = (-inf, now - p(n-1))
  • Each rule is itself a compaction template which may be of type
    compactInline, compactMsq or compactCatalog
  • First rule in the list includes all future data by default
  • Last rule in the list includes all past data by default
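The interval arithmetic above can be sketched as follows. This is illustrative only: the class, record, and method names are assumptions, and the PR itself computes boundaries via CompactionRule.computeStartTime rather than this helper:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of how rule periods partition the segment timeline.
// Names are assumptions, not the PR's actual API.
public class CascadingRules
{
  public record Interval(Instant start, Instant end) {}

  // periods.get(i) is the lookback period of rule i. Rule i covers
  // [now - periods[i], now - periods[i-1]); the first rule extends to +inf
  // and the last rule extends back to -inf, so the intervals are mutually
  // exclusive and jointly cover the whole timeline.
  public static List<Interval> computeIntervals(Instant now, List<Duration> periods)
  {
    final List<Interval> intervals = new ArrayList<>();
    for (int i = 0; i < periods.size(); i++) {
      final Instant start = (i == periods.size() - 1) ? Instant.MIN : now.minus(periods.get(i));
      final Instant end = (i == 0) ? Instant.MAX : now.minus(periods.get(i - 1));
      intervals.add(new Interval(start, end));
    }
    return intervals;
  }

  public static void main(String[] args)
  {
    // Mirrors the P1D / P15D / P16D / PT0S rule periods used in the example spec
    computeIntervals(
        Instant.parse("2025-08-17T00:00:00Z"),
        List.of(Duration.ofDays(1), Duration.ofDays(15), Duration.ofDays(16), Duration.ZERO)
    ).forEach(System.out::println);
  }
}
```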

📊 Example: Compact to MONTH-DAY-WEEK-DAY

Cascading rules:

  • Rule 1: Compact up to the last 1 day using a catalog template id mmnedplf (DAY granularity)
  • Rule 2: Compact from last 1 day to last 15 days using catalog template hdapacml (WEEK granularity)
  • Rule 3: Compact from last 15 days to last 16 days to DAY granularity using MSQ job
  • Rule 4: Compact all older data to MONTH granularity
✏️ Full supervisor spec
{
  "type": "autocompact",
  "spec": {
    "type": "compactCascade",
    "dataSource": "wiki_fihdbadj",
    "rules": [
      {
        "period": "P1D",
        "template": {
          "type": "compactCatalog",
          "templateId": "mmnedplf"
        }
      },
      {
        "period": "P15D",
        "template": {
          "type": "compactCatalog",
          "templateId": "hdapacml"
        }
      },
      {
        "period": "P16D",
        "template": {
          "type": "compactMsq",
          "sqlTemplate": {
            "query": "REPLACE INTO ${dataSource} OVERWRITE WHERE __time >= TIMESTAMP '${startTimestamp}' AND __time < TIMESTAMP '${endTimestamp}' SELECT * FROM ${dataSource} WHERE __time BETWEEN '${startTimestamp}' AND '${endTimestamp}' PARTITIONED BY DAY",
            "resultFormat": null,
            "header": false,
            "typesHeader": false,
            "sqlTypesHeader": false,
            "context": null,
            "parameters": null
          },
          "targetState": {
            "granularitySpec": {
              "segmentGranularity": "DAY"
            }
          }
        }
      },
      {
        "period": "PT0S",
        "template": {
          "type": "compactInline",
          "targetState": {
            "granularitySpec": {
              "segmentGranularity": "MONTH"
            }
          }
        }
      }
    ]
  },
  "suspended": false
}

List of template types

  • compactInline / InlineCompactionJobTemplate — Defines the desired compaction state inline (such as segmentGranularity, partitionsSpec, etc.), which can be used to build a CompactionTask. Can be used directly inside a cascading template or stored in the Druid catalog.
  • compactMsq / MSQCompactionJobTemplate — Contains an MSQ SQL template with variables such as ${dataSource} and ${startDate}. Can be used directly inside a cascading template or stored in the Druid catalog.
  • compactCatalog / CatalogCompactionJobTemplate — Delegates job creation to a compactInline or compactMsq template stored in the Druid catalog.
  • compactCascade / CascadingCompactionJobTemplate — Used by the compaction supervisor for period-based cascading compaction. Each period can be mapped to a compactInline, compactMsq or compactCatalog template.

Pending changes

  • Docs

Future work

We can have a common BatchIndexingSupervisor which uses templates to create jobs.
It could be implemented by ScheduledBatchSupervisor and CompactionSupervisor.
This change was originally included in this patch but has been left out to keep the changes small.

Release note

TODO


This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

config.getTaskPriority(),
ClientCompactionTaskQueryTuningConfig.from(
    config.getTuningConfig(),
    config.getMaxRowsPerSegment(),

Check notice — Code scanning / CodeQL: Deprecated method or constructor invocation (Note). Invoking DataSourceCompactionConfig.getMaxRowsPerSegment should be avoided because it has been deprecated.
static PartitionsSpec findPartitionsSpecFromConfig(ClientCompactionTaskQueryTuningConfig tuningConfig)
{
  final PartitionsSpec partitionsSpecFromTuningConfig = tuningConfig.getPartitionsSpec();
  if (partitionsSpecFromTuningConfig == null) {
    final long maxTotalRows = Configs.valueOrDefault(tuningConfig.getMaxTotalRows(), Long.MAX_VALUE);
    return new DynamicPartitionsSpec(tuningConfig.getMaxRowsPerSegment(), maxTotalRows);

Check notice — Code scanning / CodeQL: Deprecated method or constructor invocation (Note). Invoking ClientCompactionTaskQueryTuningConfig.getMaxTotalRows should be avoided because it has been deprecated.

    final long maxTotalRows = Configs.valueOrDefault(tuningConfig.getMaxTotalRows(), Long.MAX_VALUE);
    return new DynamicPartitionsSpec(tuningConfig.getMaxRowsPerSegment(), maxTotalRows);

Check notice — Code scanning / CodeQL: Deprecated method or constructor invocation (Note). Invoking ClientCompactionTaskQueryTuningConfig.getMaxRowsPerSegment should be avoided because it has been deprecated.

public List<CompactionJob> createJobs(
    DruidInputSource inputSource,
    DruidDatasourceDestination destination,

Check notice — Code scanning / CodeQL: Useless parameter (Note). The parameter 'destination' is never used.

@uds5501 (Contributor) left a comment


Had a quick question:

DateTime previousRuleStartTime = DateTimes.MAX;
for (int i = 0; i < rules.size() - 1; ++i) {
  final CompactionRule rule = rules.get(i);
  final DateTime ruleStartTime = rule.computeStartTime(currentTime, rules.get(i + 1));
Q: considering the signature of the method computeStartTime(DateTime referenceTime, CompactionRule beforeRule), adding a rules.get(i+1) as beforeRule sounds a bit counter-intuitive.

Does it mean that the list of rules being passed here is meant to be executed in reverse?

@kfaraz (Author) replied:

Yes, the beforeRule name is explained in the javadoc of the computeStartTime method: it is the rule that comes before the current rule in the segment timeline.
But the rules can be evaluated in any order, really, since the time chunks of any two rules are mutually exclusive.

The rule with the latest time period comes first in the list since that is how we typically think of compaction,
and also how we define load rules in Druid. It keeps the rule definitions intuitive and straightforward.
