Skip to content
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
a2414d8
feat: Added recover-fast cli command
mrkbac May 2, 2025
4f32599
feat: Doctor check ChannelMessageCounts statistic
mrkbac May 2, 2025
2225e12
feat: Improved chunk adding logic
mrkbac May 2, 2025
a7c8f5a
feat: Improved doctor, also validate MessageIndexes
mrkbac May 3, 2025
d04ad0a
feat: Rebuild message index for last chunk
mrkbac May 3, 2025
acd24aa
fix: Corrected linting errors
mrkbac May 3, 2025
9c8a7ae
fix: More linting errors
mrkbac May 3, 2025
5141d8c
fix: More lint errors
mrkbac May 3, 2025
20566c9
fix: One last linting error
mrkbac May 3, 2025
b34bb60
feat: Factored out complex functions from writer.go
mrkbac May 14, 2025
ccdb61c
fix: Rename variable for clarity in Examine function
mrkbac May 14, 2025
2bbebc5
feat: Added always-decode-chunk
mrkbac May 15, 2025
765615a
feat: Replaced `recover-fast` with `recover`
mrkbac May 15, 2025
381705b
fix: Moved recover test to own file
mrkbac May 15, 2025
9ec5b93
feat: Added `recover-in-place` & `info` on unindexed files
mrkbac May 15, 2025
41c0c79
fix: Correctly Statistics calculation in recover
mrkbac May 15, 2025
23bef33
fix: Corrected CRC calculation and update doctor
mrkbac May 16, 2025
9eec585
fix: Improved rebuild_info tests and fixed found errors
mrkbac May 20, 2025
9103727
feat: Restored recover args
mrkbac May 20, 2025
6aa2887
feat: RecoverInPlace tests
mrkbac May 20, 2025
26abec4
Removed changes to info, doctor and recover in-place
mrkbac May 27, 2025
2434e6d
fix: Lint line too long
mrkbac May 27, 2025
412f4cc
fix: Simplified ChecksummingReaderCounter
mrkbac May 28, 2025
0edaab7
fix: Added docs to `AddSchema` and `AddChannel`
mrkbac May 29, 2025
5d5da69
fix: Removed unused function
mrkbac May 29, 2025
d9ee1b3
fix: Removed unused `ChecksummingReaderCounter`
mrkbac May 29, 2025
d3fa265
fix: Write messages in always-decode-chunk
mrkbac May 30, 2025
ca6fe7a
fix: Optimize recordsCopy allocation in recoverRun
mrkbac Jul 2, 2025
7808592
Merge branch 'foxglove:main' into recover-fast
mrkbac Jul 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 0 additions & 55 deletions go/cli/mcap/cmd/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ type filterFlags struct {
}

