Skip to content

Commit 12bb10e

Browse files
authored
Create ClickHouse table and queries for sandbox events complete with batcher (#886)
1 parent d1cdc95 commit 12bb10e

28 files changed

+1264
-36
lines changed

packages/clickhouse/go.mod

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
11
module github.com/e2b-dev/infra/packages/clickhouse
22

3-
go 1.24.0
3+
go 1.24.3
44

5-
toolchain go1.24.3
6-
7-
require github.com/ClickHouse/clickhouse-go/v2 v2.37.2
5+
require (
6+
github.com/ClickHouse/clickhouse-go/v2 v2.37.2
7+
github.com/google/uuid v1.6.0
8+
)
89

910
require (
1011
github.com/ClickHouse/ch-go v0.66.1 // indirect
1112
github.com/andybalholm/brotli v1.1.1 // indirect
1213
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
1314
github.com/go-faster/city v1.0.1 // indirect
1415
github.com/go-faster/errors v0.7.1 // indirect
15-
github.com/google/uuid v1.6.0 // indirect
1616
github.com/klauspost/compress v1.18.0 // indirect
1717
github.com/kr/pretty v0.3.1 // indirect
1818
github.com/paulmach/orb v0.11.1 // indirect
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
-- +goose Up
2+
-- +goose StatementBegin
3+
CREATE TABLE sandbox_events_local (
4+
timestamp DateTime64(9) CODEC (Delta, ZSTD(1)),
5+
sandbox_id String CODEC (ZSTD(1)),
6+
sandbox_execution_id String CODEC (ZSTD(1)),
7+
sandbox_template_id String CODEC (ZSTD(1)),
8+
sandbox_build_id String CODEC (ZSTD(1)),
9+
sandbox_team_id UUID CODEC (ZSTD(1)),
10+
event_category LowCardinality(String) CODEC (ZSTD(1)),
11+
event_label LowCardinality(String) CODEC (ZSTD(1)),
12+
event_data Nullable(String) CODEC (ZSTD(1))
13+
) ENGINE = MergeTree
14+
PARTITION BY toDate(timestamp)
15+
ORDER BY (sandbox_id, timestamp)
16+
TTL toDateTime(timestamp) + INTERVAL 7 DAY;
17+
-- +goose StatementEnd
18+
19+
-- +goose Down
20+
-- +goose StatementBegin
21+
DROP TABLE IF EXISTS sandbox_events_local;
22+
-- +goose StatementEnd
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
-- +goose Up
2+
-- +goose StatementBegin
3+
CREATE TABLE sandbox_events as sandbox_events_local
4+
ENGINE = Distributed('cluster', currentDatabase(), 'sandbox_events_local', xxHash64(sandbox_id));
5+
-- +goose StatementEnd
6+
7+
-- +goose Down
8+
-- +goose StatementBegin
9+
DROP TABLE IF EXISTS sandbox_events;
10+
-- +goose StatementEnd
11+
Lines changed: 321 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,321 @@
1+
package batcher
2+
3+
import (
4+
"sync"
5+
"sync/atomic"
6+
"testing"
7+
"time"
8+
)
9+
10+
func TestBatcherStartStop(t *testing.T) {
11+
b, err := NewBatcher[int](func(batch []int) error { return nil }, BatcherOptions{})
12+
if err != nil {
13+
t.Fatal(err)
14+
}
15+
for i := 0; i < 100; i++ {
16+
if err := b.Start(); err != nil {
17+
t.Fatal(err)
18+
}
19+
if err := b.Stop(); err != nil {
20+
t.Fatal(err)
21+
}
22+
}
23+
}
24+
25+
func TestBatcherPushNotStarted(t *testing.T) {
26+
b, err := NewBatcher[int](func(batch []int) error { return nil }, BatcherOptions{})
27+
if err != nil {
28+
t.Fatal(err)
29+
}
30+
ok, err := b.Push(123)
31+
if err != ErrBatcherNotStarted {
32+
t.Fatalf("expected ErrBatcherNotStarted, got %v", err)
33+
}
34+
if ok {
35+
t.Fatal("expected Push to fail")
36+
}
37+
}
38+
39+
func TestBatcherStopNotStarted(t *testing.T) {
40+
b, err := NewBatcher[int](func(batch []int) error { return nil }, BatcherOptions{})
41+
if err != nil {
42+
t.Fatal(err)
43+
}
44+
if err := b.Stop(); err != ErrBatcherNotStarted {
45+
t.Fatalf("expected ErrBatcherNotStarted, got %v", err)
46+
}
47+
}
48+
49+
func TestBatcherDoubleStop(t *testing.T) {
50+
b, err := NewBatcher[int](func(batch []int) error { return nil }, BatcherOptions{})
51+
if err != nil {
52+
t.Fatal(err)
53+
}
54+
if err := b.Start(); err != nil {
55+
t.Fatal(err)
56+
}
57+
if err := b.Stop(); err != nil {
58+
t.Fatal(err)
59+
}
60+
if err := b.Stop(); err != ErrBatcherNotStarted {
61+
t.Fatalf("expected ErrBatcherNotStarted, got %v", err)
62+
}
63+
}
64+
65+
func TestBatcherDoubleStart(t *testing.T) {
66+
b, err := NewBatcher[int](func(batch []int) error { return nil }, BatcherOptions{})
67+
if err != nil {
68+
t.Fatal(err)
69+
}
70+
if err := b.Start(); err != nil {
71+
t.Fatal(err)
72+
}
73+
if err := b.Start(); err != ErrBatcherAlreadyStarted {
74+
t.Fatalf("expected ErrBatcherAlreadyStarted, got %v", err)
75+
}
76+
}
77+
78+
func TestBatcherPushStop(t *testing.T) {
79+
n := 0
80+
b, err := NewBatcher[int](func(batch []int) error {
81+
n += len(batch)
82+
return nil
83+
}, BatcherOptions{MaxDelay: time.Hour})
84+
if err != nil {
85+
t.Fatal(err)
86+
}
87+
if err := b.Start(); err != nil {
88+
t.Fatal(err)
89+
}
90+
for i := 0; i < 10; i++ {
91+
ok, err := b.Push(i)
92+
if err != nil {
93+
t.Fatal(err)
94+
}
95+
if !ok {
96+
t.Fatalf("cannot add item %d to batch", i)
97+
}
98+
}
99+
if err := b.Stop(); err != nil {
100+
t.Fatal(err)
101+
}
102+
103+
if n != 10 {
104+
t.Fatalf("Unexpected n=%d. Expected 10", n)
105+
}
106+
}
107+
108+
func TestBatcherPushMaxBatchSize(t *testing.T) {
109+
testBatcherPushMaxBatchSize(t, 1, 100)
110+
testBatcherPushMaxBatchSize(t, 10, 100)
111+
testBatcherPushMaxBatchSize(t, 100, 100)
112+
testBatcherPushMaxBatchSize(t, 101, 100)
113+
testBatcherPushMaxBatchSize(t, 1003, 15)
114+
testBatcherPushMaxBatchSize(t, 1033, 17)
115+
}
116+
117+
func TestBatcherPushMaxDelay(t *testing.T) {
118+
testBatcherPushMaxDelay(t, 100, time.Millisecond)
119+
testBatcherPushMaxDelay(t, 205, 10*time.Millisecond)
120+
testBatcherPushMaxDelay(t, 313, 100*time.Millisecond)
121+
}
122+
123+
func TestBatcherConcurrentPush(t *testing.T) {
124+
s := uint32(0)
125+
b, err := NewBatcher[uint32](func(batch []uint32) error {
126+
for _, v := range batch {
127+
atomic.AddUint32(&s, v)
128+
}
129+
return nil
130+
}, BatcherOptions{})
131+
if err != nil {
132+
t.Fatal(err)
133+
}
134+
if err := b.Start(); err != nil {
135+
t.Fatal(err)
136+
}
137+
var wg sync.WaitGroup
138+
ss := uint32(0)
139+
for i := 0; i < 10; i++ {
140+
wg.Add(1)
141+
go func() {
142+
for i := 0; i < 100; i++ {
143+
b.Push(uint32(i))
144+
time.Sleep(time.Millisecond)
145+
atomic.AddUint32(&ss, uint32(i))
146+
}
147+
wg.Done()
148+
}()
149+
}
150+
wg.Wait()
151+
if err := b.Stop(); err != nil {
152+
t.Fatal(err)
153+
}
154+
if s != ss {
155+
t.Fatalf("Unepxected sum %d. Expecting %d", s, ss)
156+
}
157+
}
158+
159+
func TestBatcherQueueSize(t *testing.T) {
160+
ch := make(chan struct{})
161+
n := 0
162+
b, err := NewBatcher[int](func(batch []int) error {
163+
<-ch
164+
n += len(batch)
165+
return nil
166+
}, BatcherOptions{
167+
MaxDelay: time.Hour,
168+
MaxBatchSize: 3,
169+
QueueSize: 10,
170+
})
171+
if err != nil {
172+
t.Fatal(err)
173+
}
174+
if err := b.Start(); err != nil {
175+
t.Fatal(err)
176+
}
177+
for i := 0; i < 3; i++ {
178+
ok, err := b.Push(i)
179+
if err != nil {
180+
t.Fatal(err)
181+
}
182+
if !ok {
183+
t.Fatalf("cannot add item %d to batch", i)
184+
}
185+
}
186+
time.Sleep(time.Millisecond)
187+
for i := 0; i < 10; i++ {
188+
ok, err := b.Push(i)
189+
if err != nil {
190+
t.Fatal(err)
191+
}
192+
if !ok {
193+
t.Fatalf("cannot add item %d to batch", i)
194+
}
195+
}
196+
if b.QueueLen() != b.QueueSize {
197+
t.Fatalf("Unexpected queue size %d. Expecting %d", b.QueueLen(), b.QueueSize)
198+
}
199+
for i := 0; i < 10; i++ {
200+
ok, err := b.Push(123)
201+
if err != nil {
202+
t.Fatal(err)
203+
}
204+
if ok {
205+
t.Fatalf("expecting queue overflow")
206+
}
207+
time.Sleep(time.Millisecond)
208+
}
209+
close(ch)
210+
time.Sleep(time.Millisecond)
211+
for i := 0; i < 5; i++ {
212+
ok, err := b.Push(i)
213+
if err != nil {
214+
t.Fatal(err)
215+
}
216+
if !ok {
217+
t.Fatalf("cannot add item %d to batch", i)
218+
}
219+
}
220+
if err := b.Stop(); err != nil {
221+
t.Fatal(err)
222+
}
223+
224+
if n != 18 {
225+
t.Fatalf("Unexpected number of items passed to batcher func: %d. Expected 18", n)
226+
}
227+
}
228+
229+
func testBatcherPushMaxDelay(t *testing.T, itemsCount int, maxDelay time.Duration) {
230+
lastTime := time.Now()
231+
n := 0
232+
nn := 0
233+
b, err := NewBatcher[int](func(batch []int) error {
234+
if time.Since(lastTime) > maxDelay+10*time.Millisecond {
235+
t.Fatalf("Unexpected delay between batches: %s. Expected no more than %s. itemsCount=%d",
236+
time.Since(lastTime), maxDelay, itemsCount)
237+
}
238+
lastTime = time.Now()
239+
nn += len(batch)
240+
n++
241+
return nil
242+
}, BatcherOptions{
243+
MaxDelay: maxDelay,
244+
MaxBatchSize: 100500,
245+
})
246+
if err != nil {
247+
t.Fatal(err)
248+
}
249+
if err := b.Start(); err != nil {
250+
t.Fatal(err)
251+
}
252+
for i := 0; i < itemsCount; i++ {
253+
ok, err := b.Push(i)
254+
if err != nil {
255+
t.Fatal(err)
256+
}
257+
if !ok {
258+
t.Fatalf("cannot add item %d to batch", i)
259+
}
260+
time.Sleep(time.Millisecond)
261+
}
262+
if err := b.Stop(); err != nil {
263+
t.Fatal(err)
264+
}
265+
266+
batchSize := 1000 * maxDelay.Seconds()
267+
expectedN := int(1.2 * (float64(itemsCount) + batchSize - 1) / batchSize)
268+
if n > expectedN {
269+
t.Fatalf("Unexpected number of batch func calls: %d. Expected no more than %d. itemsCount=%d, maxDelay=%s",
270+
n, expectedN, itemsCount, maxDelay)
271+
}
272+
if itemsCount != nn {
273+
t.Fatalf("Unexpected number of items passed to batcher func: %d. Expected %d. maxDelay=%s", nn, itemsCount, maxDelay)
274+
}
275+
}
276+
277+
func testBatcherPushMaxBatchSize(t *testing.T, itemsCount, batchSize int) {
278+
n := 0
279+
nn := 0
280+
b, err := NewBatcher[int](func(batch []int) error {
281+
if len(batch) > batchSize {
282+
t.Fatalf("Unexpected batch size=%d. Must not exceed %d. itemsCount=%d", len(batch), batchSize, itemsCount)
283+
}
284+
if len(batch) == 0 {
285+
t.Fatalf("Empty batch. itemsCount=%d, batchSize=%d", itemsCount, batchSize)
286+
}
287+
nn += len(batch)
288+
n++
289+
return nil
290+
}, BatcherOptions{
291+
MaxDelay: time.Hour,
292+
MaxBatchSize: batchSize,
293+
})
294+
if err != nil {
295+
t.Fatal(err)
296+
}
297+
if err := b.Start(); err != nil {
298+
t.Fatal(err)
299+
}
300+
for i := 0; i < itemsCount; i++ {
301+
ok, err := b.Push(i)
302+
if err != nil {
303+
t.Fatal(err)
304+
}
305+
if !ok {
306+
t.Fatalf("cannot add item %d to batch", i)
307+
}
308+
}
309+
if err := b.Stop(); err != nil {
310+
t.Fatal(err)
311+
}
312+
313+
expectedN := (itemsCount + batchSize - 1) / batchSize
314+
if n != expectedN {
315+
t.Fatalf("Unexpected number of batcher func calls: %d. Expected %d. itemsCount=%d, batchSize=%d",
316+
n, expectedN, itemsCount, batchSize)
317+
}
318+
if nn != itemsCount {
319+
t.Fatalf("Unexpected number of items in all batches: %d. Expected %d. batchSize=%d", nn, itemsCount, batchSize)
320+
}
321+
}

0 commit comments

Comments
 (0)