1
- import { type RecordApi } from "trailbase" ;
1
+ import { Store } from "@tanstack/store" ;
2
+ import type { Event , RecordApi } from "trailbase" ;
2
3
3
4
import type { CollectionConfig , SyncConfig , UtilsRecord } from "@tanstack/db" ;
4
- import { Store } from "@tanstack/store" ;
5
5
6
6
/**
7
7
* Configuration interface for TrailbaseCollection
@@ -11,7 +11,7 @@ export interface TrailBaseCollectionConfig<
11
11
TKey extends string | number = string | number ,
12
12
> extends Omit <
13
13
CollectionConfig < TItem , TKey > ,
14
- " sync" | " onInsert" | " onUpdate" | " onDelete"
14
+ ` sync` | ` onInsert` | ` onUpdate` | ` onDelete`
15
15
> {
16
16
/**
17
17
* Record API name
@@ -21,10 +21,8 @@ export interface TrailBaseCollectionConfig<
21
21
22
22
export type AwaitTxIdFn = ( txId : string , timeout ?: number ) => Promise < boolean > ;
23
23
24
- export type RefetchFn = ( ) => Promise < void > ;
25
-
26
24
export interface TrailBaseCollectionUtils extends UtilsRecord {
27
- refetch : RefetchFn ;
25
+ cancel : ( ) => void ;
28
26
}
29
27
30
28
export function trailBaseCollectionOptions < TItem extends object > (
@@ -35,7 +33,7 @@ export function trailBaseCollectionOptions<TItem extends object>(
35
33
const seenIds = new Store ( new Map < string , number > ( ) ) ;
36
34
37
35
const awaitIds = (
38
- ids : string [ ] ,
36
+ ids : Array < string > ,
39
37
timeout : number = 120 * 1000 ,
40
38
) : Promise < void > => {
41
39
const completed = ( value : Map < string , number > ) =>
@@ -67,6 +65,7 @@ export function trailBaseCollectionOptions<TItem extends object>(
67
65
seen . setState ( ( curr ) => {
68
66
const now = Date . now ( ) ;
69
67
let anyExpired = false ;
68
+
70
69
const notExpired = curr . entries ( ) . filter ( ( [ _ , v ] ) => {
71
70
const expired = now - v > 300 * 1000 ;
72
71
anyExpired = anyExpired || expired ;
@@ -84,33 +83,39 @@ export function trailBaseCollectionOptions<TItem extends object>(
84
83
} , 120 * 1000 ) ;
85
84
86
85
type SyncParams = Parameters < SyncConfig < TItem > [ `sync`] > [ 0 ] ;
87
- let syncParams : SyncParams | undefined ;
86
+
87
+ let eventReader : ReadableStreamDefaultReader < Event > | undefined ;
88
88
const sync = {
89
89
sync : ( params : SyncParams ) => {
90
- syncParams = params ;
91
90
const { begin, write, commit } = params ;
92
91
93
92
// Initial fetch.
94
93
async function initialFetch ( ) {
95
- let response = await config . recordApi . list ( { count : true } ) ;
94
+ const limit = 256 ;
95
+ let response = await config . recordApi . list ( {
96
+ pagination : {
97
+ limit,
98
+ } ,
99
+ } ) ;
96
100
let cursor = response . cursor ;
97
101
let got = 0 ;
98
102
99
103
begin ( ) ;
100
104
101
105
while ( true ) {
102
106
const length = response . records . length ;
103
- if ( length === 0 ) {
104
- break ;
105
- }
107
+ if ( length === 0 ) break ;
106
108
107
109
got = got + length ;
108
110
for ( const item of response . records ) {
109
- write ( { type : " insert" , value : item as TItem } ) ;
111
+ write ( { type : ` insert` , value : item } ) ;
110
112
}
111
113
114
+ if ( length < limit ) break ;
115
+
112
116
response = await config . recordApi . list ( {
113
117
pagination : {
118
+ limit,
114
119
cursor,
115
120
offset : cursor === undefined ? got : undefined ,
116
121
} ,
@@ -123,22 +128,29 @@ export function trailBaseCollectionOptions<TItem extends object>(
123
128
124
129
// Afterwards subscribe.
125
130
async function subscribe ( ) {
126
- const eventStream = await config . recordApi . subscribe ( "*" ) ;
131
+ const eventStream = await config . recordApi . subscribe ( `*` ) ;
132
+ const reader = ( eventReader = eventStream . getReader ( ) ) ;
133
+
134
+ while ( true ) {
135
+ const { done, value : event } = await reader . read ( ) ;
127
136
128
- for await ( const event of eventStream ) {
129
- console . debug ( `Event: ${ JSON . stringify ( event ) } ` ) ;
137
+ if ( done || ! event ) {
138
+ reader . releaseLock ( ) ;
139
+ eventReader = undefined ;
140
+ return ;
141
+ }
130
142
131
143
begin ( ) ;
132
144
let value : TItem | undefined ;
133
- if ( " Insert" in event ) {
145
+ if ( ` Insert` in event ) {
134
146
value = event . Insert as TItem ;
135
- write ( { type : " insert" , value } ) ;
136
- } else if ( " Delete" in event ) {
147
+ write ( { type : ` insert` , value } ) ;
148
+ } else if ( ` Delete` in event ) {
137
149
value = event . Delete as TItem ;
138
- write ( { type : " delete" , value } ) ;
139
- } else if ( " Update" in event ) {
150
+ write ( { type : ` delete` , value } ) ;
151
+ } else if ( ` Update` in event ) {
140
152
value = event . Update as TItem ;
141
- write ( { type : " update" , value } ) ;
153
+ write ( { type : ` update` , value } ) ;
142
154
} else {
143
155
console . error ( `Error: ${ event . Error } ` ) ;
144
156
}
@@ -154,9 +166,7 @@ export function trailBaseCollectionOptions<TItem extends object>(
154
166
}
155
167
}
156
168
157
- initialFetch ( ) . then ( ( ) => {
158
- subscribe ( ) ;
159
- } ) ;
169
+ initialFetch ( ) . then ( ( ) => subscribe ( ) ) ;
160
170
} ,
161
171
// Expose the getSyncMetadata function
162
172
getSyncMetadata : undefined ,
@@ -165,11 +175,11 @@ export function trailBaseCollectionOptions<TItem extends object>(
165
175
return {
166
176
sync,
167
177
getKey,
168
- onInsert : async ( params ) : Promise < ( number | string ) [ ] > => {
178
+ onInsert : async ( params ) : Promise < Array < number | string > > => {
169
179
const ids = await config . recordApi . createBulk (
170
180
params . transaction . mutations . map ( ( tx ) => {
171
181
const { type, changes } = tx ;
172
- if ( type !== " insert" ) {
182
+ if ( type !== ` insert` ) {
173
183
throw new Error ( `Expected 'insert', got: ${ type } ` ) ;
174
184
}
175
185
return changes as TItem ;
@@ -184,10 +194,10 @@ export function trailBaseCollectionOptions<TItem extends object>(
184
194
return ids ;
185
195
} ,
186
196
onUpdate : async ( params ) => {
187
- const ids : string [ ] = await Promise . all (
197
+ const ids : Array < string > = await Promise . all (
188
198
params . transaction . mutations . map ( async ( tx ) => {
189
199
const { type, changes, key } = tx ;
190
- if ( type !== " update" ) {
200
+ if ( type !== ` update` ) {
191
201
throw new Error ( `Expected 'update', got: ${ type } ` ) ;
192
202
}
193
203
@@ -202,10 +212,10 @@ export function trailBaseCollectionOptions<TItem extends object>(
202
212
await awaitIds ( ids ) ;
203
213
} ,
204
214
onDelete : async ( params ) => {
205
- const ids : string [ ] = await Promise . all (
215
+ const ids : Array < string > = await Promise . all (
206
216
params . transaction . mutations . map ( async ( tx ) => {
207
217
const { type, key } = tx ;
208
- if ( type !== " delete" ) {
218
+ if ( type !== ` delete` ) {
209
219
throw new Error ( `Expected 'delete', got: ${ type } ` ) ;
210
220
}
211
221
@@ -220,11 +230,12 @@ export function trailBaseCollectionOptions<TItem extends object>(
220
230
await awaitIds ( ids ) ;
221
231
} ,
222
232
utils : {
223
- // NOTE: Refetch shouldn't be necessary, we'll see. It may still be
224
- // necessary if subscriptions gets temporarily disconnected and changes
225
- // get lost.
226
- refetch : async ( ) => {
227
- console . warn ( `Not implemented: refetch` , syncParams ?. collection ) ;
233
+ cancel : ( ) => {
234
+ if ( eventReader ) {
235
+ eventReader . cancel ( ) ;
236
+ eventReader . releaseLock ( ) ;
237
+ eventReader = undefined ;
238
+ }
228
239
} ,
229
240
} ,
230
241
} ;
0 commit comments