Skip to content

Commit eec9a6d

Browse files
aggresssSean-Der
authored andcommitted
Add support for RFC 4588
Respect in RTX with distinict SSRC + PayloadType
1 parent 5ce4343 commit eec9a6d

File tree

2 files changed

+108
-3
lines changed

2 files changed

+108
-3
lines changed

pkg/nack/responder_interceptor.go

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package nack
55

66
import (
7+
"encoding/binary"
78
"sync"
89

910
"github.com/pion/interceptor"
@@ -62,6 +63,11 @@ type ResponderInterceptor struct {
6263
type localStream struct {
6364
sendBuffer *sendBuffer
6465
rtpWriter interceptor.RTPWriter
66+
67+
// Non-zero if Retransmissions should be sent on a distinct stream
68+
rtxSsrc uint32
69+
rtxPayloadType uint8
70+
rtxSequencer rtp.Sequencer
6571
}
6672

6773
// NewResponderInterceptor returns a new ResponderInterceptorFactor
@@ -108,7 +114,13 @@ func (n *ResponderInterceptor) BindLocalStream(info *interceptor.StreamInfo, wri
108114
// error is already checked in NewGeneratorInterceptor
109115
sendBuffer, _ := newSendBuffer(n.size)
110116
n.streamsMu.Lock()
111-
n.streams[info.SSRC] = &localStream{sendBuffer: sendBuffer, rtpWriter: writer}
117+
n.streams[info.SSRC] = &localStream{
118+
sendBuffer: sendBuffer,
119+
rtpWriter: writer,
120+
rtxSsrc: info.SSRCRetransmission,
121+
rtxPayloadType: info.PayloadTypeRetransmission,
122+
rtxSequencer: rtp.NewRandomSequencer(),
123+
}
112124
n.streamsMu.Unlock()
113125

114126
return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
@@ -139,8 +151,43 @@ func (n *ResponderInterceptor) resendPackets(nack *rtcp.TransportLayerNack) {
139151
for i := range nack.Nacks {
140152
nack.Nacks[i].Range(func(seq uint16) bool {
141153
if p := stream.sendBuffer.get(seq); p != nil {
142-
if _, err := stream.rtpWriter.Write(p.Header(), p.Payload(), interceptor.Attributes{}); err != nil {
143-
n.log.Warnf("failed resending nacked packet: %+v", err)
154+
if stream.rtxSsrc != 0 {
155+
// Store the original sequence number and rewrite the sequence number.
156+
originalSequenceNumber := p.Header().SequenceNumber
157+
p.Header().SequenceNumber = stream.rtxSequencer.NextSequenceNumber()
158+
159+
// Rewrite the SSRC.
160+
p.Header().SSRC = stream.rtxSsrc
161+
// Rewrite the payload type.
162+
p.Header().PayloadType = stream.rtxPayloadType
163+
164+
// Remove padding if present.
165+
paddingLength := 0
166+
originPayload := p.Payload()
167+
if p.Header().Padding {
168+
paddingLength = int(originPayload[len(originPayload)-1])
169+
p.Header().Padding = false
170+
}
171+
172+
// Write the original sequence number at the beginning of the payload.
173+
payload := make([]byte, 2)
174+
binary.BigEndian.PutUint16(payload, originalSequenceNumber)
175+
payload = append(payload, originPayload[:len(originPayload)-paddingLength]...)
176+
177+
// Send RTX packet.
178+
if _, err := stream.rtpWriter.Write(p.Header(), payload, interceptor.Attributes{}); err != nil {
179+
n.log.Warnf("failed sending rtx packet: %+v", err)
180+
}
181+
182+
// Resore the Padding and SSRC.
183+
if paddingLength > 0 {
184+
p.Header().Padding = true
185+
}
186+
p.Header().SequenceNumber = originalSequenceNumber
187+
} else {
188+
if _, err := stream.rtpWriter.Write(p.Header(), p.Payload(), interceptor.Attributes{}); err != nil {
189+
n.log.Warnf("failed resending nacked packet: %+v", err)
190+
}
144191
}
145192
p.Release()
146193
}

pkg/nack/responder_interceptor_test.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package nack
55

66
import (
7+
"encoding/binary"
78
"testing"
89
"time"
910

@@ -231,3 +232,60 @@ func TestResponderInterceptor_StreamFilter(t *testing.T) {
231232
case <-time.After(10 * time.Millisecond):
232233
}
233234
}
235+
236+
func TestResponderInterceptor_RFC4588(t *testing.T) {
237+
f, err := NewResponderInterceptor()
238+
require.NoError(t, err)
239+
240+
i, err := f.NewInterceptor("")
241+
require.NoError(t, err)
242+
243+
stream := test.NewMockStream(&interceptor.StreamInfo{
244+
SSRC: 1,
245+
SSRCRetransmission: 2,
246+
PayloadTypeRetransmission: 2,
247+
RTCPFeedback: []interceptor.RTCPFeedback{{Type: "nack"}},
248+
}, i)
249+
defer func() {
250+
require.NoError(t, stream.Close())
251+
}()
252+
253+
for _, seqNum := range []uint16{10, 11, 12, 14, 15} {
254+
require.NoError(t, stream.WriteRTP(&rtp.Packet{Header: rtp.Header{SequenceNumber: seqNum}}))
255+
256+
select {
257+
case p := <-stream.WrittenRTP():
258+
require.Equal(t, seqNum, p.SequenceNumber)
259+
case <-time.After(10 * time.Millisecond):
260+
t.Fatal("written rtp packet not found")
261+
}
262+
}
263+
264+
stream.ReceiveRTCP([]rtcp.Packet{
265+
&rtcp.TransportLayerNack{
266+
MediaSSRC: 1,
267+
SenderSSRC: 2,
268+
Nacks: []rtcp.NackPair{
269+
{PacketID: 11, LostPackets: 0b1011}, // sequence numbers: 11, 12, 13, 15
270+
},
271+
},
272+
})
273+
274+
// seq number 13 was never sent, so it can't be resent
275+
for _, seqNum := range []uint16{11, 12, 15} {
276+
select {
277+
case p := <-stream.WrittenRTP():
278+
require.Equal(t, uint32(2), p.SSRC)
279+
require.Equal(t, uint8(2), p.PayloadType)
280+
require.Equal(t, binary.BigEndian.Uint16(p.Payload), seqNum)
281+
case <-time.After(10 * time.Millisecond):
282+
t.Fatal("written rtp packet not found")
283+
}
284+
}
285+
286+
select {
287+
case p := <-stream.WrittenRTP():
288+
t.Errorf("no more rtp packets expected, found sequence number: %v", p.SequenceNumber)
289+
case <-time.After(10 * time.Millisecond):
290+
}
291+
}

0 commit comments

Comments
 (0)