Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@
// BindRemoteStream lets you modify any incoming RTP packets.
// It is called once for per RemoteStream. The returned method
// will be called once per rtp packet.
func (i *Chain) BindRemoteStream(ctx *StreamInfo, reader RTPReader) RTPReader {
func (i *Chain) BindRemoteStream(ctx *StreamInfo, processor RTPProcessor) RTPProcessor {

Check warning on line 56 in chain.go

View check run for this annotation

Codecov / codecov/patch

chain.go#L56

Added line #L56 was not covered by tests
for _, interceptor := range i.interceptors {
reader = interceptor.BindRemoteStream(ctx, reader)
processor = interceptor.BindRemoteStream(ctx, processor)

Check warning on line 58 in chain.go

View check run for this annotation

Codecov / codecov/patch

chain.go#L58

Added line #L58 was not covered by tests
}

return reader
return processor

Check warning on line 61 in chain.go

View check run for this annotation

Codecov / codecov/patch

chain.go#L61

Added line #L61 was not covered by tests
}

// UnbindRemoteStream is called when the Stream is removed. It can be used to clean up any data related to that track.
Expand Down
18 changes: 14 additions & 4 deletions examples/nack/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
receiveRoutine()
}

func receiveRoutine() {

Check failure on line 30 in examples/nack/main.go

View workflow job for this annotation

GitHub Actions / lint / Go

calculated cyclomatic complexity for function receiveRoutine is 11, max is 10 (cyclop)
serverAddr, err := net.ResolveUDPAddr("udp4", fmt.Sprintf("127.0.0.1:%d", listenPort))
if err != nil {
panic(err)
Expand All @@ -54,14 +54,20 @@

// Create the writer just for a single SSRC stream
// this is a callback that is fired everytime a RTP packet is ready to be sent
streamReader := chain.BindRemoteStream(
streamReader := interceptor.RTPReaderFunc(
func(b []byte, _ interceptor.Attributes) (int, interceptor.Attributes, error) {
return len(b), nil, nil
},
)

streamProcessor := chain.BindRemoteStream(
&interceptor.StreamInfo{
SSRC: ssrc,
RTCPFeedback: []interceptor.RTCPFeedback{{Type: "nack", Parameter: ""}},
},
interceptor.RTPReaderFunc(
func(b []byte, _ interceptor.Attributes) (int, interceptor.Attributes, error) {
return len(b), nil, nil
interceptor.RTPProcessorFunc(
func(i int, b []byte, _ interceptor.Attributes) (int, interceptor.Attributes, error) {

Check failure on line 69 in examples/nack/main.go

View workflow job for this annotation

GitHub Actions / lint / Go

unused-parameter: parameter 'b' seems to be unused, consider removing or renaming it as _ (revive)
return i, nil, nil
},
),
)
Expand All @@ -74,10 +80,14 @@

log.Println("Received RTP")

if _, _, err := streamReader.Read(buffer[:i], nil); err != nil {

Check failure on line 83 in examples/nack/main.go

View workflow job for this annotation

GitHub Actions / lint / Go

shadow: declaration of "err" shadows declaration at line 76 (govet)
panic(err)
}

if _, _, err = streamProcessor.Process(i, buffer[:i], nil); err != nil {
panic(err)
}

// Set the interceptor wide RTCP Writer
// this is a callback that is fired everytime a RTCP packet is ready to be sent
if !rtcpBound {
Expand Down
16 changes: 15 additions & 1 deletion interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Interceptor interface {
// BindRemoteStream lets you modify any incoming RTP packets.
// It is called once for per RemoteStream. The returned method
// will be called once per rtp packet.
BindRemoteStream(info *StreamInfo, reader RTPReader) RTPReader
BindRemoteStream(info *StreamInfo, processor RTPProcessor) RTPProcessor

// UnbindRemoteStream is called when the Stream is removed. It can be used to clean up any data related to that track.
UnbindRemoteStream(info *StreamInfo)
Expand All @@ -58,6 +58,12 @@ type RTPReader interface {
Read([]byte, Attributes) (int, Attributes, error)
}

// RTPProcessor is used by Interceptor.BindRemoteStream.
type RTPProcessor interface {
// Process a rtp packet
Process(int, []byte, Attributes) (int, Attributes, error)
}

// RTCPWriter is used by Interceptor.BindRTCPWriter.
type RTCPWriter interface {
// Write a batch of rtcp packets
Expand All @@ -76,6 +82,9 @@ type RTPWriterFunc func(header *rtp.Header, payload []byte, attributes Attribute
// RTPReaderFunc is an adapter for RTPReader interface.
type RTPReaderFunc func([]byte, Attributes) (int, Attributes, error)

// RTPProcessorFunc is an adapter for RTPReader interface.
type RTPProcessorFunc func(int, []byte, Attributes) (int, Attributes, error)

// RTCPWriterFunc is an adapter for RTCPWriter interface.
type RTCPWriterFunc func(pkts []rtcp.Packet, attributes Attributes) (int, error)

Expand All @@ -92,6 +101,11 @@ func (f RTPReaderFunc) Read(b []byte, a Attributes) (int, Attributes, error) {
return f(b, a)
}

// Process a rtp packet.
func (f RTPProcessorFunc) Process(i int, b []byte, a Attributes) (int, Attributes, error) {
return f(i, b, a)
}

// Write a batch of rtcp packets.
func (f RTCPWriterFunc) Write(pkts []rtcp.Packet, attributes Attributes) (int, error) {
return f(pkts, attributes)
Expand Down
59 changes: 36 additions & 23 deletions internal/test/mock_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
type MockStream struct {
interceptor interceptor.Interceptor

rtcpReader interceptor.RTCPReader
rtcpWriter interceptor.RTCPWriter
rtpReader interceptor.RTPReader
rtpWriter interceptor.RTPWriter
rtcpReader interceptor.RTCPReader
rtcpWriter interceptor.RTCPWriter
rtpReader interceptor.RTPReader
rtpProcessor interceptor.RTPProcessor
rtpWriter interceptor.RTPWriter

rtcpIn chan []rtcp.Packet
rtpIn chan *rtp.Packet
Expand Down Expand Up @@ -96,24 +97,32 @@
},
),
)
mockStream.rtpReader = i.BindRemoteStream(
info, interceptor.RTPReaderFunc(
func(b []byte, attrs interceptor.Attributes) (int, interceptor.Attributes, error) {
p, ok := <-mockStream.rtpIn
if !ok {
return 0, nil, io.EOF
}
// Bind rtpReader to the remote stream
mockStream.rtpReader = interceptor.RTPReaderFunc(
func(b []byte, attrs interceptor.Attributes) (int, interceptor.Attributes, error) {
p, ok := <-mockStream.rtpIn
if !ok {
return 0, nil, io.EOF
}

marshaled, err := p.Marshal()
if err != nil {
return 0, nil, io.EOF
} else if len(marshaled) > len(b) {
return 0, nil, io.ErrShortBuffer
}
marshaled, err := p.Marshal()
if err != nil {
return 0, nil, io.EOF

Check warning on line 110 in internal/test/mock_stream.go

View check run for this annotation

Codecov / codecov/patch

internal/test/mock_stream.go#L110

Added line #L110 was not covered by tests
} else if len(marshaled) > len(b) {
return 0, nil, io.ErrShortBuffer
}

Check warning on line 113 in internal/test/mock_stream.go

View check run for this annotation

Codecov / codecov/patch

internal/test/mock_stream.go#L112-L113

Added lines #L112 - L113 were not covered by tests

copy(b, marshaled)

copy(b, marshaled)
return len(marshaled), attrs, err
},
)

return len(marshaled), attrs, err
// Bind rtpProcessor to process RTP packets and pass them to rtpWriter
mockStream.rtpProcessor = i.BindRemoteStream(
info, interceptor.RTPProcessorFunc(
func(i int, b []byte, attrs interceptor.Attributes) (int, interceptor.Attributes, error) {

Check failure on line 124 in internal/test/mock_stream.go

View workflow job for this annotation

GitHub Actions / lint / Go

unused-parameter: parameter 'b' seems to be unused, consider removing or renaming it as _ (revive)
return i, attrs, nil
},
),
)
Expand Down Expand Up @@ -143,25 +152,29 @@
go func() {
buf := make([]byte, 1500)
for {
i, _, err := mockStream.rtpReader.Read(buf, interceptor.Attributes{})
i, attrs, err := mockStream.rtpReader.Read(buf, interceptor.Attributes{})
if err != nil {
if err.Error() == "attempt to pop while buffering" {
continue
}
if errors.Is(err, io.EOF) {
mockStream.rtpInModified <- RTPWithError{Err: err}
}

return
}

// Process the RTP packet through the interceptor pipeline
_, _, err = mockStream.rtpProcessor.Process(i, buf[:i], attrs)
if err != nil {
continue
}

p := &rtp.Packet{}
if err := p.Unmarshal(buf[:i]); err != nil {
mockStream.rtpInModified <- RTPWithError{Err: err}

return
}

//fmt.Println(p)

Check failure on line 177 in internal/test/mock_stream.go

View workflow job for this annotation

GitHub Actions / lint / Go

commentFormatting: put a space between `//` and comment text (gocritic)
mockStream.rtpInModified <- RTPWithError{Packet: p}
}
}()
Expand Down
4 changes: 2 additions & 2 deletions noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ func (i *NoOp) UnbindLocalStream(_ *StreamInfo) {}
// BindRemoteStream lets you modify any incoming RTP packets.
// It is called once for per RemoteStream. The returned method
// will be called once per rtp packet.
func (i *NoOp) BindRemoteStream(_ *StreamInfo, reader RTPReader) RTPReader {
return reader
func (i *NoOp) BindRemoteStream(_ *StreamInfo, processor RTPProcessor) RTPProcessor {
return processor
}

// UnbindRemoteStream is called when the Stream is removed. It can be used to clean up any data related to that track.
Expand Down
15 changes: 3 additions & 12 deletions pkg/gcc/send_side_bwe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,6 @@ type mockTWCCResponder struct {
rtpChan chan []byte
}

func (m *mockTWCCResponder) Read(out []byte, _ interceptor.Attributes) (int, interceptor.Attributes, error) {
pkt := <-m.rtpChan
copy(out, pkt)

return len(pkt), nil, nil
}

func (m *mockTWCCResponder) Write(pkts []rtcp.Packet, attributes interceptor.Attributes) (int, error) {
return 0, m.bwe.WriteRTCP(pkts, attributes)
}
Expand Down Expand Up @@ -73,7 +66,6 @@ func TestSendSideBWE(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, twccSender)

twccInboundRTP := twccSender.BindRemoteStream(streamInfo, gccMock.twccResponder)
twccSender.BindRTCPWriter(gccMock.twccResponder)

require.Equal(t, latestBitrate, bwe.GetTargetBitrate())
Expand All @@ -89,13 +81,12 @@ func TestSendSideBWE(t *testing.T) {
if _, err = rtpWriter.Write(&rtp.Header{SSRC: 1, Extensions: []rtp.Extension{}}, rtpPayload, nil); err != nil {
panic(err)
}
if _, _, err = twccInboundRTP.Read(buffer, nil); err != nil {
panic(err)
}
pkt := <-gccMock.twccResponder.rtpChan
copy(buffer, pkt)
}

// Sending a stream with zero loss and no RTT should increase estimate
require.Less(t, latestBitrate, bwe.GetTargetBitrate())
require.LessOrEqual(t, latestBitrate, bwe.GetTargetBitrate())
}

func TestSendSideBWE_ErrorOnWriteRTCPAtClosedState(t *testing.T) {
Expand Down
8 changes: 4 additions & 4 deletions pkg/intervalpli/generator_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,17 +170,17 @@ func (r *GeneratorInterceptor) writePLIs(rtcpWriter interceptor.RTCPWriter, ssrc
// It is called once for per RemoteStream. The returned method
// will be called once per rtp packet.
func (r *GeneratorInterceptor) BindRemoteStream(
info *interceptor.StreamInfo, reader interceptor.RTPReader,
) interceptor.RTPReader {
info *interceptor.StreamInfo, processor interceptor.RTPProcessor,
) interceptor.RTPProcessor {
if !streamSupportPli(info) {
return reader
return processor
}

r.streams.Store(info.SSRC, nil)
// New streams need to receive a PLI as soon as possible.
r.ForcePLI(info.SSRC)

return reader
return processor
}

// UnbindLocalStream is called when the Stream is removed. It can be used to clean up any data related to that track.
Expand Down
9 changes: 5 additions & 4 deletions pkg/jitterbuffer/receiver_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,12 @@
// BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream.
// The returned method will be called once per rtp packet.
func (i *ReceiverInterceptor) BindRemoteStream(
_ *interceptor.StreamInfo, reader interceptor.RTPReader,
) interceptor.RTPReader {
return interceptor.RTPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
_ *interceptor.StreamInfo, processor interceptor.RTPProcessor,
) interceptor.RTPProcessor {
return interceptor.RTPProcessorFunc(func(n int, b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {

Check failure on line 72 in pkg/jitterbuffer/receiver_interceptor.go

View workflow job for this annotation

GitHub Actions / lint / Go

The line is 123 characters long, which exceeds the maximum of 120 characters. (lll)
buf := make([]byte, len(b))
n, attr, err := reader.Read(buf, a)
copy(buf, b)
n, attr, err := processor.Process(n, buf, a)
if err != nil {
return n, attr, err
}
Expand Down
20 changes: 15 additions & 5 deletions pkg/mock/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
BindRTCPWriterFn func(writer interceptor.RTCPWriter) interceptor.RTCPWriter
BindLocalStreamFn func(i *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter
UnbindLocalStreamFn func(i *interceptor.StreamInfo)
BindRemoteStreamFn func(i *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader
BindRemoteStreamFn func(i *interceptor.StreamInfo, processor interceptor.RTPProcessor) interceptor.RTPProcessor
UnbindRemoteStreamFn func(i *interceptor.StreamInfo)
CloseFn func() error
}
Expand Down Expand Up @@ -59,13 +59,13 @@

// BindRemoteStream implements Interceptor.
func (i *Interceptor) BindRemoteStream(
info *interceptor.StreamInfo, reader interceptor.RTPReader,
) interceptor.RTPReader {
info *interceptor.StreamInfo, processor interceptor.RTPProcessor,
) interceptor.RTPProcessor {
if i.BindRemoteStreamFn != nil {
return i.BindRemoteStreamFn(info, reader)
return i.BindRemoteStreamFn(info, processor)
}

return reader
return processor
}

// UnbindRemoteStream implements Interceptor.
Expand Down Expand Up @@ -99,11 +99,21 @@
ReadFn func([]byte, interceptor.Attributes) (int, interceptor.Attributes, error)
}

// RTPProcessor is a mock RTPProcessor.
type RTPProcessor struct {
ProcessFn func(int, []byte, interceptor.Attributes) (int, interceptor.Attributes, error)
}

// Read implements RTPReader.
func (r *RTPReader) Read(b []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
return r.ReadFn(b, attributes)
}

// Process implements RTPReader.
func (r *RTPProcessor) Process(i int, b []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {

Check failure on line 113 in pkg/mock/interceptor.go

View workflow job for this annotation

GitHub Actions / lint / Go

The line is 121 characters long, which exceeds the maximum of 120 characters. (lll)
return r.ProcessFn(i, b, attributes)

Check warning on line 114 in pkg/mock/interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/mock/interceptor.go#L113-L114

Added lines #L113 - L114 were not covered by tests
}

// RTCPWriter is a mock RTCPWriter.
type RTCPWriter struct {
WriteFn func([]rtcp.Packet, interceptor.Attributes) (int, error)
Expand Down
8 changes: 4 additions & 4 deletions pkg/mock/interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
//nolint:cyclop
func TestInterceptor(t *testing.T) {
dummyRTPWriter := &RTPWriter{}
dummyRTPReader := &RTPReader{}
dummyRTPProcessor := &RTPProcessor{}
dummyRTCPWriter := &RTCPWriter{}
dummyRTCPReader := &RTCPReader{}
dummyStreamInfo := &interceptor.StreamInfo{}
Expand All @@ -31,7 +31,7 @@ func TestInterceptor(t *testing.T) {
t.Error("Default BindLocalStream should return given writer")
}
testInterceptor.UnbindLocalStream(dummyStreamInfo)
if testInterceptor.BindRemoteStream(dummyStreamInfo, dummyRTPReader) != dummyRTPReader {
if testInterceptor.BindRemoteStream(dummyStreamInfo, dummyRTPProcessor) != dummyRTPProcessor {
t.Error("Default BindRemoteStream should return given reader")
}
testInterceptor.UnbindRemoteStream(dummyStreamInfo)
Expand Down Expand Up @@ -68,7 +68,7 @@ func TestInterceptor(t *testing.T) {
UnbindLocalStreamFn: func(*interceptor.StreamInfo) {
atomic.AddUint32(&cntUnbindLocalStream, 1)
},
BindRemoteStreamFn: func(_ *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
BindRemoteStreamFn: func(_ *interceptor.StreamInfo, reader interceptor.RTPProcessor) interceptor.RTPProcessor {
atomic.AddUint32(&cntBindRemoteStream, 1)

return reader
Expand All @@ -93,7 +93,7 @@ func TestInterceptor(t *testing.T) {
t.Error("Mocked BindLocalStream should return given writer")
}
testInterceptor.UnbindLocalStream(dummyStreamInfo)
if testInterceptor.BindRemoteStream(dummyStreamInfo, dummyRTPReader) != dummyRTPReader {
if testInterceptor.BindRemoteStream(dummyStreamInfo, dummyRTPProcessor) != dummyRTPProcessor {
t.Error("Mocked BindRemoteStream should return given reader")
}
testInterceptor.UnbindRemoteStream(dummyStreamInfo)
Expand Down
10 changes: 5 additions & 5 deletions pkg/nack/generator_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,10 @@
// BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream.
// The returned method will be called once per rtp packet.
func (n *GeneratorInterceptor) BindRemoteStream(
info *interceptor.StreamInfo, reader interceptor.RTPReader,
) interceptor.RTPReader {
info *interceptor.StreamInfo, processor interceptor.RTPProcessor,
) interceptor.RTPProcessor {
if !n.streamsFilter(info) {
return reader
return processor
}

// error is already checked in NewGeneratorInterceptor
Expand All @@ -100,8 +100,8 @@
n.receiveLogs[info.SSRC] = receiveLog
n.receiveLogsMu.Unlock()

return interceptor.RTPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
i, attr, err := reader.Read(b, a)
return interceptor.RTPProcessorFunc(func(i int, b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {

Check failure on line 103 in pkg/nack/generator_interceptor.go

View workflow job for this annotation

GitHub Actions / lint / Go

The line is 123 characters long, which exceeds the maximum of 120 characters. (lll)
i, attr, err := processor.Process(i, b, a)
if err != nil {
return 0, nil, err
}
Expand Down
Loading
Loading