type filterOpts struct {
recover bool
output string
includeTopics []regexp.Regexp
excludeTopics []regexp.Regexp
Expand Down Expand Up @@ -224,7 +223,6 @@ func filter(

lexer, err := mcap.NewLexer(r, &mcap.LexerOptions{
ValidateChunkCRCs: true,
EmitInvalidChunks: opts.recover,
AttachmentCallback: func(ar *mcap.AttachmentReader) error {
if !opts.includeAttachments {
return nil
Expand Down Expand Up @@ -260,15 +258,6 @@ func filter(
fmt.Fprintf(os.Stderr, "failed to close mcap writer: %v\n", err)
return
}
if opts.recover {
fmt.Fprintf(
os.Stderr,
"Recovered %d messages, %d attachments, and %d metadata records.\n",
numMessages,
numAttachments,
numMetadata,
)
}
}()

buf := make([]byte, 1024)
Expand All @@ -281,17 +270,6 @@ func filter(
if errors.Is(err, io.EOF) {
return nil
}
if opts.recover {
var expected *mcap.ErrTruncatedRecord
if errors.As(err, &expected) {
fmt.Println(expected.Error())
return nil
}
if token == mcap.TokenInvalidChunk {
fmt.Printf("Invalid chunk encountered, skipping: %s\n", err)
continue
}
}
return err
}
if len(data) > len(buf) {
Expand Down Expand Up @@ -504,39 +482,6 @@ usage:
rootCmd.AddCommand(filterCmd)
}

{
var recoverCmd = &cobra.Command{
Use: "recover [file]",
Short: "Recover data from a potentially corrupt MCAP file",
Long: `This subcommand reads a potentially corrupt MCAP file and copies data to a new file.

usage:
mcap recover in.mcap -o out.mcap`,
}
output := recoverCmd.PersistentFlags().StringP("output", "o", "", "output filename")
chunkSize := recoverCmd.PersistentFlags().Int64P("chunk-size", "", 4*1024*1024, "chunk size of output file")
compression := recoverCmd.PersistentFlags().String(
"compression",
"zstd",
"compression algorithm to use on output file",
)
recoverCmd.Run = func(_ *cobra.Command, args []string) {
filterOptions, err := buildFilterOptions(&filterFlags{
output: *output,
chunkSize: *chunkSize,
outputCompression: *compression,
includeMetadata: true,
includeAttachments: true,
})
if err != nil {
die("configuration error: %s", err)
}
filterOptions.recover = true
run(filterOptions, args)
}
rootCmd.AddCommand(recoverCmd)
}

{
var compressCmd = &cobra.Command{
Use: "compress [file]",
Expand Down
104 changes: 0 additions & 104 deletions go/cli/mcap/cmd/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,110 +257,6 @@ func TestFiltering(t *testing.T) {
}
}

func TestRecover(t *testing.T) {
t.Run("recover data from truncated file", func(t *testing.T) {
writeBuf := bytes.Buffer{}
readBuf := bytes.Buffer{}
writeFilterTestInput(t, &readBuf)
readBuf.Truncate(readBuf.Len() / 2)

require.NoError(t, filter(&readBuf, &writeBuf, &filterOpts{
end: 1000,
recover: true,
includeAttachments: true,
includeMetadata: true,
}))

messageCounter := map[uint16]int{
1: 0,
2: 0,
3: 0,
}
attachmentCounter := 0
metadataCounter := 0
lexer, err := mcap.NewLexer(&writeBuf, &mcap.LexerOptions{
AttachmentCallback: func(*mcap.AttachmentReader) error {
attachmentCounter++
return nil
},
})
require.NoError(t, err)
defer lexer.Close()
for {
token, record, err := lexer.Next(nil)
if err != nil {
require.ErrorIs(t, err, io.EOF)
break
}
switch token {
case mcap.TokenMessage:
message, err := mcap.ParseMessage(record)
require.NoError(t, err)
messageCounter[message.ChannelID]++
case mcap.TokenMetadata:
metadataCounter++
}
}
assert.Equal(t, 0, attachmentCounter)
assert.Equal(t, 0, metadataCounter)
assert.InDeltaMapValues(t, map[uint16]int{
1: 88,
2: 88,
3: 88,
}, messageCounter, 0.0)
})

t.Run("recover data from chunk with invalid crc", func(t *testing.T) {
writeBuf := bytes.Buffer{}
readBuf := bytes.Buffer{}
writeFilterTestInput(t, &readBuf)
readBuf.Bytes()[0x12b] = 1 // overwrite crc

require.NoError(t, filter(&readBuf, &writeBuf, &filterOpts{
end: 1000,
recover: true,
includeAttachments: true,
includeMetadata: true,
}))
messageCounter := map[uint16]int{
1: 0,
2: 0,
3: 0,
}
attachmentCounter := 0
metadataCounter := 0
lexer, err := mcap.NewLexer(&writeBuf, &mcap.LexerOptions{
AttachmentCallback: func(_ *mcap.AttachmentReader) error {
attachmentCounter++
return nil
},
})
require.NoError(t, err)
for {
token, record, err := lexer.Next(nil)
if err != nil {
require.ErrorIs(t, err, io.EOF)
break
}
switch token {
case mcap.TokenMessage:
message, err := mcap.ParseMessage(record)
require.NoError(t, err)
messageCounter[message.ChannelID]++
case mcap.TokenMetadata:
metadataCounter++
}
}
assert.Equal(t, 1, attachmentCounter)
assert.Equal(t, 1, metadataCounter)
assert.InDeltaMapValues(t, map[uint16]int{
1: 100,
2: 99,
3: 100,
}, messageCounter, 0.0)
})
}

func TestCompileMatchers(t *testing.T) {
matchers, err := compileMatchers([]string{"camera.*", "lights.*"})
require.NoError(t, err)
Expand Down
Loading
Loading