Skip to content

Commit d98a15c

Browse files
aggresssSean-Der
authored andcommitted
Add support for RFC 4588
Respect in RTX with distinict SSRC + PayloadType
1 parent 5ce4343 commit d98a15c

File tree

1 file changed

+50
-3
lines changed

1 file changed

+50
-3
lines changed

pkg/nack/responder_interceptor.go

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
package nack
55

66
import (
7+
"encoding/binary"
78
"sync"
89

910
"github.com/pion/interceptor"
@@ -62,6 +63,11 @@ type ResponderInterceptor struct {
6263
type localStream struct {
6364
sendBuffer *sendBuffer
6465
rtpWriter interceptor.RTPWriter
66+
67+
// Non-zero if Retransmissions should be sent on a distinct stream
68+
rtxSsrc uint32
69+
rtxPayloadType uint8
70+
rtxSequencer rtp.Sequencer
6571
}
6672

6773
// NewResponderInterceptor returns a new ResponderInterceptorFactor
@@ -108,7 +114,13 @@ func (n *ResponderInterceptor) BindLocalStream(info *interceptor.StreamInfo, wri
108114
// error is already checked in NewGeneratorInterceptor
109115
sendBuffer, _ := newSendBuffer(n.size)
110116
n.streamsMu.Lock()
111-
n.streams[info.SSRC] = &localStream{sendBuffer: sendBuffer, rtpWriter: writer}
117+
n.streams[info.SSRC] = &localStream{
118+
sendBuffer: sendBuffer,
119+
rtpWriter: writer,
120+
rtxSsrc: info.SSRCRetransmission,
121+
rtxPayloadType: info.PayloadTypeRetransmission,
122+
rtxSequencer: rtp.NewRandomSequencer(),
123+
}
112124
n.streamsMu.Unlock()
113125

114126
return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
@@ -139,8 +151,43 @@ func (n *ResponderInterceptor) resendPackets(nack *rtcp.TransportLayerNack) {
139151
for i := range nack.Nacks {
140152
nack.Nacks[i].Range(func(seq uint16) bool {
141153
if p := stream.sendBuffer.get(seq); p != nil {
142-
if _, err := stream.rtpWriter.Write(p.Header(), p.Payload(), interceptor.Attributes{}); err != nil {
143-
n.log.Warnf("failed resending nacked packet: %+v", err)
154+
if stream.rtxSsrc != 0 {
155+
// Store the original sequence number and rewrite the sequence number.
156+
originalSequenceNumber := p.Header().SequenceNumber
157+
p.Header().SequenceNumber = stream.rtxSequencer.NextSequenceNumber()
158+
159+
// Rewrite the SSRC.
160+
p.Header().SSRC = stream.rtxSsrc
161+
// Rewrite the payload type.
162+
p.Header().PayloadType = stream.rtxPayloadType
163+
164+
// Remove padding if present.
165+
paddingLength := 0
166+
originPayload := p.Payload()
167+
if p.Header().Padding {
168+
paddingLength = int(originPayload[len(originPayload)-1])
169+
p.Header().Padding = false
170+
}
171+
172+
// Write the original sequence number at the beginning of the payload.
173+
payload := make([]byte, 2)
174+
binary.BigEndian.PutUint16(payload, originalSequenceNumber)
175+
payload = append(payload, originPayload[:len(originPayload)-paddingLength]...)
176+
177+
// Send RTX packet.
178+
if _, err := stream.rtpWriter.Write(p.Header(), payload, interceptor.Attributes{}); err != nil {
179+
n.log.Warnf("failed sending rtx packet: %+v", err)
180+
}
181+
182+
// Resore the Padding and SSRC.
183+
if paddingLength > 0 {
184+
p.Header().Padding = true
185+
}
186+
p.Header().SequenceNumber = originalSequenceNumber
187+
} else {
188+
if _, err := stream.rtpWriter.Write(p.Header(), p.Payload(), interceptor.Attributes{}); err != nil {
189+
n.log.Warnf("failed resending nacked packet: %+v", err)
190+
}
144191
}
145192
p.Release()
146193
}

0 commit comments

Comments
 (0)