Skip to content

Commit 7c4f0bf

Browse files
committed
Move out utility functions.
1 parent a841596 commit 7c4f0bf

File tree

2 files changed

+158
-132
lines changed

2 files changed

+158
-132
lines changed

modules/module-mongodb/src/replication/ChangeStream.ts

Lines changed: 12 additions & 131 deletions
Original file line number · Diff line number · Diff line change
@@ -1,4 +1,4 @@
1-
import { isMongoNetworkTimeoutError, isMongoServerError, mongo } from '@powersync/lib-service-mongodb';
1+
import { mongo } from '@powersync/lib-service-mongodb';
22
import {
33
container,
44
DatabaseConnectionError,
@@ -31,7 +31,7 @@ import {
3131
STANDALONE_CHECKPOINT_ID
3232
} from './MongoRelation.js';
3333
import { ChunkedSnapshotQuery } from './MongoSnapshotQuery.js';
34-
import { CHECKPOINTS_COLLECTION, timestampToDate } from './replication-utils.js';
34+
import { CHECKPOINTS_COLLECTION, rawChangeStreamBatches, timestampToDate } from './replication-utils.js';
3535

3636
export interface ChangeStreamOptions {
3737
connections: MongoManager;
@@ -710,124 +710,23 @@ export class ChangeStream {
710710
}
711711
}
712712

713-
private async *rawChangeStreamBatches(options: {
713+
private rawChangeStreamBatches(options: {
714714
lsn: string | null;
715715
maxAwaitTimeMs?: number;
716716
batchSize?: number;
717717
filters: { $match: any; multipleDatabases: boolean };
718718
signal?: AbortSignal;
719719
}): AsyncIterableIterator<{ eventBatch: mongo.ChangeStreamDocument[]; resumeToken: unknown }> {
720-
const lastLsn = options.lsn ? MongoLSN.fromSerialized(options.lsn) : null;
721-
const startAfter = lastLsn?.timestamp;
722-
const resumeAfter = lastLsn?.resumeToken;
723-
724-
const filters = options.filters;
725-
726-
let fullDocument: 'required' | 'updateLookup';
727-
728-
if (this.usePostImages) {
729-
// 'read_only' or 'auto_configure'
730-
// Configuration happens during snapshot, or when we see new
731-
// collections.
732-
fullDocument = 'required';
733-
} else {
734-
fullDocument = 'updateLookup';
735-
}
736-
737-
const streamOptions: mongo.ChangeStreamOptions = {
738-
showExpandedEvents: true,
739-
fullDocument: fullDocument
740-
};
741-
/**
742-
* Only one of these options can be supplied at a time.
743-
*/
744-
if (resumeAfter) {
745-
streamOptions.resumeAfter = resumeAfter;
746-
} else {
747-
// Legacy: We don't persist lsns without resumeTokens anymore, but we do still handle the
748-
// case if we have an old one.
749-
streamOptions.startAtOperationTime = startAfter;
750-
}
751-
752-
const pipeline: mongo.Document[] = [
753-
{
754-
$changeStream: streamOptions
755-
},
756-
{
757-
$match: filters.$match
758-
},
759-
{ $changeStreamSplitLargeEvent: {} }
760-
];
761-
762-
let cursorId: bigint | null = null;
763-
764-
const db = filters.multipleDatabases ? this.client.db('admin') : this.defaultDb;
765-
const maxTimeMS = options.maxAwaitTimeMs ?? this.maxAwaitTimeMS;
766-
const batchSize = options.batchSize ?? this.snapshotChunkLength;
767-
options?.signal?.addEventListener('abort', () => {
768-
if (cursorId != null && cursorId !== 0n) {
769-
// This would result in a CursorKilled error.
770-
db.command({
771-
killCursors: '$cmd.aggregate',
772-
cursors: [cursorId]
773-
});
774-
}
720+
return rawChangeStreamBatches({
721+
client: this.client,
722+
filters: options.filters,
723+
db: options.filters.multipleDatabases ? this.client.db('admin') : this.defaultDb,
724+
batchSize: options.batchSize ?? this.snapshotChunkLength,
725+
maxAwaitTimeMs: options.maxAwaitTimeMs ?? this.maxAwaitTimeMS,
726+
lsn: options.lsn,
727+
usePostImages: this.usePostImages,
728+
signal: options.signal
775729
});
776-
777-
const session = this.client.startSession();
778-
try {
779-
// Step 1: Send the aggregate command to start the change stream
780-
const aggregateResult = await db
781-
.command(
782-
{
783-
aggregate: 1,
784-
pipeline,
785-
cursor: { batchSize }
786-
},
787-
{ session }
788-
)
789-
.catch((e) => {
790-
throw mapChangeStreamError(e);
791-
});
792-
793-
cursorId = BigInt(aggregateResult.cursor.id);
794-
let batch = aggregateResult.cursor.firstBatch;
795-
796-
yield { eventBatch: batch, resumeToken: aggregateResult.cursor.postBatchResumeToken };
797-
798-
// Step 2: Poll using getMore until the cursor is closed
799-
while (cursorId && cursorId !== 0n) {
800-
if (options.signal?.aborted) {
801-
break;
802-
}
803-
const getMoreResult: mongo.Document = await db
804-
.command(
805-
{
806-
getMore: cursorId,
807-
collection: '$cmd.aggregate',
808-
batchSize,
809-
maxTimeMS
810-
},
811-
{ session }
812-
)
813-
.catch((e) => {
814-
throw mapChangeStreamError(e);
815-
});
816-
817-
cursorId = BigInt(getMoreResult.cursor.id);
818-
const nextBatch = getMoreResult.cursor.nextBatch;
819-
820-
yield { eventBatch: nextBatch, resumeToken: getMoreResult.cursor.postBatchResumeToken };
821-
}
822-
} finally {
823-
if (cursorId != null && cursorId !== 0n) {
824-
await db.command({
825-
killCursors: '$cmd.aggregate',
826-
cursors: [cursorId]
827-
});
828-
}
829-
await session.endSession();
830-
}
831730
}
832731

833732
async streamChangesInternal() {
@@ -1128,21 +1027,3 @@ export class ChangeStream {
11281027
}
11291028
}
11301029
}
1131-
1132-
function mapChangeStreamError(e: any) {
1133-
if (isMongoNetworkTimeoutError(e)) {
1134-
// This typically has an unhelpful message like "connection 2 to 159.41.94.47:27017 timed out".
1135-
// We wrap the error to make it more useful.
1136-
throw new DatabaseConnectionError(ErrorCode.PSYNC_S1345, `Timeout while reading MongoDB ChangeStream`, e);
1137-
} else if (
1138-
isMongoServerError(e) &&
1139-
e.codeName == 'NoMatchingDocument' &&
1140-
e.errmsg?.includes('post-image was not found')
1141-
) {
1142-
throw new ChangeStreamInvalidatedError(e.errmsg, e);
1143-
} else if (isMongoServerError(e) && e.hasErrorLabel('NonResumableChangeStreamError')) {
1144-
throw new ChangeStreamInvalidatedError(e.message, e);
1145-
} else {
1146-
throw new DatabaseConnectionError(ErrorCode.PSYNC_S1346, `Error reading MongoDB ChangeStream`, e);
1147-
}
1148-
}

modules/module-mongodb/src/replication/replication-utils.ts

Lines changed: 146 additions & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -1,7 +1,11 @@
1-
import { ErrorCode, ServiceError } from '@powersync/lib-services-framework';
1+
import { DatabaseConnectionError, ErrorCode, ServiceError } from '@powersync/lib-services-framework';
22
import { MongoManager } from './MongoManager.js';
33
import { PostImagesOption } from '../types/types.js';
44
import * as bson from 'bson';
5+
import { mongo } from '@powersync/lib-service-mongodb';
6+
import { isMongoNetworkTimeoutError, isMongoServerError } from '@powersync/lib-service-mongodb';
7+
import { ChangeStreamInvalidatedError } from './ChangeStream.js';
8+
import { MongoLSN } from '../common/MongoLSN.js';
59

610
export const CHECKPOINTS_COLLECTION = '_powersync_checkpoints';
711

@@ -91,3 +95,144 @@ export async function checkSourceConfiguration(connectionManager: MongoManager):
9195
export function timestampToDate(timestamp: bson.Timestamp) {
9296
return new Date(timestamp.getHighBitsUnsigned() * 1000);
9397
}
98+
99+
export function mapChangeStreamError(e: any) {
100+
if (isMongoNetworkTimeoutError(e)) {
101+
// This typically has an unhelpful message like "connection 2 to 159.41.94.47:27017 timed out".
102+
// We wrap the error to make it more useful.
103+
throw new DatabaseConnectionError(ErrorCode.PSYNC_S1345, `Timeout while reading MongoDB ChangeStream`, e);
104+
} else if (
105+
isMongoServerError(e) &&
106+
e.codeName == 'NoMatchingDocument' &&
107+
e.errmsg?.includes('post-image was not found')
108+
) {
109+
throw new ChangeStreamInvalidatedError(e.errmsg, e);
110+
} else if (isMongoServerError(e) && e.hasErrorLabel('NonResumableChangeStreamError')) {
111+
throw new ChangeStreamInvalidatedError(e.message, e);
112+
} else {
113+
throw new DatabaseConnectionError(ErrorCode.PSYNC_S1346, `Error reading MongoDB ChangeStream`, e);
114+
}
115+
}
116+
117+
export async function* rawChangeStreamBatches(options: {
118+
client: mongo.MongoClient;
119+
db: mongo.Db;
120+
usePostImages: boolean;
121+
lsn: string | null;
122+
maxAwaitTimeMs?: number;
123+
batchSize?: number;
124+
filters: { $match: any; multipleDatabases: boolean };
125+
signal?: AbortSignal;
126+
}): AsyncIterableIterator<{ eventBatch: mongo.ChangeStreamDocument[]; resumeToken: unknown }> {
127+
const lastLsn = options.lsn ? MongoLSN.fromSerialized(options.lsn) : null;
128+
const startAfter = lastLsn?.timestamp;
129+
const resumeAfter = lastLsn?.resumeToken;
130+
131+
const filters = options.filters;
132+
133+
let fullDocument: 'required' | 'updateLookup';
134+
135+
if (options.usePostImages) {
136+
// 'read_only' or 'auto_configure'
137+
// Configuration happens during snapshot, or when we see new
138+
// collections.
139+
fullDocument = 'required';
140+
} else {
141+
fullDocument = 'updateLookup';
142+
}
143+
144+
const streamOptions: mongo.ChangeStreamOptions = {
145+
showExpandedEvents: true,
146+
fullDocument: fullDocument
147+
};
148+
/**
149+
* Only one of these options can be supplied at a time.
150+
*/
151+
if (resumeAfter) {
152+
streamOptions.resumeAfter = resumeAfter;
153+
} else {
154+
// Legacy: We don't persist lsns without resumeTokens anymore, but we do still handle the
155+
// case if we have an old one.
156+
streamOptions.startAtOperationTime = startAfter;
157+
}
158+
159+
const pipeline: mongo.Document[] = [
160+
{
161+
$changeStream: streamOptions
162+
},
163+
{
164+
$match: filters.$match
165+
},
166+
{ $changeStreamSplitLargeEvent: {} }
167+
];
168+
169+
let cursorId: bigint | null = null;
170+
171+
const db = options.db;
172+
const maxTimeMS = options.maxAwaitTimeMs;
173+
const batchSize = options.batchSize;
174+
options?.signal?.addEventListener('abort', () => {
175+
if (cursorId != null && cursorId !== 0n) {
176+
// This would result in a CursorKilled error.
177+
db.command({
178+
killCursors: '$cmd.aggregate',
179+
cursors: [cursorId]
180+
});
181+
}
182+
});
183+
184+
const session = options.client.startSession();
185+
try {
186+
// Step 1: Send the aggregate command to start the change stream
187+
const aggregateResult = await db
188+
.command(
189+
{
190+
aggregate: 1,
191+
pipeline,
192+
cursor: { batchSize }
193+
},
194+
{ session }
195+
)
196+
.catch((e) => {
197+
throw mapChangeStreamError(e);
198+
});
199+
200+
cursorId = BigInt(aggregateResult.cursor.id);
201+
let batch = aggregateResult.cursor.firstBatch;
202+
203+
yield { eventBatch: batch, resumeToken: aggregateResult.cursor.postBatchResumeToken };
204+
205+
// Step 2: Poll using getMore until the cursor is closed
206+
while (cursorId && cursorId !== 0n) {
207+
if (options.signal?.aborted) {
208+
break;
209+
}
210+
const getMoreResult: mongo.Document = await db
211+
.command(
212+
{
213+
getMore: cursorId,
214+
collection: '$cmd.aggregate',
215+
batchSize,
216+
maxTimeMS
217+
},
218+
{ session }
219+
)
220+
.catch((e) => {
221+
throw mapChangeStreamError(e);
222+
});
223+
224+
cursorId = BigInt(getMoreResult.cursor.id);
225+
const nextBatch = getMoreResult.cursor.nextBatch;
226+
227+
yield { eventBatch: nextBatch, resumeToken: getMoreResult.cursor.postBatchResumeToken };
228+
}
229+
} finally {
230+
if (cursorId != null && cursorId !== 0n) {
231+
await db.command({
232+
killCursors: '$cmd.aggregate',
233+
cursors: [cursorId]
234+
});
235+
}
236+
await session.endSession();
237+
}
238+
}

0 commit comments

Comments (0)