Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,18 @@ public Set<String> listTaskTypes() {
return _pinotHelixTaskResourceManager.getTaskTypes();
}

/**
 * REST entry point for the task summary: delegates to the task resource manager and returns its result
 * unchanged.
 *
 * @param tenantName server tenant to restrict the summary to; when {@code null} the manager is asked for
 *                   all tenants
 * @return the summary produced by {@code PinotHelixTaskResourceManager#getTasksSummary}
 */
@GET
@Path("/tasks/summary")
@Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_TASK)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation("Get summary of all tasks across all task types, grouped by tenant. "
    + "Optionally filter by server tenant name to get tasks for a specific tenant only.")
public PinotHelixTaskResourceManager.TaskSummaryResponse getTasksSummary(
    @ApiParam(value = "Server tenant name to filter tasks. If not specified, returns all tenants grouped.")
    @QueryParam("tenant") @Nullable String tenantName) {
  PinotHelixTaskResourceManager.TaskSummaryResponse summary =
      _pinotHelixTaskResourceManager.getTasksSummary(tenantName);
  return summary;
}

@GET
@Path("/tasks/{taskType}/state")
@Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_TASK)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.pinot.controller.util.CompletionServiceHelper;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Helix;
Expand All @@ -83,6 +84,7 @@ public class PinotHelixTaskResourceManager {
// "TaskQueue" followed by the task-name separator.
// NOTE(review): presumably prepended to a task type to form the Helix queue name - usage not visible here.
private static final String TASK_QUEUE_PREFIX = "TaskQueue" + TASK_NAME_SEPARATOR;
// "Task" followed by the task-name separator.
// NOTE(review): presumably the prefix of generated task names - usage not visible here.
private static final String TASK_PREFIX = "Task" + TASK_NAME_SEPARATOR;
// Placeholder table name used when a subtask config carries no table name.
private static final String UNKNOWN_TABLE_NAME = "unknown";
// Placeholder tenant name returned when a table's server tenant cannot be determined.
private static final String UNKNOWN_TENANT_NAME = "unknown";
Comment on lines 86 to +87
Copy link

Copilot AI Dec 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The constant UNKNOWN_TENANT_NAME duplicates the value of the existing CommonConstants.UNKNOWN constant (imported at line 65). Consider reusing CommonConstants.UNKNOWN for consistency. The existing constant UNKNOWN_TABLE_NAME on line 86 also hard-codes "unknown" and should ideally reference the same shared constant.

Suggested change
private static final String UNKNOWN_TABLE_NAME = "unknown";
private static final String UNKNOWN_TENANT_NAME = "unknown";
private static final String UNKNOWN_TABLE_NAME = CommonConstants.UNKNOWN;

Copilot uses AI. Check for mistakes.

private final TaskDriver _taskDriver;
private final PinotHelixResourceManager _helixResourceManager;
Expand Down Expand Up @@ -958,6 +960,142 @@ private boolean hasTasksForTable(String taskName, String tableNameWithType) {
}
}

/**
 * Resolves the server tenant of a table from its table config.
 *
 * <p>The lookup is best-effort: a null or placeholder table name, a missing table config, a missing tenant
 * config, a null server tenant, or any exception thrown during the lookup all yield "unknown". Exceptions
 * are logged at WARN level and never propagated to the caller.
 *
 * @param tableName Table name with type (e.g., "myTable_OFFLINE"); may be null
 * @return Server tenant name, or "unknown" when it cannot be determined; never null
 */
private String getTenantForTable(String tableName) {
// No table to look up: short-circuit without touching the table config store.
if (tableName == null || UNKNOWN_TABLE_NAME.equals(tableName)) {
return UNKNOWN_TENANT_NAME;
}

try {
TableConfig tableConfig = _helixResourceManager.getTableConfig(tableName);
if (tableConfig != null && tableConfig.getTenantConfig() != null) {
String serverTenant = tableConfig.getTenantConfig().getServer();
return serverTenant != null ? serverTenant : UNKNOWN_TENANT_NAME;
}
// Table config or tenant config is absent.
return UNKNOWN_TENANT_NAME;
// Broad catch: whatever the lookup throws is mapped to the unknown tenant instead of being rethrown.
} catch (Exception e) {
Copy link

Copilot AI Dec 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Catching generic Exception is overly broad and may mask unexpected errors. Consider catching more specific exception types that getTableConfig() can throw, or document why a broad catch is necessary here.

Suggested change
} catch (Exception e) {
} catch (IllegalStateException | NullPointerException e) {

Copilot uses AI. Check for mistakes.
LOGGER.warn("Failed to determine tenant for table: {}", tableName, e);
return UNKNOWN_TENANT_NAME;
}
}

