
Conversation

gouhongshen
Contributor

@gouhongshen gouhongshen commented Sep 5, 2025

User description

What type of PR is this?

  • API-change
  • BUG
  • Improvement
  • Documentation
  • Feature
  • Test and CI
  • Code Refactoring

Which issue(s) this PR fixes:

issue #15166

What this PR does / why we need it:

  1. Deferred the marshaling of the log entry until the async group commit.
  2. Cleaned up some code.

This greatly improves insertion performance: sysbench-1000w-100t insertion throughput is up by almost 50%.


overview:
	 ---------------------------------------------
	| txn op --> pre wal --> on wal --> post wal |
	---------------------------------------------


details:

                  [------------- on wal -------------]
      pre wal ==> parallel marshal ==> flush wal       \
                                                         ==> apply ==> done apply (commit/rollback) ==> push logtail
              ==> collect logtail (wait marshal)       /
                                                         [--------- post wal (wait flush) ---------]

PR Type

Enhancement


Description

  • Deferred marshaling of log entries until async group commits

  • Added ApproxSize() interface to estimate payload sizes

  • Refactored transaction processing pipeline with parallel WAL operations

  • Improved logtail collection with concurrent processing


Diagram Walkthrough

flowchart LR
  A["Transaction Op"] --> B["Pre WAL"]
  B --> C["Parallel Marshal"]
  B --> D["Collect Logtail"]
  C --> E["Flush WAL"]
  D --> F["Wait Marshal"]
  E --> G["Apply"]
  F --> G
  G --> H["Done Apply"]
  H --> I["Push Logtail"]

File Walkthrough

Relevant files
Enhancement
25 files
basemvccnode.go
Add ApproxSize method to BaseNode interface                           
+13/-0   
command.go
Implement ApproxSize for EntryCommand                                       
+15/-0   
dbmvccnode.go
Add ApproxSize implementations for DB nodes                           
+17/-0   
metamvccnode.go
Implement ApproxSize for metadata nodes                                   
+17/-0   
schema.go
Add ApproxSize method to Schema and ColDef                             
+43/-0   
tablemvccnode.go
Implement ApproxSize for table MVCC nodes                               
+8/-0     
cmd.go
Add ApproxSize method to TxnCmd interface                               
+1/-0     
consts.go
Update trace constants for new pipeline                                   
+4/-5     
memo.go
Add ApproxSize method to TxnMemo                                                 
+11/-0   
types.go
Update transaction interfaces for new pipeline                     
+12/-3   
store.go
Execute group WAL pre-callbacks before prepare                     
+5/-0     
commit.go
Use ApproxSize and add entry count limit                                 
+8/-3     
logentry.go
Defer marshaling with pre-callbacks and ApproxSize             
+30/-14 
base.go
Add pre-callback system and ApproxSize support                     
+38/-8   
types.go
Add callback and size estimation interfaces                           
+6/-0     
mgr.go
Refactor with concurrent logtail collection                           
+60/-57 
tree.go
Implement ApproxSize for Tree structures                                 
+23/-0   
command.go
Add ApproxSize and remove cmd buffer limits                           
+44/-30 
store.go
Add event management methods to NoopTxnStore                         
+5/-0     
txn.go
Rename methods for new async pattern                                         
+3/-3     
txnctx.go
Add ApproxSize method to TxnCtx                                                   
+14/-0   
txnmgr.go
Refactor transaction pipeline with parallel processing     
+148/-130
appendcmd.go
Implement ApproxSize for AppendCmd                                             
+8/-0     
cmdmgr.go
Defer marshaling with callback system                                       
+61/-38 
store.go
Replace wait groups with event management                               
+41/-17 
Configuration changes
1 files
options.go
Increase default client buffer size                                           
+1/-1     
Additional files
11 files
driver.go +1/-0     
entry_test.go +7/-2     
faker.go +6/-2     
group_committer.go +15/-6   
replay_test.go +8/-4     
v1.go +12/-2   
flushTableTail.go +4/-0     
mergeobjectscmd.go +4/-0     
cmd.go +10/-0   
command_test.go +0/-45   
txndb.go +1/-1     

@qodo-merge-pro qodo-merge-pro bot changed the title improve: deferred the marshaling of the log entry until the async group commits and cleaned some code. improve: deferred the marshaling of the log entry until the async group commits and cleaned some code. Sep 5, 2025

