1 change: 1 addition & 0 deletions go.mod
@@ -83,6 +83,7 @@ require (
github.com/containerd/continuity v0.4.2 // indirect
github.com/crate-crypto/go-ipa v0.0.0-20240223125850-b1e8a79f509c // indirect
github.com/crate-crypto/go-kzg-4844 v1.0.0 // indirect
github.com/dchest/siphash v1.2.3 // indirect
github.com/deckarep/golang-set/v2 v2.6.0 // indirect
github.com/docker/cli v25.0.3+incompatible // indirect
github.com/docker/docker v25.0.6+incompatible // indirect
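The only dependency added is `github.com/dchest/siphash`, which suggests the new 16-byte salt introduced below keys a SipHash-based sharding function. The sharding code itself is not part of this diff; the sketch below only illustrates how a 16-byte salt could feed SipHash's two 64-bit keys — `shardForKey` is a hypothetical helper name, not the repository's actual function.

```go
package main

import (
	"encoding/binary"
	"fmt"

	"github.com/dchest/siphash"
)

// shardForKey is a hypothetical helper illustrating keyed sharding:
// the 16-byte salt becomes SipHash's two 64-bit keys, and the hash of
// the value's key is reduced modulo the sharding factor.
func shardForKey(salt [16]byte, shardingFactor uint32, key []byte) uint32 {
	k0 := binary.LittleEndian.Uint64(salt[0:8])
	k1 := binary.LittleEndian.Uint64(salt[8:16])
	return uint32(siphash.Hash(k0, k1, key) % uint64(shardingFactor))
}

func main() {
	salt := [16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
	fmt.Println(shardForKey(salt, 8, []byte("example-key")))
}
```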
2 changes: 2 additions & 0 deletions go.sum
@@ -152,6 +152,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dchest/siphash v1.2.3 h1:QXwFc8cFOR2dSa/gE6o/HokBMWtLUaNDVd+22aKHeEA=
github.com/dchest/siphash v1.2.3/go.mod h1:0NvQU092bT0ipiFN++/rXm69QG9tVxLAlQHIXMPAkHc=
github.com/deckarep/golang-set/v2 v2.6.0 h1:XfcQbWM1LlMB8BsJ8N9vW5ehnnPVIw0je80NsVHagjM=
github.com/deckarep/golang-set/v2 v2.6.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4=
github.com/decred/dcrd/crypto/blake256 v1.0.0 h1:/8DMNYp9SGi5f0w7uCm6d6M4OU2rGFK09Y2A4Xv7EE0=
8 changes: 6 additions & 2 deletions litt/disktable/control_loop.go
@@ -303,14 +303,18 @@ func (c *controlLoop) expandSegments() error {
}

// Create a new segment.
salt := [16]byte{}
_, err = c.saltShaker.Read(salt[:])
if err != nil {
return fmt.Errorf("failed to read salt: %w", err)
}
newSegment, err := segment.CreateSegment(
c.logger,
c.fatalErrorHandler,
c.highestSegmentIndex+1,
c.segmentDirectories,
now,
c.metadata.GetShardingFactor(),
c.saltShaker.Uint32(),
salt,
c.fsync)
if err != nil {
return err
8 changes: 6 additions & 2 deletions litt/disktable/disk_table.go
@@ -212,14 +212,18 @@ func NewDiskTable(
} else {
nextSegmentIndex = highestSegmentIndex + 1
}
salt := [16]byte{}
_, err = config.SaltShaker.Read(salt[:])
if err != nil {
return nil, fmt.Errorf("failed to read salt: %w", err)
}
mutableSegment, err := segment.CreateSegment(
config.Logger,
fatalErrorHandler,
nextSegmentIndex,
segDirs,
config.Clock(),
metadata.GetShardingFactor(),
config.SaltShaker.Uint32(),
salt,
config.Fsync)
if err != nil {
return nil, fmt.Errorf("failed to create mutable segment: %w", err)
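Both `CreateSegment` call sites (here and in `control_loop.go` above) now draw a 16-byte salt from `SaltShaker` via `Read` instead of taking a single `Uint32`. A minimal sketch of that pattern, assuming the salt shaker satisfies `io.Reader` (for example `crypto/rand.Reader` in production or a seeded test reader):

```go
package main

import (
	"crypto/rand"
	"fmt"
	"io"
)

// newSalt fills a 16-byte salt from the given reader, mirroring the
// pattern at both CreateSegment call sites in this diff. io.ReadFull is
// used here so a short read is reported as an error.
func newSalt(saltShaker io.Reader) ([16]byte, error) {
	salt := [16]byte{}
	if _, err := io.ReadFull(saltShaker, salt[:]); err != nil {
		return salt, fmt.Errorf("failed to read salt: %w", err)
	}
	return salt, nil
}

func main() {
	salt, err := newSalt(rand.Reader)
	if err != nil {
		panic(err)
	}
	fmt.Printf("salt: %x\n", salt)
}
```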
105 changes: 87 additions & 18 deletions litt/disktable/segment/metadata_file.go
@@ -10,8 +10,13 @@ import (
)

const (
// The current serialization version. If we ever change how we serialize data, bump this version.
currentSerializationVersion = uint32(0)

// OldHashFunctionSerializationVersion is the serialization version for the old hash function.
OldHashFunctionSerializationVersion = uint32(0)

// CurrentSerializationVersion is the current serialization version. If we ever change how we serialize data,
// bump this version.
CurrentSerializationVersion = uint32(1)

// MetadataFileExtension is the file extension for the metadata file.
MetadataFileExtension = ".metadata"
@@ -22,13 +27,22 @@ const (
// deleted.
MetadataSwapExtension = ".metadata.swap"

// MetadataSize is the size of the metadata file in bytes. This is a constant, so it's convenient to have it here.
// OldMetadataSize is the size of the format 0 metadata file in bytes.
// This is a constant, so it's convenient to have it here.
// - 4 bytes for version
// - 4 bytes for the sharding factor
// - 4 bytes for salt
// - 8 bytes for lastValueTimestamp
// - and 1 byte for sealed.
MetadataSize = 21
OldMetadataSize = 21

// MetadataSize is the size of the metadata file in bytes. This is a constant, so it's convenient to have it here.
// - 4 bytes for version
// - 4 bytes for the sharding factor
// - 16 bytes for salt
// - 8 bytes for lastValueTimestamp
// - and 1 byte for sealed.
MetadataSize = 33
)

// metadataFile contains metadata about a segment. This file contains metadata about the data segment, such as
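For reference, the 33-byte version-1 layout implied by the constants above; this is a sketch with offsets inferred from the `serialize`/`deserialize` code further down this diff, not part of the change itself:

```go
// Version-1 metadata layout (MetadataSize = 33 bytes):
//   [0:4)   serializationVersion (uint32, big-endian)
//   [4:8)   shardingFactor       (uint32, big-endian)
//   [8:24)  salt                 ([16]byte)
//   [24:32) lastValueTimestamp   (uint64, big-endian)
//   [32:33) sealed               (1 byte, 0 or 1)
//
// The legacy version-0 layout (OldMetadataSize = 21 bytes) is identical
// except the salt is a 4-byte uint32 at [8:12), shifting the timestamp
// to [12:20) and the sealed flag to byte 20.
```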
@@ -45,8 +59,13 @@ type metadataFile struct {
shardingFactor uint32

// A random number, used to make the sharding hash function hard for an attacker to predict.
// This value is encoded in the file. Note: after the hash function change, this value is
// only used for data written with the old hash function.
legacySalt uint32

// A random byte array, used to make the sharding hash function hard for an attacker to predict.
// This value is encoded in the file.
salt uint32
salt [16]byte

// The time when the last value was written into the segment, in nanoseconds since the epoch. A segment can
// only be deleted when all values within it are expired, and so we only need to keep track of the lastValueTimestamp of
@@ -68,15 +87,15 @@ type metadataFile struct {
func createMetadataFile(
index uint32,
shardingFactor uint32,
salt uint32,
salt [16]byte,
parentDirectory string) (*metadataFile, error) {

file := &metadataFile{
index: index,
parentDirectory: parentDirectory,
}

file.serializationVersion = currentSerializationVersion
file.serializationVersion = CurrentSerializationVersion
file.shardingFactor = shardingFactor
file.salt = salt
err := file.write()
@@ -131,7 +150,11 @@ func getMetadataFileIndex(fileName string) (uint32, error) {

// Size returns the size of the metadata file in bytes.
func (m *metadataFile) Size() uint64 {
return MetadataSize
if m.serializationVersion == OldHashFunctionSerializationVersion {
return OldMetadataSize
} else {
return MetadataSize
}
}

// Name returns the file name for this metadata file.
@@ -166,9 +189,8 @@ func (m *metadataFile) seal(now time.Time) error {
return nil
}

// serialize serializes the metadata file to a byte array.
func (m *metadataFile) serialize() []byte {
data := make([]byte, MetadataSize)
func (m *metadataFile) serializeLegacy() []byte {
data := make([]byte, OldMetadataSize)

// Write the version
binary.BigEndian.PutUint32(data[0:4], m.serializationVersion)
@@ -177,7 +199,7 @@ func (m *metadataFile) serialize() []byte {
binary.BigEndian.PutUint32(data[4:8], m.shardingFactor)

// Write the salt
binary.BigEndian.PutUint32(data[8:12], m.salt)
binary.BigEndian.PutUint32(data[8:12], m.legacySalt)

// Write the lastValueTimestamp
binary.BigEndian.PutUint64(data[12:20], m.lastValueTimestamp)
@@ -192,21 +214,68 @@ return data
return data
}

// serialize serializes the metadata file to a byte array.
func (m *metadataFile) serialize() []byte {
if m.serializationVersion == OldHashFunctionSerializationVersion {
return m.serializeLegacy()
}

data := make([]byte, MetadataSize)

// Write the version
binary.BigEndian.PutUint32(data[0:4], m.serializationVersion)

// Write the sharding factor
binary.BigEndian.PutUint32(data[4:8], m.shardingFactor)

// Write the salt
copy(data[8:24], m.salt[:])

// Write the lastValueTimestamp
binary.BigEndian.PutUint64(data[24:32], m.lastValueTimestamp)

// Write the sealed flag
if m.sealed {
data[32] = 1
} else {
data[32] = 0
}

return data
}

// deserialize deserializes the metadata file from a byte array.
func (m *metadataFile) deserialize(data []byte) error {
if len(data) != MetadataSize {
return fmt.Errorf("metadata file is not the correct size: %d", len(data))
if len(data) < 4 {
return fmt.Errorf("metadata file is not the correct size, expected at least 4 bytes, got %d", len(data))
}

m.serializationVersion = binary.BigEndian.Uint32(data[0:4])
if m.serializationVersion != currentSerializationVersion {
if m.serializationVersion == OldHashFunctionSerializationVersion {
if len(data) != OldMetadataSize {
return fmt.Errorf("metadata file is not the correct size, expected %d, got %d",
OldMetadataSize, len(data))
}

// TODO (cody.littley): delete this after all data is migrated to the new hash function.
m.shardingFactor = binary.BigEndian.Uint32(data[4:8])
m.legacySalt = binary.BigEndian.Uint32(data[8:12])
m.lastValueTimestamp = binary.BigEndian.Uint64(data[12:20])
m.sealed = data[20] == 1
return nil
} else if m.serializationVersion != CurrentSerializationVersion {
return fmt.Errorf("unsupported serialization version: %d", m.serializationVersion)
}

if len(data) != MetadataSize {
return fmt.Errorf("metadata file is not the correct size, expected %d, got %d",
MetadataSize, len(data))
}

m.shardingFactor = binary.BigEndian.Uint32(data[4:8])
m.salt = binary.BigEndian.Uint32(data[8:12])
m.lastValueTimestamp = binary.BigEndian.Uint64(data[12:20])
m.sealed = data[20] == 1
m.salt = [16]byte(data[8:24])
m.lastValueTimestamp = binary.BigEndian.Uint64(data[24:32])
m.sealed = data[32] == 1

return nil
}
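Putting the two formats together, a test-style round-trip sketch (assuming access to the unexported `metadataFile` type from within the `segment` package; the field values are arbitrary):

```go
package segment

import (
	"testing"

	"github.com/stretchr/testify/require"
)

// TestSerializationRoundTripSketch illustrates that a version-1 file
// occupies 33 bytes and round-trips, while a version-0 (legacy) file
// still deserializes from its 21-byte form.
func TestSerializationRoundTripSketch(t *testing.T) {
	current := &metadataFile{
		serializationVersion: CurrentSerializationVersion,
		shardingFactor:       4,
		salt:                 [16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16},
		lastValueTimestamp:   42,
		sealed:               true,
	}
	data := current.serialize()
	require.Len(t, data, MetadataSize)

	decoded := &metadataFile{}
	require.NoError(t, decoded.deserialize(data))
	require.Equal(t, current.salt, decoded.salt)
	require.Equal(t, current.shardingFactor, decoded.shardingFactor)

	legacy := &metadataFile{
		serializationVersion: OldHashFunctionSerializationVersion,
		shardingFactor:       4,
		legacySalt:           5678,
		lastValueTimestamp:   42,
		sealed:               false,
	}
	legacyData := legacy.serialize()
	require.Len(t, legacyData, OldMetadataSize)

	decodedLegacy := &metadataFile{}
	require.NoError(t, decodedLegacy.deserialize(legacyData))
	require.Equal(t, legacy.legacySalt, decodedLegacy.legacySalt)
}
```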
20 changes: 12 additions & 8 deletions litt/disktable/segment/metadata_file_test.go
@@ -15,11 +15,11 @@ func TestUnsealedSerialization(t *testing.T) {

index := rand.Uint32()
shardingFactor := rand.Uint32()
salt := rand.Uint32()
salt := ([16]byte)(rand.Bytes(16))
timestamp := rand.Uint64()
m := &metadataFile{
index: index,
serializationVersion: currentSerializationVersion,
serializationVersion: CurrentSerializationVersion,
shardingFactor: shardingFactor,
salt: salt,
lastValueTimestamp: timestamp,
@@ -58,11 +58,11 @@ func TestSealedSerialization(t *testing.T) {

index := rand.Uint32()
shardingFactor := rand.Uint32()
salt := rand.Uint32()
salt := ([16]byte)(rand.Bytes(16))
timestamp := rand.Uint64()
m := &metadataFile{
index: index,
serializationVersion: currentSerializationVersion,
serializationVersion: CurrentSerializationVersion,
shardingFactor: shardingFactor,
salt: salt,
lastValueTimestamp: timestamp,
@@ -99,12 +99,14 @@ func TestFreshFileSerialization(t *testing.T) {
rand := random.NewTestRandom()
directory := t.TempDir()

salt := ([16]byte)(rand.Bytes(16))

index := rand.Uint32()
m, err := createMetadataFile(index, 1234, 5678, directory)
m, err := createMetadataFile(index, 1234, salt, directory)
require.NoError(t, err)

require.Equal(t, index, m.index)
require.Equal(t, currentSerializationVersion, m.serializationVersion)
require.Equal(t, CurrentSerializationVersion, m.serializationVersion)
require.False(t, m.sealed)
require.Zero(t, m.lastValueTimestamp)
require.Equal(t, directory, m.parentDirectory)
@@ -136,8 +138,10 @@ func TestSealing(t *testing.T) {
rand := random.NewTestRandom()
directory := t.TempDir()

salt := ([16]byte)(rand.Bytes(16))

index := rand.Uint32()
m, err := createMetadataFile(index, 1234, 5678, directory)
m, err := createMetadataFile(index, 1234, salt, directory)
require.NoError(t, err)

// seal the file
@@ -146,7 +150,7 @@ func TestSealing(t *testing.T) {
require.NoError(t, err)

require.Equal(t, index, m.index)
require.Equal(t, currentSerializationVersion, m.serializationVersion)
require.Equal(t, CurrentSerializationVersion, m.serializationVersion)
require.True(t, m.sealed)
require.Equal(t, uint64(sealTime.UnixNano()), m.lastValueTimestamp)
require.Equal(t, directory, m.parentDirectory)