
[watermill-io] Async read implementation causes race conditions due to unsafe bufio.ReadSlice usage #581

@nych


Meta Information

  • Go Version: go version go1.24.4 linux/amd64
  • Watermill Version: watermill v1.4.6
  • Watermill-io Version: watermill-io v1.1.1
  • Operating System: Debian 6.1.140-1
  • Architecture: amd64

Steps to reproduce

The following program is expected to crash eventually (via log.Fatalf), demonstrating the issue.

```go
package main

import (
	"context"
	"log"
	"math/rand"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-io/pkg/io"
)

type Mock struct{}

// fills the provided byte slice with a single randomly chosen letter ('a', 'b', or 'c'),
// and ends the slice with a newline character.
func (Mock) Read(p []byte) (n int, err error) {
	const letters = "abc"
	letter := letters[rand.Intn(len(letters))]
	p[len(p)-1] = '\n'
	for i := range len(p) - 1 {
		p[i] = letter
	}
	return len(p), nil
}

func (Mock) Close() error { return nil }

func main() {
	sub, err := io.NewSubscriber(
		Mock{},
		io.SubscriberConfig{
			BufferSize:       0,
			MessageDelimiter: '\n',
			UnmarshalFunc:    io.PayloadUnmarshalFunc,
		},
		watermill.NewStdLogger(true, true),
	)
	if err != nil {
		log.Fatal(err)
	}

	lines, err := sub.Subscribe(context.Background(), "")
	if err != nil {
		log.Fatal(err)
	}

	for line := range lines {
		for i := 1; i < len(line.Payload); i++ {
			// should crash at some point
			if line.Payload[0] != line.Payload[i] {
				log.Fatalf("here we go: line[%d](%v) != line[%d](%v)", 0, line.Payload[0], i, line.Payload[i])
			}
		}
		line.Ack()
	}
}
```

Expected behavior

The message payload should always be a homogeneous slice of bytes in which every byte is identical, because each Mock.Read call fills the buffer with a single letter followed by the delimiter.

Actual behavior

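bufio.Reader.ReadSlice returns a slice that points into the reader's internal buffer, and those bytes stop being valid at the next read. Because the subscriber reads asynchronously, it can call ReadSlice again while the consumer is still processing the previous message, overwriting the bytes that the already-delivered payload references. This data race means the payload can eventually become heterogeneous, containing differing byte values instead of one repeated byte.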
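The aliasing behind this can be shown without watermill and without goroutines. The sketch below is a minimal, deterministic illustration, assuming a hypothetical `lineReader` that returns one line per Read call (similar in spirit to the Mock above); it is not watermill-io's subscriber code. It reads a line with ReadSlice, reads a second line, and then looks at the first slice again.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
)

// lineReader is a hypothetical io.Reader that hands out one line per Read
// call, much like the Mock type in the reproduction program.
type lineReader struct {
	lines []string
}

func (r *lineReader) Read(p []byte) (int, error) {
	if len(r.lines) == 0 {
		return 0, io.EOF
	}
	n := copy(p, r.lines[0])
	r.lines[0] = r.lines[0][n:] // keep any remainder for the next call
	if r.lines[0] == "" {
		r.lines = r.lines[1:]
	}
	return n, nil
}

// readSliceAliasing reads two lines with ReadSlice and reports what the
// first returned slice contains before and after the second read.
func readSliceAliasing() (before, after string) {
	br := bufio.NewReader(&lineReader{lines: []string{"aaaa\n", "bbbb\n"}})

	first, err := br.ReadSlice('\n')
	if err != nil {
		panic(err)
	}
	before = string(first) // copy the bytes out while they are still valid

	// The second ReadSlice must refill the buffer. In the current standard
	// library implementation the refill starts at index 0, so it overwrites
	// the bytes that `first` still points to.
	if _, err := br.ReadSlice('\n'); err != nil {
		panic(err)
	}
	after = string(first)
	return before, after
}

func main() {
	before, after := readSliceAliasing()
	fmt.Printf("first line before next read: %q\n", before)
	fmt.Printf("first line after next read:  %q\n", after)
}
```

Here the first slice changes from "aaaa\n" to "bbbb\n" after the second read. In the asynchronous subscriber the same overwrite happens on another goroutine while the consumer is reading the payload, which is why the corruption shows up only intermittently and is flagged by `go run -race`.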

Possible solution

Using bufio.Reader.ReadBytes instead of bufio.Reader.ReadSlice at pkg/io/subscriber.go:209 should resolve the issue, since ReadBytes returns a newly allocated slice that the reader never reuses. However, this comes with the overhead of allocating a new slice on each call.
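A minimal sketch of two ways to give each message its own bytes, again using a hypothetical `lineReader` stand-in rather than watermill-io's actual code: switching to ReadBytes as proposed, or keeping ReadSlice and copying its result with `bytes.Clone` (Go 1.20+) before the next read. In both cases the first line survives the second read.

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"io"
)

// lineReader is a hypothetical io.Reader returning one line per Read call.
type lineReader struct {
	lines []string
}

func (r *lineReader) Read(p []byte) (int, error) {
	if len(r.lines) == 0 {
		return 0, io.EOF
	}
	n := copy(p, r.lines[0])
	r.lines[0] = r.lines[0][n:]
	if r.lines[0] == "" {
		r.lines = r.lines[1:]
	}
	return n, nil
}

func newReader() *bufio.Reader {
	return bufio.NewReader(&lineReader{lines: []string{"aaaa\n", "bbbb\n"}})
}

// viaReadBytes uses ReadBytes, which returns a freshly allocated slice,
// so the first line is unaffected by the following read.
func viaReadBytes() string {
	br := newReader()
	first, err := br.ReadBytes('\n')
	if err != nil {
		panic(err)
	}
	if _, err := br.ReadBytes('\n'); err != nil {
		panic(err)
	}
	return string(first)
}

// viaClonedSlice keeps ReadSlice but copies its result before the next
// read, so the caller owns the bytes it hands on.
func viaClonedSlice() string {
	br := newReader()
	line, err := br.ReadSlice('\n')
	if err != nil {
		panic(err)
	}
	first := bytes.Clone(line) // independent copy of the buffer contents
	if _, err := br.ReadSlice('\n'); err != nil {
		panic(err)
	}
	return string(first)
}

func main() {
	fmt.Printf("ReadBytes:        %q\n", viaReadBytes())
	fmt.Printf("ReadSlice + copy: %q\n", viaClonedSlice())
}
```

Both variants allocate once per message, so the copy does not avoid the overhead mentioned above; that allocation is the price of a payload that outlives the next read. One difference: ReadSlice returns bufio.ErrBufferFull for a line longer than the buffer, whereas ReadBytes keeps reading until it finds the delimiter.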

Labels: bug (Something isn't working)
