Skip to content

[Improve][API] Optimize the enumerator API semantics and reduce lock calls at the connector level #9671

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,20 @@ public interface SourceSplitEnumerator<SplitT extends SourceSplit, StateT>

void open();

/** The method is executed by the engine only once. */
/**
* Executes engine setup steps in a fixed, non‑concurrent sequence.
*
* <p>Before the first {@link #run()} invocation, methods are called in this order:
*
* <ol>
* <li>{@link #open()}
* <li>{@link #addSplitsBack(List, int)}
* <li>{@link #registerReader(int)}
* </ol>
*
* <p>{@implNote The engine guarantees this invocation order and ensures there are no
* concurrency issues between these calls.}
*/
void run() throws Exception;

/**
Expand All @@ -63,7 +76,13 @@ public interface SourceSplitEnumerator<SplitT extends SourceSplit, StateT>

void registerReader(int subtaskId);

/** If the source is bounded, checkpoint is not triggered. */
/**
* Used to snapshot the state of the enumerator.
*
* <p><strong>Concurrency Consideration:</strong><br>
* This method and {@link #run()} can be invoked concurrently by different threads.
* Systematically manage shared state access to prevent race conditions.
*/
StateT snapshotState(long checkpointId) throws Exception;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,9 @@ public void registerReader(int subtaskId) {
*/
@Override
public TiDBSourceCheckpointState snapshotState(long checkpointId) throws Exception {
return new TiDBSourceCheckpointState(shouldEnumerate, pendingSplit);
synchronized (stateLock) {
return new TiDBSourceCheckpointState(shouldEnumerate, pendingSplit);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,16 +121,14 @@ public void close() throws IOException {
@Override
public void addSplitsBack(List<ClickhouseSourceSplit> splits, int subtaskId) {
if (!splits.isEmpty()) {
synchronized (stateLock) {
addPendingSplit(splits, subtaskId);
if (context.registeredReaders().contains(subtaskId)) {
assignSplit(Collections.singletonList(subtaskId));
} else {
LOG.warn(
"Reader {} is not registered. Pending splits {} are not assigned.",
subtaskId,
splits);
}
addPendingSplit(splits, subtaskId);
if (context.registeredReaders().contains(subtaskId)) {
assignSplit(Collections.singletonList(subtaskId));
} else {
LOG.warn(
"Reader {} is not registered. Pending splits {} are not assigned.",
subtaskId,
splits);
}
}
LOG.info("Add back splits {} to JdbcSourceSplitEnumerator.", splits.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,14 @@ public void run() {
public void addSplitsBack(List<DorisSourceSplit> splits, int subtaskId) {
log.debug("Add back splits {} to DorisSourceSplitEnumerator.", splits);
if (!splits.isEmpty()) {
synchronized (stateLock) {
addPendingSplit(splits);
if (context.registeredReaders().contains(subtaskId)) {
assignSplit(Collections.singletonList(subtaskId));
} else {
log.warn(
"Reader {} is not registered. Pending splits {} are not assigned.",
subtaskId,
splits);
}
addPendingSplit(splits);
if (context.registeredReaders().contains(subtaskId)) {
assignSplit(Collections.singletonList(subtaskId));
} else {
log.warn(
"Reader {} is not registered. Pending splits {} are not assigned.",
subtaskId,
splits);
}
}
}
Expand All @@ -137,9 +135,7 @@ public void handleSplitRequest(int subtaskId) {
public void registerReader(int subtaskId) {
log.debug("Register reader {} to DorisSourceSplitEnumerator.", subtaskId);
if (!pendingSplit.isEmpty()) {
synchronized (stateLock) {
assignSplit(Collections.singletonList(subtaskId));
}
assignSplit(Collections.singletonList(subtaskId));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class FileSourceSplitEnumerator
new TreeSet<>(Comparator.comparing(FileSourceSplit::splitId));
private Set<FileSourceSplit> assignedSplit;
private final List<String> filePaths;
private final Object lock = new Object();
private final AtomicInteger assignCount = new AtomicInteger(0);

public FileSourceSplitEnumerator(
Expand All @@ -69,7 +70,9 @@ public void open() {
public void run() {
for (int i = 0; i < context.currentParallelism(); i++) {
LOGGER.info("Assigned splits to reader [{}]", i);
assignSplit(i);
synchronized (lock) {
assignSplit(i);
}
}
}

Expand Down Expand Up @@ -139,7 +142,9 @@ public void registerReader(int subtaskId) {

@Override
public FileSourceState snapshotState(long checkpointId) {
return new FileSourceState(assignedSplit);
synchronized (lock) {
return new FileSourceState(assignedSplit);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class MultipleTableFileSourceSplitEnumerator
private final Set<FileSourceSplit> assignedSplit;
private final Map<String, List<String>> filePathMap;
private final AtomicInteger assignCount = new AtomicInteger(0);
private final Object lock = new Object();

public MultipleTableFileSourceSplitEnumerator(
Context<FileSourceSplit> context,
Expand Down Expand Up @@ -107,7 +108,9 @@ public void registerReader(int subtaskId) {}

@Override
public FileSourceState snapshotState(long checkpointId) {
return new FileSourceState(assignedSplit);
synchronized (lock) {
return new FileSourceState(assignedSplit);
}
}

@Override
Expand Down Expand Up @@ -155,7 +158,9 @@ private static int getSplitOwner(int assignCount, int numReaders) {
public void run() throws Exception {
for (int i = 0; i < context.currentParallelism(); i++) {
log.info("Assigned splits to reader [{}]", i);
assignSplit(i);
synchronized (lock) {
assignSplit(i);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class MultipleTableHiveSourceSplitEnumerator
private final Set<FileSourceSplit> assignedSplit;
private final Map<String, List<String>> filePathMap;
private final AtomicInteger assignCount = new AtomicInteger(0);
private final Object lock = new Object();

public MultipleTableHiveSourceSplitEnumerator(
SourceSplitEnumerator.Context<FileSourceSplit> context,
Expand Down Expand Up @@ -108,7 +109,9 @@ public void registerReader(int subtaskId) {}

@Override
public FileSourceState snapshotState(long checkpointId) {
return new FileSourceState(assignedSplit);
synchronized (lock) {
return new FileSourceState(assignedSplit);
}
}

@Override
Expand Down Expand Up @@ -156,7 +159,9 @@ private static int getSplitOwner(int assignCount, int numReaders) {
public void run() throws Exception {
for (int i = 0; i < context.currentParallelism(); i++) {
log.info("Assigned splits to reader [{}]", i);
assignSplit(i);
synchronized (lock) {
assignSplit(i);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,16 +131,14 @@ public void open() {
@Override
public void addSplitsBack(List<IcebergFileScanTaskSplit> splits, int subtaskId) {
if (!splits.isEmpty()) {
synchronized (stateLock) {
addPendingSplits(splits);
if (context.registeredReaders().contains(subtaskId)) {
assignPendingSplits(Collections.singleton(subtaskId));
} else {
log.warn(
"Reader {} is not registered. Pending splits {} are not assigned.",
subtaskId,
splits);
}
addPendingSplits(splits);
if (context.registeredReaders().contains(subtaskId)) {
assignPendingSplits(Collections.singleton(subtaskId));
} else {
log.warn(
"Reader {} is not registered. Pending splits {} are not assigned.",
subtaskId,
splits);
}
}
log.info("Add back splits {} to JdbcSourceSplitEnumerator.", splits.size());
Expand All @@ -163,9 +161,7 @@ public void handleSplitRequest(int subtaskId) {}
@Override
public void registerReader(int subtaskId) {
log.debug("Adding reader {} to IcebergSourceEnumerator.", subtaskId);
synchronized (stateLock) {
assignPendingSplits(Collections.singleton(subtaskId));
}
assignPendingSplits(Collections.singleton(subtaskId));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,16 +103,14 @@ public void close() throws IOException {
@Override
public void addSplitsBack(List<JdbcSourceSplit> splits, int subtaskId) {
if (!splits.isEmpty()) {
synchronized (stateLock) {
addPendingSplit(splits, subtaskId);
if (context.registeredReaders().contains(subtaskId)) {
assignSplit(Collections.singletonList(subtaskId));
} else {
LOG.warn(
"Reader {} is not registered. Pending splits {} are not assigned.",
subtaskId,
splits);
}
addPendingSplit(splits, subtaskId);
if (context.registeredReaders().contains(subtaskId)) {
assignSplit(Collections.singletonList(subtaskId));
} else {
LOG.warn(
"Reader {} is not registered. Pending splits {} are not assigned.",
subtaskId,
splits);
}
}
LOG.info("Add back splits {} to JdbcSourceSplitEnumerator.", splits.size());
Expand All @@ -134,9 +132,7 @@ public void handleSplitRequest(int subtaskId) {
public void registerReader(int subtaskId) {
LOG.info("Register reader {} to JdbcSourceSplitEnumerator.", subtaskId);
if (!pendingSplits.isEmpty()) {
synchronized (stateLock) {
assignSplit(Collections.singletonList(subtaskId));
}
assignSplit(Collections.singletonList(subtaskId));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public class KafkaSourceSplitEnumerator
private ScheduledExecutorService executor;
private ScheduledFuture<?> scheduledFuture;
private volatile boolean initialized;

private final Object lock = new Object();
private final Map<String, TablePath> topicMappingTablePathMap = new HashMap<>();

private boolean isStreamingMode;
Expand Down Expand Up @@ -145,9 +145,13 @@ public void open() {

@Override
public void run() throws ExecutionException, InterruptedException {
fetchPendingPartitionSplit();
setPartitionStartOffset();
assignSplit();
synchronized (lock) {
fetchPendingPartitionSplit();
setPartitionStartOffset();
}
synchronized (lock) {
assignSplit();
}
if (!initialized) {
initialized = true;
}
Expand Down Expand Up @@ -288,7 +292,9 @@ public void registerReader(int subtaskId) {

@Override
public KafkaSourceState snapshotState(long checkpointId) throws Exception {
return new KafkaSourceState(new HashSet<>(assignedSplit.values()));
synchronized (lock) {
return new KafkaSourceState(new HashSet<>(assignedSplit.values()));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,16 +124,14 @@ public void close() throws IOException {
@Override
public void addSplitsBack(List<KuduSourceSplit> splits, int subtaskId) {
if (!splits.isEmpty()) {
synchronized (stateLock) {
addPendingSplit(splits, subtaskId);
if (enumeratorContext.registeredReaders().contains(subtaskId)) {
assignSplit(Collections.singletonList(subtaskId));
} else {
log.warn(
"Reader {} is not registered. Pending splits {} are not assigned.",
subtaskId,
splits);
}
addPendingSplit(splits, subtaskId);
if (enumeratorContext.registeredReaders().contains(subtaskId)) {
assignSplit(Collections.singletonList(subtaskId));
} else {
log.warn(
"Reader {} is not registered. Pending splits {} are not assigned.",
subtaskId,
splits);
}
}
log.info("Add back splits {} to JdbcSourceSplitEnumerator.", splits.size());
Expand Down Expand Up @@ -192,10 +190,8 @@ public void handleSplitRequest(int subtaskId) {
@Override
public void registerReader(int subtaskId) {
log.debug("Register reader {} to KuduSourceSplitEnumerator.", subtaskId);
synchronized (stateLock) {
if (!pendingSplits.isEmpty()) {
assignSplit(Collections.singletonList(subtaskId));
}
if (!pendingSplits.isEmpty()) {
assignSplit(Collections.singletonList(subtaskId));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class MaxcomputeSourceSplitEnumerator
private Set<MaxcomputeSourceSplit> assignedSplits;
private final ReadonlyConfig readonlyConfig;
private final Map<TablePath, SourceTableInfo> sourceTableInfos;
private final Object stateLock = new Object();

public MaxcomputeSourceSplitEnumerator(
SourceSplitEnumerator.Context<MaxcomputeSourceSplit> enumeratorContext,
Expand All @@ -70,8 +71,12 @@ public void open() {}

@Override
public void run() throws Exception {
discoverySplits();
assignPendingSplits();
synchronized (stateLock) {
discoverySplits();
}
synchronized (stateLock) {
assignPendingSplits();
}
}

@Override
Expand All @@ -92,7 +97,9 @@ public void registerReader(int subtaskId) {}

@Override
public MaxcomputeSourceState snapshotState(long checkpointId) {
return new MaxcomputeSourceState(assignedSplits);
synchronized (stateLock) {
return new MaxcomputeSourceState(assignedSplits);
}
}

@Override
Expand Down
Loading