/**
 * Get a summary of all tasks across all task types, grouped by tenant.
 *
 * <p>Only parent tasks reporting at least one RUNNING or WAITING subtask are included. Each such task is
 * resolved to a table through the table name in its first subtask config (all subtasks of a parent task are
 * treated as belonging to the same table), and the table is then resolved to its server tenant. Tasks with
 * no subtask configs, or with no table name in the subtask config, are skipped.
 *
 * <p>Tenant lookups are memoized per table for the duration of one call, so a table with many active tasks
 * costs a single table-config lookup rather than one lookup per task.
 *
 * @param tenantFilter Optional tenant name to filter results. If null, returns all tenants.
 * @return TaskSummaryResponse containing aggregated task counts grouped by tenant; the totals are zero and
 *         the breakdown is empty when there are no task types or no active tasks
 */
public synchronized TaskSummaryResponse getTasksSummary(@Nullable String tenantFilter) {
  TaskSummaryResponse response = new TaskSummaryResponse();
  Set<String> taskTypes = getTaskTypes();

  if (taskTypes == null || taskTypes.isEmpty()) {
    return response;
  }

  // Map: tenant -> taskType -> aggregated TaskCount. TreeMaps keep tenants and task types sorted in the output.
  Map<String, Map<String, TaskCount>> tenantToTaskTypeCounts = new TreeMap<>();
  // Per-call cache: table name -> server tenant, so each table's config is looked up at most once.
  Map<String, String> tableToTenant = new TreeMap<>();

  for (String taskType : taskTypes) {
    Map<String, TaskCount> taskCounts = getTaskCounts(taskType);
    if (taskCounts == null || taskCounts.isEmpty()) {
      continue;
    }

    for (Map.Entry<String, TaskCount> entry : taskCounts.entrySet()) {
      String taskName = entry.getKey();
      TaskCount totalTaskCount = entry.getValue();

      // Skip parent tasks without running/waiting subtasks before doing any further lookups
      if (totalTaskCount.getRunning() == 0 && totalTaskCount.getWaiting() == 0) {
        continue;
      }

      // Get the table name from the first subtask
      // Note: All subtasks in a parent task belong to the same table
      List<PinotTaskConfig> subtaskConfigs = getSubtaskConfigs(taskName);
      if (subtaskConfigs.isEmpty()) {
        continue;
      }

      Map<String, String> configs = subtaskConfigs.get(0).getConfigs();
      String tableName = (configs != null)
          ? configs.getOrDefault(MinionConstants.TABLE_NAME_KEY, UNKNOWN_TABLE_NAME)
          : UNKNOWN_TABLE_NAME;

      if (UNKNOWN_TABLE_NAME.equals(tableName)) {
        continue;
      }

      // TreeMap rejects null keys, so a null table name (key present but mapped to null) bypasses the cache;
      // getTenantForTable handles null itself.
      String tenant = (tableName == null)
          ? getTenantForTable(null)
          : tableToTenant.computeIfAbsent(tableName, this::getTenantForTable);

      // Apply tenant filter if specified
      if (tenantFilter != null && !tenantFilter.equals(tenant)) {
        continue;
      }

      // Accumulate counts for this tenant and task type
      tenantToTaskTypeCounts
          .computeIfAbsent(tenant, k -> new TreeMap<>())
          .computeIfAbsent(taskType, k -> new TaskCount())
          .accumulate(totalTaskCount);
    }
  }

  // Build tenant breakdown from aggregated data
  int totalRunning = 0;
  int totalWaiting = 0;
  List<TenantTaskBreakdown> tenantBreakdowns = new ArrayList<>();
  for (Map.Entry<String, Map<String, TaskCount>> tenantEntry : tenantToTaskTypeCounts.entrySet()) {
    int tenantRunning = 0;
    int tenantWaiting = 0;
    List<TaskTypeBreakdown> taskTypeBreakdowns = new ArrayList<>();

    for (Map.Entry<String, TaskCount> taskTypeEntry : tenantEntry.getValue().entrySet()) {
      TaskCount aggregatedCount = taskTypeEntry.getValue();
      int running = aggregatedCount.getRunning();
      int waiting = aggregatedCount.getWaiting();

      // Only include task types that have running or waiting tasks
      if (running > 0 || waiting > 0) {
        tenantRunning += running;
        tenantWaiting += waiting;
        taskTypeBreakdowns.add(new TaskTypeBreakdown(taskTypeEntry.getKey(), running, waiting));
      }
    }

    // Only include tenants that have active tasks
    if (tenantRunning > 0 || tenantWaiting > 0) {
      totalRunning += tenantRunning;
      totalWaiting += tenantWaiting;
      tenantBreakdowns.add(
          new TenantTaskBreakdown(tenantEntry.getKey(), tenantRunning, tenantWaiting, taskTypeBreakdowns));
    }
  }

  response.setTotalRunningTasks(totalRunning);
  response.setTotalWaitingTasks(totalWaiting);
  response.setTaskBreakdown(tenantBreakdowns);

  return response;
}

