Skip to content
This repository was archived by the owner on Feb 17, 2025. It is now read-only.

Commit 6ac5653

Browse files
committed
proto batch end (#3612)
1 parent 4bc0f59 commit 6ac5653

File tree

7 files changed

+338
-179
lines changed

7 files changed

+338
-179
lines changed

proto/src/proto/datastream/v1/datastream.proto

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,18 @@ package datastream.v1;
44

55
option go_package = "github.com/0xPolygonHermez/zkevm-node/state/datastream";
66

7-
message Batch {
7+
message BatchStart {
88
uint64 number = 1;
9-
bytes local_exit_root = 2;
10-
bytes state_root = 3;
119
uint64 fork_id = 4;
1210
uint64 chain_id = 5;
1311
}
1412

13+
message BatchEnd {
14+
uint64 number = 1;
15+
bytes local_exit_root = 2;
16+
bytes state_root = 3;
17+
}
18+
1519
message L2Block {
1620
uint64 number = 1;
1721
uint64 batch_number = 2;
@@ -57,8 +61,9 @@ enum BookmarkType {
5761

5862
enum EntryType {
5963
ENTRY_TYPE_UNSPECIFIED = 0;
60-
ENTRY_TYPE_BATCH = 1;
64+
ENTRY_TYPE_BATCH_START = 1;
6165
ENTRY_TYPE_L2_BLOCK = 2;
6266
ENTRY_TYPE_TRANSACTION = 3;
63-
ENTRY_TYPE_UPDATE_GER = 4;
67+
ENTRY_TYPE_BATCH_END = 4;
68+
ENTRY_TYPE_UPDATE_GER = 5;
6469
}

sequencer/batch.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,8 @@ func (f *finalizer) insertSIPBatch(ctx context.Context, batchNumber uint64, stat
339339

340340
// Send batch bookmark to the datastream
341341
f.DSSendBatchBookmark(batchNumber)
342+
// Send batch start to the datastream
343+
f.DSSendBatchStart(batchNumber)
342344

343345
// Check if synchronizer is up-to-date
344346
//TODO: review if this is needed
@@ -415,7 +417,7 @@ func (f *finalizer) closeSIPBatch(ctx context.Context, dbTx pgx.Tx) error {
415417
}
416418

417419
// Sent batch to DS
418-
f.DSSendBatch(f.wipBatch.batchNumber, f.wipBatch.finalStateRoot, f.wipBatch.finalLocalExitRoot)
420+
f.DSSendBatchEnd(f.wipBatch.batchNumber, f.wipBatch.finalStateRoot, f.wipBatch.finalLocalExitRoot)
419421

420422
log.Infof("sip batch %d closed in statedb, closing reason: %s", f.sipBatch.batchNumber, f.sipBatch.closingReason)
421423

sequencer/datastreamer.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,14 +67,23 @@ func (f *finalizer) DSSendBatchBookmark(batchNumber uint64) {
6767
}
6868
}
6969

70-
func (f *finalizer) DSSendBatch(batchNumber uint64, stateRoot common.Hash, localExitRoot common.Hash) {
70+
func (f *finalizer) DSSendBatchStart(batchNumber uint64) {
7171
forkID := f.stateIntf.GetForkIDByBatchNumber(batchNumber)
7272

7373
if f.streamServer != nil {
74-
// Send batch to the streamer
75-
f.dataToStream <- datastream.Batch{
74+
// Send batch start to the streamer
75+
f.dataToStream <- datastream.BatchStart{
76+
Number: batchNumber,
77+
ForkId: forkID,
78+
}
79+
}
80+
}
81+
82+
func (f *finalizer) DSSendBatchEnd(batchNumber uint64, stateRoot common.Hash, localExitRoot common.Hash) {
83+
if f.streamServer != nil {
84+
// Send batch end to the streamer
85+
f.dataToStream <- datastream.BatchEnd{
7686
Number: batchNumber,
77-
ForkId: forkID,
7887
StateRoot: stateRoot.Bytes(),
7988
LocalExitRoot: localExitRoot.Bytes(),
8089
}

sequencer/sequencer.go

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -409,30 +409,55 @@ func (s *Sequencer) sendDataToStreamer(chainID uint64) {
409409
log.Errorf("failed to commit atomic op for bookmark type %d, value %d, error: %v", data.Type, data.Value, err)
410410
continue
411411
}
412-
case datastream.Batch:
412+
case datastream.BatchStart:
413413
err = s.streamServer.StartAtomicOp()
414414
if err != nil {
415-
log.Errorf("failed to start atomic op for batch, error: %v", err)
415+
log.Errorf("failed to start atomic op for batch start, error: %v", err)
416416
continue
417417
}
418418

419419
data.ChainId = chainID
420420

421-
marshalledBatch, err := proto.Marshal(&data)
421+
marshalledBatchStart, err := proto.Marshal(&data)
422422
if err != nil {
423-
log.Errorf("failed to marshal batch, error: %v", err)
423+
log.Errorf("failed to marshal batch start error: %v", err)
424424
continue
425425
}
426426

427-
_, err = s.streamServer.AddStreamEntry(datastreamer.EntryType(datastream.EntryType_ENTRY_TYPE_BATCH), marshalledBatch)
427+
_, err = s.streamServer.AddStreamEntry(datastreamer.EntryType(datastream.EntryType_ENTRY_TYPE_BATCH_START), marshalledBatchStart)
428428
if err != nil {
429-
log.Errorf("failed to add stream entry for batch, error: %v", err)
429+
log.Errorf("failed to add stream entry for batch start, error: %v", err)
430430
continue
431431
}
432432

433433
err = s.streamServer.CommitAtomicOp()
434434
if err != nil {
435-
log.Errorf("failed to commit atomic op for batch, error: %v", err)
435+
log.Errorf("failed to commit atomic op for batch start, error: %v", err)
436+
continue
437+
}
438+
439+
case datastream.BatchEnd:
440+
err = s.streamServer.StartAtomicOp()
441+
if err != nil {
442+
log.Errorf("failed to start atomic op for batch end, error: %v", err)
443+
continue
444+
}
445+
446+
marshalledBatchEnd, err := proto.Marshal(&data)
447+
if err != nil {
448+
log.Errorf("failed to marshal batch end, error: %v", err)
449+
continue
450+
}
451+
452+
_, err = s.streamServer.AddStreamEntry(datastreamer.EntryType(datastream.EntryType_ENTRY_TYPE_BATCH_END), marshalledBatchEnd)
453+
if err != nil {
454+
log.Errorf("failed to add stream entry for batch end, error: %v", err)
455+
continue
456+
}
457+
458+
err = s.streamServer.CommitAtomicOp()
459+
if err != nil {
460+
log.Errorf("failed to commit atomic op for batch end, error: %v", err)
436461
continue
437462
}
438463

state/datastream.go

Lines changed: 52 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -157,20 +157,34 @@ func GenerateDataStreamFile(ctx context.Context, streamServer *datastreamer.Stre
157157
return err
158158
}
159159

160-
genesisBatch := &datastream.Batch{
160+
genesisBatchStart := &datastream.BatchStart{
161+
Number: genesisL2Block.BatchNumber,
162+
ForkId: genesisL2Block.ForkID,
163+
ChainId: chainID,
164+
}
165+
166+
marshalledGenesisBatchStart, err := proto.Marshal(genesisBatchStart)
167+
if err != nil {
168+
return err
169+
}
170+
171+
_, err = streamServer.AddStreamEntry(datastreamer.EntryType(datastream.EntryType_ENTRY_TYPE_BATCH_START), marshalledGenesisBatchStart)
172+
if err != nil {
173+
return err
174+
}
175+
176+
genesisBatchEnd := &datastream.BatchEnd{
161177
Number: genesisL2Block.BatchNumber,
162178
LocalExitRoot: common.Hash{}.Bytes(),
163179
StateRoot: genesisL2Block.StateRoot.Bytes(),
164-
ForkId: genesisL2Block.ForkID,
165-
ChainId: chainID,
166180
}
167181

168-
marshalledGenesisBatch, err := proto.Marshal(genesisBatch)
182+
marshalledGenesisBatchEnd, err := proto.Marshal(genesisBatchEnd)
169183
if err != nil {
170184
return err
171185
}
172186

173-
_, err = streamServer.AddStreamEntry(datastreamer.EntryType(datastream.EntryType_ENTRY_TYPE_BATCH), marshalledGenesisBatch)
187+
_, err = streamServer.AddStreamEntry(datastreamer.EntryType(datastream.EntryType_ENTRY_TYPE_BATCH_END), marshalledGenesisBatchEnd)
174188
if err != nil {
175189
return err
176190
}
@@ -189,15 +203,24 @@ func GenerateDataStreamFile(ctx context.Context, streamServer *datastreamer.Stre
189203
log.Infof("Latest entry: %+v", latestEntry)
190204

191205
switch latestEntry.Type {
192-
case datastreamer.EntryType(datastream.EntryType_ENTRY_TYPE_BATCH):
193-
log.Info("Latest entry type is Batch")
206+
case datastreamer.EntryType(datastream.EntryType_ENTRY_TYPE_BATCH_START):
207+
log.Info("Latest entry type is Batch Start")
208+
209+
batchStart := &datastream.BatchStart{}
210+
if err := proto.Unmarshal(latestEntry.Data, batchStart); err != nil {
211+
return err
212+
}
213+
214+
currentBatchNumber = batchStart.Number
215+
case datastreamer.EntryType(datastream.EntryType_ENTRY_TYPE_BATCH_END):
216+
log.Info("Latest entry type is Batch End")
194217

195-
batch := &datastream.Batch{}
196-
if err := proto.Unmarshal(latestEntry.Data, batch); err != nil {
218+
batchEnd := &datastream.BatchStart{}
219+
if err := proto.Unmarshal(latestEntry.Data, batchEnd); err != nil {
197220
return err
198221
}
199222

200-
currentBatchNumber = batch.Number
223+
currentBatchNumber = batchEnd.Number
201224
currentBatchNumber++
202225
case datastreamer.EntryType(datastream.EntryType_ENTRY_TYPE_UPDATE_GER):
203226
log.Info("Latest entry type is UpdateGER")
@@ -364,6 +387,22 @@ func GenerateDataStreamFile(ctx context.Context, streamServer *datastreamer.Stre
364387
if err != nil {
365388
return err
366389
}
390+
391+
batchStart := &datastream.BatchStart{
392+
Number: batch.BatchNumber,
393+
ForkId: batch.ForkID,
394+
ChainId: chainID,
395+
}
396+
397+
marshalledBatchStart, err := proto.Marshal(batchStart)
398+
if err != nil {
399+
return err
400+
}
401+
402+
_, err = streamServer.AddStreamEntry(datastreamer.EntryType(datastream.EntryType_ENTRY_TYPE_BATCH_START), marshalledBatchStart)
403+
if err != nil {
404+
return err
405+
}
367406
}
368407

369408
if len(batch.L2Blocks) == 0 {
@@ -542,20 +581,18 @@ func GenerateDataStreamFile(ctx context.Context, streamServer *datastreamer.Stre
542581
}
543582
}
544583

545-
batch := &datastream.Batch{
584+
batchEnd := &datastream.BatchEnd{
546585
Number: batch.BatchNumber,
547586
LocalExitRoot: batch.LocalExitRoot.Bytes(),
548587
StateRoot: batch.StateRoot.Bytes(),
549-
ForkId: batch.ForkID,
550-
ChainId: chainID,
551588
}
552589

553-
marshalledBatch, err := proto.Marshal(batch)
590+
marshalledBatch, err := proto.Marshal(batchEnd)
554591
if err != nil {
555592
return err
556593
}
557594

558-
_, err = streamServer.AddStreamEntry(datastreamer.EntryType(datastream.EntryType_ENTRY_TYPE_BATCH), marshalledBatch)
595+
_, err = streamServer.AddStreamEntry(datastreamer.EntryType(datastream.EntryType_ENTRY_TYPE_BATCH_END), marshalledBatch)
559596
if err != nil {
560597
return err
561598
}

0 commit comments

Comments
 (0)