qodo-merge-pro bot commented Sep 5, 2025

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 4 🔵🔵🔵🔵⚪
🧪 PR contains tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

Possible Issue

In onTxnLogTails, the closure captures loop variable 'i' and assigns mgr.orderedList[i] inside a goroutine. Although i is used as an index for each item, ensure that 'i' is not modified before goroutine execution and that orderedList is sized appropriately to avoid index races or nil gaps when fewer items than capacity are processed.

func (mgr *Manager) onTxnLogTails(items ...any) {
	for i, item := range items {
		txn := item.(txnif.AsyncTxn)
		if txn.IsReplay() {
			continue
		}

		mgr.collectWg.Add(1)

		mgr.collectPool.Submit(func() {
			defer func() {
				mgr.collectWg.Done()
			}()

			txn.GetStore().WaitEvent(txnif.WalPreparing)

			builder := NewTxnLogtailRespBuilder(mgr.rt)
			entries, closeCB := builder.CollectLogtail(txn)

			txn.GetStore().DoneEvent(txnif.TailCollecting)

			txnTail := &txnWithLogtails{
				txn:     txn,
				tails:   entries,
				closeCB: closeCB,
			}

			mgr.orderedList[i] = txnTail

			state := txn.GetTxnState(true)
			if state != txnif.TxnStateCommitted {
				if state != txnif.TxnStateRollbacked {
					panic(fmt.Sprintf("wrong state %v", state))
				}
				return
			}
		})
	}

	mgr.collectWg.Wait()
	for i := range len(items) {
		if mgr.orderedList[i] != nil {
			mgr.generateLogtailWithTxn(mgr.orderedList[i])
			mgr.orderedList[i] = nil
		}
Error Handling

Finish now returns (LogEntry, error) and executes pre-WAL callbacks per entry. Callers like group_committer.Commit and tests were updated, but verify all other call sites handle errors, and that RegisterGroupWalPreCallbacks failures propagate and abort commit cleanly without leaving inconsistent state.

func (w *LogEntryWriter) Finish() (LogEntry, error) {
	for _, e := range w.entries {
		if err := e.Entry.ExecuteGroupWalPreCallbacks(); err != nil {
			return nil, err
		}

		w.buf.Reset()
		if _, err := e.WriteTo(&w.buf); err != nil {
			panic(err)
		}

		eBuf := w.buf.Bytes()
		w.Append(eBuf)
	}

	w.Entry.SetFooter(w.Footer)
	return w.Entry, nil
}
Concurrency Logic

ApplyTxnRecord defers marshaling via a pre-WAL callback and signals DoneEvent(WalPreparing) inside that callback. Ensure event ordering with TailCollecting and WAL flush is correct and that DoneEvent executes even if MarshalBinary fails, to avoid deadlocks waiting on events.

func (mgr *commandManager) ApplyTxnRecord(store *txnStore) (logEntry entry.Entry, err error) {
	if mgr.driver == nil {
		return
	}

	mgr.cmd.SetTxn(store.txn)

	store.AddEvent(txnif.WalPreparing)

	logEntry = entry.GetBase()
	info := &entry.Info{Group: wal.GroupUserTxn}
	logEntry.SetInfo(info)
	logEntry.SetApproxPayloadSize(mgr.cmd.ApproxSize())

	logEntry.RegisterGroupWalPreCallbacks(func() error {
		defer func() {
			store.DoneEvent(txnif.WalPreparing)
		}()

		var (
			err2 error
			buf  []byte
			t1   = time.Now()
		)

		if buf, err2 = mgr.cmd.MarshalBinary(); err2 != nil {
			return err2
		}

		logEntry.SetType(IOET_WALEntry_TxnRecord)
		if err2 = logEntry.SetPayload(buf); err2 != nil {
			return err2
		}

		logutil.Debugf("Marshal Command LSN=%d, Size=%d", info.GroupLSN, len(buf))

		if len(buf) > 10*mpool.MB {
			logutil.Info(
				"BIG-TXN",
				zap.Int("wal-size", len(buf)),
				zap.Uint64("lsn", info.GroupLSN),
				zap.String("txn", store.txn.String()),
			)
		}

		if dur := time.Since(t1); dur >= time.Millisecond*500 {
			logutil.Warn(
			"SLOW-LOG",
				zap.Int("wal-size", len(buf)),
				zap.Uint64("lsn", mgr.lsn),
				zap.String("txn", store.txn.String()),
				zap.Duration("marshal-log-entry-duration", dur),
			)
		}

		return nil
	})

	t2 := time.Now()


qodo-merge-pro bot commented Sep 5, 2025

PR Code Suggestions ✨

Explore these optional code suggestions:

Category / Suggestion / Impact
High-level
Fix goroutine loop variable capture

In logtail.Manager.onTxnLogTails, the goroutines close over the loop variables i
and txn, which are reused across iterations, causing incorrect indexing and
mixing of transactions under concurrency. Capture per-iteration copies (e.g.,
idx := i; t := txn) and use those inside the submitted function to avoid racy
behavior and mis-ordered logtail generation. This fix is critical to ensure
tails are collected and applied for the intended transaction in the correct
order.

Examples:

pkg/vm/engine/tae/logtail/mgr.go [128-173]
	for i, item := range items {
		txn := item.(txnif.AsyncTxn)
		if txn.IsReplay() {
			continue
		}

		mgr.collectWg.Add(1)

		mgr.collectPool.Submit(func() {
			defer func() {

 ... (clipped 36 lines)

Solution Walkthrough:

Before:

func (mgr *Manager) onTxnLogTails(items ...any) {
	for i, item := range items {
		txn := item.(txnif.AsyncTxn)
		if txn.IsReplay() {
			continue
		}

		mgr.collectWg.Add(1)
		mgr.collectPool.Submit(func() { // BUG: Capturing loop variables i and txn
			defer mgr.collectWg.Done()
			// ... uses txn ...
			builder := NewTxnLogtailRespBuilder(mgr.rt)
			entries, closeCB := builder.CollectLogtail(txn)
			txnTail := &txnWithLogtails{ txn: txn, ... }
			mgr.orderedList[i] = txnTail // BUG: uses i
		})
	}
	// ...
}

After:

func (mgr *Manager) onTxnLogTails(items ...any) {
	for i, item := range items {
		// Create local copies for the closure
		idx := i
		txn := item.(txnif.AsyncTxn)

		if txn.IsReplay() {
			continue
		}

		mgr.collectWg.Add(1)
		mgr.collectPool.Submit(func() { // FIX: Uses local copies
			defer mgr.collectWg.Done()
			// ... uses txn ...
			builder := NewTxnLogtailRespBuilder(mgr.rt)
			entries, closeCB := builder.CollectLogtail(txn)
			txnTail := &txnWithLogtails{ txn: txn, ... }
			mgr.orderedList[idx] = txnTail // FIX: uses idx
		})
	}
	// ...
}
Suggestion importance[1-10]: 10


Why: This suggestion correctly identifies a critical bug where loop variables i and txn are captured by a goroutine, which will lead to data races and incorrect logtail processing.

High
Possible issue
Fix goroutine variable capture issue

The goroutine captures the loop variable i which could lead to race conditions
or incorrect indexing when the goroutine executes after the loop iteration
changes.

pkg/vm/engine/tae/logtail/mgr.go [136-163]

-mgr.collectPool.Submit(func() {
-	defer func() {
-		mgr.collectWg.Done()
-	}()
+mgr.collectPool.Submit(func(index int) func() {
+	return func() {
+		defer func() {
+			mgr.collectWg.Done()
+		}()
 
-	txn.GetStore().WaitEvent(txnif.WalPreparing)
+		txn.GetStore().WaitEvent(txnif.WalPreparing)
 
-	builder := NewTxnLogtailRespBuilder(mgr.rt)
-	entries, closeCB := builder.CollectLogtail(txn)
+		builder := NewTxnLogtailRespBuilder(mgr.rt)
+		entries, closeCB := builder.CollectLogtail(txn)
 
-	txn.GetStore().DoneEvent(txnif.TailCollecting)
+		txn.GetStore().DoneEvent(txnif.TailCollecting)
 
-	txnTail := &txnWithLogtails{
-		txn:     txn,
-		tails:   entries,
-		closeCB: closeCB,
+		txnTail := &txnWithLogtails{
+			txn:     txn,
+			tails:   entries,
+			closeCB: closeCB,
+		}
+
+		mgr.orderedList[index] = txnTail
+		...
 	}
+}(i))
 
-	mgr.orderedList[i] = txnTail
-	...
-})
-

[To ensure code accuracy, apply this suggestion manually]

Suggestion importance[1-10]: 9


Why: This suggestion correctly identifies a critical bug where the loop variable i is captured by a goroutine, which would lead to a race condition and incorrect behavior.

High
Fix silent error handling in prepareRecord

The function returns 0 when ExecuteGroupWalPreCallbacks fails but doesn't
propagate the error. This silent failure could lead to data corruption or
unexpected behavior.

pkg/vm/engine/tae/logstore/driver/logservicedriver/v1.go [261-284]

-func (r *v1Record) append(e *entry.Entry) {
-	r.entries = append(r.entries, e)
-	//r.V1Meta.addr[e.DSN] = uint64(r.payloadSize)
-	//r.payloadSize += uint64(e.GetSize())
-}
-
-func (r *v1Record) prepareRecord() (size int) {
-	var err error
-
+func (r *v1Record) prepareRecord() (size int, err error) {
 	for i := range r.entries {
 		if err = r.entries[i].Entry.ExecuteGroupWalPreCallbacks(); err != nil {
-			return
+			return 0, err
 		}
 
 		r.V1Meta.addr[r.entries[i].DSN] = uint64(r.payloadSize)
 		r.payloadSize += uint64(r.entries[i].GetSize())
 	}
-	...
+
+	r.payload, err = r.Marshal()
+	if err != nil {
+		return 0, err
+	}
+	return len(r.payload), nil
 }

[To ensure code accuracy, apply this suggestion manually]

Suggestion importance[1-10]: 8


Why: The suggestion correctly identifies that an error from ExecuteGroupWalPreCallbacks is silently ignored, which could lead to data corruption; propagating the error is the correct fix.

Medium
Fix inconsistent error handling patterns

The function panics on WriteTo error but returns an error for
ExecuteGroupWalPreCallbacks. This inconsistent error handling could lead to
unexpected crashes instead of proper error propagation.

pkg/vm/engine/tae/logstore/driver/logservicedriver/logentry.go [162-179]

 func (w *LogEntryWriter) Finish() (LogEntry, error) {
 	for _, e := range w.entries {
 		if err := e.Entry.ExecuteGroupWalPreCallbacks(); err != nil {
 			return nil, err
 		}
 
 		w.buf.Reset()
 		if _, err := e.WriteTo(&w.buf); err != nil {
-			panic(err)
+			return nil, err
 		}
 
 		eBuf := w.buf.Bytes()
 		w.Append(eBuf)
 	}
 
 	w.Entry.SetFooter(w.Footer)
 	return w.Entry, nil
 }
  • Apply / Chat
Suggestion importance[1-10]: 7


Why: The suggestion correctly points out inconsistent error handling where one error is returned and another causes a panic, improving code robustness by handling the error gracefully.

Medium
General
Replace hardcoded magic number with constant
Suggestion Impact: The commit replaced the hardcoded 50 with a configurable field (d.config.ClientMaxEntryCount), making the threshold configurable as suggested.

code diff:

-	if len(d.committer.writer.entries) >= 50 {
+	if len(d.committer.writer.entries) >= d.config.ClientMaxEntryCount {
 		d.flushCurrentCommitter()

The hardcoded value of 50 entries should be configurable or derived from a
constant. This magic number makes the code less maintainable and could impact
performance tuning.

pkg/vm/engine/tae/logstore/driver/logservicedriver/commit.go [39-45]

+const MaxEntriesPerCommitter = 50
+
 if int(d.committer.writer.ApproxSize()) > d.config.ClientBufSize {
 	d.flushCurrentCommitter()
 }
 
-if len(d.committer.writer.entries) >= 50 {
+if len(d.committer.writer.entries) >= MaxEntriesPerCommitter {
 	d.flushCurrentCommitter()
 }

[Suggestion processed]

Suggestion importance[1-10]: 3


Why: The suggestion correctly identifies a magic number and proposes replacing it with a constant, which improves code readability and maintainability.

Low

Labels: kind/enhancement · Review effort 4/5 · size/L (denotes a PR that changes [500,999] lines)