/**
* Given a taskType, helper method to debug all the HelixJobs for the taskType.
* For each of the HelixJobs, collects status of the (sub)tasks in the taskbatch.
Expand Down Expand Up @@ -1518,4 +1656,177 @@ public void setSubtaskRunningTimes(Map<String, Long> subtaskRunningTimes) {
_subtaskRunningTimes = subtaskRunningTimes;
}
}

/**
 * JSON payload returned by the {@code GET /tasks/summary} endpoint.
 *
 * <p>Summarizes the tasks currently managed by the Pinot cluster, grouped by tenant. Only tasks with RUNNING
 * or WAITING subtasks are included; completed, failed, or aborted tasks are excluded.
 *
 * <p>Properties:
 * <ul>
 *   <li>{@code totalRunningTasks}: Total tasks in RUNNING or INIT state across all tenants</li>
 *   <li>{@code totalWaitingTasks}: Total tasks in WAITING state (not yet assigned to a worker)</li>
 *   <li>{@code taskBreakdown}: Task counts grouped by tenant and task type. Tasks with unknown tenant
 *       configuration appear under tenant name "unknown"</li>
 * </ul>
 *
 * <p>A freshly constructed instance has zero totals and an empty, mutable breakdown list.
 *
 * @see TenantTaskBreakdown
 * @see TaskTypeBreakdown
 */
@JsonPropertyOrder({"totalRunningTasks", "totalWaitingTasks", "taskBreakdown"})
public static class TaskSummaryResponse {
  private int _totalRunningTasks = 0;
  private int _totalWaitingTasks = 0;
  private List<TenantTaskBreakdown> _taskBreakdown = new ArrayList<>();

  /** Creates an empty summary; the defaults are supplied by the field initializers. */
  public TaskSummaryResponse() {
  }

  // ---- totalRunningTasks ----

  public int getTotalRunningTasks() {
    return _totalRunningTasks;
  }

  public void setTotalRunningTasks(int totalRunningTasks) {
    _totalRunningTasks = totalRunningTasks;
  }

  // ---- totalWaitingTasks ----

  public int getTotalWaitingTasks() {
    return _totalWaitingTasks;
  }

  public void setTotalWaitingTasks(int totalWaitingTasks) {
    _totalWaitingTasks = totalWaitingTasks;
  }

  // ---- taskBreakdown ----

  /** Returns the per-tenant breakdown list held by this response (the stored reference, not a copy). */
  public List<TenantTaskBreakdown> getTaskBreakdown() {
    return _taskBreakdown;
  }

  /** Replaces the per-tenant breakdown list; the given reference is stored as-is. */
  public void setTaskBreakdown(List<TenantTaskBreakdown> taskBreakdown) {
    _taskBreakdown = taskBreakdown;
  }
}

