Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 50 additions & 3 deletions pkg/nack/responder_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package nack

import (
"encoding/binary"
"sync"

"github.com/pion/interceptor"
Expand Down Expand Up @@ -62,6 +63,11 @@
type localStream struct {
sendBuffer *sendBuffer
rtpWriter interceptor.RTPWriter

// Non-zero if Retransmissions should be sent on a distinct stream
rtxSsrc uint32
rtxPayloadType uint8
rtxSequencer rtp.Sequencer
}

// NewResponderInterceptor returns a new ResponderInterceptorFactor
Expand Down Expand Up @@ -108,7 +114,13 @@
// error is already checked in NewGeneratorInterceptor
sendBuffer, _ := newSendBuffer(n.size)
n.streamsMu.Lock()
n.streams[info.SSRC] = &localStream{sendBuffer: sendBuffer, rtpWriter: writer}
n.streams[info.SSRC] = &localStream{
sendBuffer: sendBuffer,
rtpWriter: writer,
rtxSsrc: info.SSRCRetransmission,
rtxPayloadType: info.PayloadTypeRetransmission,
rtxSequencer: rtp.NewRandomSequencer(),
}
n.streamsMu.Unlock()

return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
Expand Down Expand Up @@ -139,8 +151,43 @@
for i := range nack.Nacks {
nack.Nacks[i].Range(func(seq uint16) bool {
if p := stream.sendBuffer.get(seq); p != nil {
if _, err := stream.rtpWriter.Write(p.Header(), p.Payload(), interceptor.Attributes{}); err != nil {
n.log.Warnf("failed resending nacked packet: %+v", err)
if stream.rtxSsrc != 0 {
// Store the original sequence number and rewrite the sequence number.
originalSequenceNumber := p.Header().SequenceNumber
p.Header().SequenceNumber = stream.rtxSequencer.NextSequenceNumber()

// Rewrite the SSRC.
p.Header().SSRC = stream.rtxSsrc
// Rewrite the payload type.
p.Header().PayloadType = stream.rtxPayloadType

// Remove padding if present.
paddingLength := 0
originPayload := p.Payload()
if p.Header().Padding {
paddingLength = int(originPayload[len(originPayload)-1])
p.Header().Padding = false

Check warning on line 169 in pkg/nack/responder_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/nack/responder_interceptor.go#L168-L169

Added lines #L168 - L169 were not covered by tests
}

// Write the original sequence number at the beginning of the payload.
payload := make([]byte, 2)
binary.BigEndian.PutUint16(payload, originalSequenceNumber)
payload = append(payload, originPayload[:len(originPayload)-paddingLength]...)

// Send RTX packet.
if _, err := stream.rtpWriter.Write(p.Header(), payload, interceptor.Attributes{}); err != nil {
n.log.Warnf("failed sending rtx packet: %+v", err)

Check warning on line 179 in pkg/nack/responder_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/nack/responder_interceptor.go#L179

Added line #L179 was not covered by tests
}

// Resore the Padding and SSRC.
if paddingLength > 0 {
p.Header().Padding = true

Check warning on line 184 in pkg/nack/responder_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/nack/responder_interceptor.go#L184

Added line #L184 was not covered by tests
}
p.Header().SequenceNumber = originalSequenceNumber
} else {
if _, err := stream.rtpWriter.Write(p.Header(), p.Payload(), interceptor.Attributes{}); err != nil {
n.log.Warnf("failed resending nacked packet: %+v", err)

Check warning on line 189 in pkg/nack/responder_interceptor.go

View check run for this annotation

Codecov / codecov/patch

pkg/nack/responder_interceptor.go#L189

Added line #L189 was not covered by tests
}
}
p.Release()
}
Expand Down
58 changes: 58 additions & 0 deletions pkg/nack/responder_interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package nack

import (
"encoding/binary"
"testing"
"time"

Expand Down Expand Up @@ -231,3 +232,60 @@ func TestResponderInterceptor_StreamFilter(t *testing.T) {
case <-time.After(10 * time.Millisecond):
}
}

func TestResponderInterceptor_RFC4588(t *testing.T) {
f, err := NewResponderInterceptor()
require.NoError(t, err)

i, err := f.NewInterceptor("")
require.NoError(t, err)

stream := test.NewMockStream(&interceptor.StreamInfo{
SSRC: 1,
SSRCRetransmission: 2,
PayloadTypeRetransmission: 2,
RTCPFeedback: []interceptor.RTCPFeedback{{Type: "nack"}},
}, i)
defer func() {
require.NoError(t, stream.Close())
}()

for _, seqNum := range []uint16{10, 11, 12, 14, 15} {
require.NoError(t, stream.WriteRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum}}))

select {
case p := <-stream.WrittenRTP():
require.Equal(t, seqNum, p.SequenceNumber)
case <-time.After(10 * time.Millisecond):
t.Fatal("written rtp packet not found")
}
}

stream.ReceiveRTCP([]rtcp.Packet{
&rtcp.TransportLayerNack{
MediaSSRC: 1,
SenderSSRC: 2,
Nacks: []rtcp.NackPair{
{PacketID: 11, LostPackets: 0b1011}, // sequence numbers: 11, 12, 13, 15
},
},
})

// seq number 13 was never sent, so it can't be resent
for _, seqNum := range []uint16{11, 12, 15} {
select {
case p := <-stream.WrittenRTP():
require.Equal(t, uint32(2), p.SSRC)
require.Equal(t, uint8(2), p.PayloadType)
require.Equal(t, binary.BigEndian.Uint16(p.Payload), seqNum)
case <-time.After(10 * time.Millisecond):
t.Fatal("written rtp packet not found")
}
}

select {
case p := <-stream.WrittenRTP():
t.Errorf("no more rtp packets expected, found sequence number: %v", p.SequenceNumber)
case <-time.After(10 * time.Millisecond):
}
}
Loading