/**
 * Per-tenant slice of the {@code /tasks/summary} API response.
 *
 * <p>Properties:
 * <ul>
 *   <li>{@code tenant}: Server tenant name from table configuration (or "unknown" if not configured)</li>
 *   <li>{@code runningTasks}: Total tasks in RUNNING or INIT state for this tenant</li>
 *   <li>{@code waitingTasks}: Total tasks waiting to be assigned for this tenant</li>
 *   <li>{@code taskTypeBreakdown}: Running/waiting counts per task type for this tenant</li>
 * </ul>
 *
 * @see TaskSummaryResponse
 * @see TaskTypeBreakdown
 */
@JsonPropertyOrder({"tenant", "runningTasks", "waitingTasks", "taskTypeBreakdown"})
public static class TenantTaskBreakdown {
  private String _tenant;
  private int _runningTasks;
  private int _waitingTasks;
  private List<TaskTypeBreakdown> _taskTypeBreakdown;

  /** Creates a breakdown with no tenant name, zero counts, and an empty, mutable task-type list. */
  public TenantTaskBreakdown() {
    this(null, 0, 0, new ArrayList<>());
  }

  /**
   * Creates a fully populated breakdown.
   *
   * @param tenant tenant name
   * @param runningTasks number of running tasks for the tenant
   * @param waitingTasks number of waiting tasks for the tenant
   * @param taskTypeBreakdown per-task-type counts; the reference is stored as-is
   */
  public TenantTaskBreakdown(String tenant, int runningTasks, int waitingTasks,
      List<TaskTypeBreakdown> taskTypeBreakdown) {
    _tenant = tenant;
    _runningTasks = runningTasks;
    _waitingTasks = waitingTasks;
    _taskTypeBreakdown = taskTypeBreakdown;
  }

  // ---- accessors ----

  public String getTenant() {
    return _tenant;
  }

  public int getRunningTasks() {
    return _runningTasks;
  }

  public int getWaitingTasks() {
    return _waitingTasks;
  }

  public List<TaskTypeBreakdown> getTaskTypeBreakdown() {
    return _taskTypeBreakdown;
  }

  // ---- mutators ----

  public void setTenant(String tenant) {
    _tenant = tenant;
  }

  public void setRunningTasks(int runningTasks) {
    _runningTasks = runningTasks;
  }

  public void setWaitingTasks(int waitingTasks) {
    _waitingTasks = waitingTasks;
  }

  public void setTaskTypeBreakdown(List<TaskTypeBreakdown> taskTypeBreakdown) {
    _taskTypeBreakdown = taskTypeBreakdown;
  }
}

/**
 * Per-task-type slice of the {@code /tasks/summary} API response.
 *
 * <p>Properties:
 * <ul>
 *   <li>{@code taskType}: Task type name (e.g., "SegmentGenerationAndPushTask", "MergeRollupTask")</li>
 *   <li>{@code runningCount}: Tasks in RUNNING state</li>
 *   <li>{@code waitingCount}: Tasks waiting to be scheduled</li>
 * </ul>
 *
 * @see TaskSummaryResponse
 * @see TenantTaskBreakdown
 */
@JsonPropertyOrder({"taskType", "runningCount", "waitingCount"})
public static class TaskTypeBreakdown {
  private String _taskType;
  private int _runningCount;
  private int _waitingCount;

  /** Creates a breakdown with no task type and zero counts. */
  public TaskTypeBreakdown() {
    this(null, 0, 0);
  }

  /**
   * Creates a fully populated breakdown.
   *
   * @param taskType task type name
   * @param runningCount number of running tasks of this type
   * @param waitingCount number of waiting tasks of this type
   */
  public TaskTypeBreakdown(String taskType, int runningCount, int waitingCount) {
    _taskType = taskType;
    _runningCount = runningCount;
    _waitingCount = waitingCount;
  }

  // ---- accessors ----

  public String getTaskType() {
    return _taskType;
  }

  public int getRunningCount() {
    return _runningCount;
  }

  public int getWaitingCount() {
    return _waitingCount;
  }

  // ---- mutators ----

  public void setTaskType(String taskType) {
    _taskType = taskType;
  }

  public void setRunningCount(int runningCount) {
    _runningCount = runningCount;
  }

  public void setWaitingCount(int waitingCount) {
    _waitingCount = waitingCount;
  }
}
}
Loading
Loading