Skip to content

Commit 96bd70b

Browse files
ForeverASilvergavv
authored andcommitted
Rebase ring buffer branch to develop
1 parent b98c7ab commit 96bd70b

File tree

9 files changed

+139
-0
lines changed

9 files changed

+139
-0
lines changed

src/internal_modules/roc_netio/target_libuv/roc_netio/udp_receiver_port.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,7 @@ void UdpReceiverPort::recv_cb_(uv_udp_t* handle,
287287

288288
pp->udp()->src_addr = src_addr;
289289
pp->udp()->dst_addr = self.config_.bind_address;
290+
pp->udp()->receive_timestamp = core::timestamp(core::ClockUnix);
290291

291292
pp->set_data(core::Slice<uint8_t>(*bp, 0, (size_t)nread));
292293

src/internal_modules/roc_packet/target_libuv/roc_packet/udp.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ struct UDP {
3131

3232
//! Sender request state.
3333
uv_udp_send_t request;
34+
35+
//! Received Timestamp.
36+
core::nanoseconds_t receive_timestamp;
3437
};
3538

3639
} // namespace packet

src/internal_modules/roc_pipeline/config.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,9 @@ struct ReceiverSessionConfig {
159159
//! Target latency, nanoseconds.
160160
core::nanoseconds_t target_latency;
161161

162+
//! Packet prebuffer length, nanoseconds.
163+
core::nanoseconds_t prebuf_len;
164+
162165
//! Packet payload type.
163166
unsigned int payload_type;
164167

@@ -185,6 +188,7 @@ struct ReceiverSessionConfig {
185188

186189
ReceiverSessionConfig()
187190
: target_latency(DefaultLatency)
191+
, prebuf_len(DefaultLatency)
188192
, payload_type(0)
189193
, resampler_backend(audio::ResamplerBackend_Default)
190194
, resampler_profile(audio::ResamplerProfile_Medium) {
@@ -225,6 +229,9 @@ struct ReceiverCommonConfig {
225229
//! Insert weird beeps instead of silence on packet loss.
226230
bool enable_beeping;
227231

232+
//! Maximum number of packets per session.
233+
size_t max_session_packets;
234+
228235
ReceiverCommonConfig()
229236
: output_sample_spec(DefaultSampleSpec)
230237
, enable_timing(false)

src/internal_modules/roc_pipeline/receiver_session_group.cpp

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,13 +166,56 @@ ReceiverSessionGroup::route_transport_packet_(const packet::PacketPtr& packet) {
166166
}
167167

168168
if (!can_create_session_(packet)) {
169+
enqueue_prebuf_packet_(packet);
169170
// TODO(gh-183): return status
170171
return status::StatusOK;
171172
}
172173

173174
return create_session_(packet);
174175
}
175176

177+
void ReceiverSessionGroup::enqueue_prebuf_packet_(const packet::PacketPtr& packet_ptr) {
178+
prebuf_packets_.push_back(*packet_ptr.get());
179+
180+
core::nanoseconds_t now = core::timestamp(core::ClockMonotonic);
181+
182+
while (prebuf_packets_.size() > 0) {
183+
core::nanoseconds_t received = prebuf_packets_.front()->udp()->receive_timestamp;
184+
if (now - received > receiver_config_.default_session.prebuf_len) {
185+
prebuf_packets_.remove(*prebuf_packets_.front());
186+
} else {
187+
break;
188+
}
189+
}
190+
}
191+
192+
void ReceiverSessionGroup::dequeue_prebuf_packets_(ReceiverSession& sess) {
193+
packet::PacketPtr curr, next;
194+
195+
if (prebuf_packets_.size() == 0) {
196+
return;
197+
}
198+
199+
core::nanoseconds_t now = core::timestamp(core::ClockMonotonic);
200+
201+
for (curr = prebuf_packets_.front(); curr; curr = next) {
202+
next = prebuf_packets_.nextof(*curr);
203+
204+
// if packet is too old, remove it from the queue
205+
core::nanoseconds_t received = curr->udp()->receive_timestamp;
206+
if (now - received > receiver_config_.default_session.prebuf_len) {
207+
prebuf_packets_.remove(*curr);
208+
continue;
209+
}
210+
211+
// if session handles the packet, remove it from the queue
212+
const status::StatusCode code = sess.route(curr);
213+
if (code == status::StatusOK) {
214+
prebuf_packets_.remove(*curr);
215+
}
216+
}
217+
}
218+
176219
status::StatusCode
177220
ReceiverSessionGroup::route_control_packet_(const packet::PacketPtr& packet) {
178221
if (!rtcp_composer_) {
@@ -252,6 +295,8 @@ ReceiverSessionGroup::create_session_(const packet::PacketPtr& packet) {
252295

253296
receiver_state_.add_sessions(+1);
254297

298+
dequeue_prebuf_packets_(*sess);
299+
255300
return status::StatusOK;
256301
}
257302

src/internal_modules/roc_pipeline/receiver_session_group.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ class ReceiverSessionGroup : public core::NonCopyable<>, private rtcp::IReceiver
8282

8383
status::StatusCode route_transport_packet_(const packet::PacketPtr& packet);
8484
status::StatusCode route_control_packet_(const packet::PacketPtr& packet);
85+
void enqueue_prebuf_packet_(const packet::PacketPtr& packet);
86+
void dequeue_prebuf_packets_(ReceiverSession& sess);
8587

8688
bool can_create_session_(const packet::PacketPtr& packet);
8789

@@ -108,6 +110,7 @@ class ReceiverSessionGroup : public core::NonCopyable<>, private rtcp::IReceiver
108110
core::Optional<rtcp::Session> rtcp_session_;
109111

110112
core::List<ReceiverSession> sessions_;
113+
core::List<packet::Packet> prebuf_packets_;
111114
};
112115

113116
} // namespace pipeline

src/public_api/include/roc/config.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -781,6 +781,14 @@ typedef struct roc_receiver_config {
781781
* If zero, default value is used. If negative, the timeout is disabled.
782782
*/
783783
long long choppy_playback_timeout;
784+
785+
/** Packet prebuffer length, in nanoseconds.
786+
* Packets received for sessions that have not yet been created
787+
* will be buffered. Any packets older than the prebuf_len
788+
* will be discarded.
789+
* If zero, default value is used.
790+
*/
791+
unsigned long long prebuf_len;
784792
} roc_receiver_config;
785793

786794
/** Interface configuration.

src/tests/roc_pipeline/test_receiver_source.cpp

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1792,6 +1792,62 @@ TEST(receiver_source, recv_timestamp_mapping_remixing) {
17921792
CHECK(first_ts);
17931793
}
17941794

1795+
TEST(receiver_source, packet_buffer) {
1796+
enum { Rate = SampleRate, Chans = Chans_Stereo, MaxPackets = 10 };
1797+
1798+
init(Rate, Chans, Rate, Chans);
1799+
1800+
ReceiverConfig config = make_config();
1801+
config.default_session.prebuf_len = 0;
1802+
ReceiverSource receiver(config, format_map, packet_factory, byte_buffer_factory,
1803+
sample_buffer_factory, arena);
1804+
CHECK(receiver.is_valid());
1805+
1806+
ReceiverSlot* slot = create_slot(receiver);
1807+
CHECK(slot);
1808+
1809+
packet::Queue queue;
1810+
packet::Queue source_queue;
1811+
packet::Queue repair_queue;
1812+
1813+
packet::IWriter* source_endpoint_writer =
1814+
create_endpoint(slot, address::Iface_AudioSource, address::Proto_RTP_RS8M_Source);
1815+
CHECK(source_endpoint_writer);
1816+
1817+
packet::IWriter* repair_endpoint_writer =
1818+
create_endpoint(slot, address::Iface_AudioRepair, address::Proto_RS8M_Repair);
1819+
CHECK(repair_endpoint_writer);
1820+
1821+
fec::WriterConfig fec_config;
1822+
1823+
test::PacketWriter packet_writer(
1824+
arena, queue, queue, format_map, packet_factory, byte_buffer_factory, src1, dst1,
1825+
dst2, PayloadType_Ch2, packet::FEC_ReedSolomon_M8, fec_config);
1826+
1827+
// setup reader
1828+
test::FrameReader frame_reader(receiver, sample_buffer_factory);
1829+
1830+
packet_writer.write_packets(fec_config.n_source_packets, SamplesPerPacket,
1831+
output_sample_spec);
1832+
1833+
for (int i = 0; i < ManyPackets; ++i) {
1834+
packet::PacketPtr pp;
1835+
UNSIGNED_LONGS_EQUAL(status::StatusOK, queue.read(pp));
1836+
CHECK(pp);
1837+
1838+
if (pp->flags() & packet::Packet::FlagAudio) {
1839+
UNSIGNED_LONGS_EQUAL(status::StatusOK, source_queue.write(pp));
1840+
}
1841+
if (pp->flags() & packet::Packet::FlagRepair) {
1842+
UNSIGNED_LONGS_EQUAL(status::StatusOK, repair_queue.write(pp));
1843+
}
1844+
}
1845+
1846+
receiver.refresh(frame_reader.refresh_ts());
1847+
frame_reader.read_nonzero_samples(SamplesPerFrame, output_sample_spec);
1848+
UNSIGNED_LONGS_EQUAL(1, receiver.num_sessions());
1849+
}
1850+
17951851
TEST(receiver_source, metrics_sessions) {
17961852
enum { Rate = SampleRate, Chans = Chans_Stereo, MaxSess = 10 };
17971853

src/tools/roc_recv/cmdline.ggo

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ section "Options"
4141
option "no-play-timeout" - "No playback timeout, TIME units"
4242
string optional
4343

44+
option "prebuf-len" - "Length of packet prebuffer, TIME units"
45+
string optional
46+
4447
option "choppy-play-timeout" - "Choppy playback timeout, TIME units"
4548
string optional
4649

src/tools/roc_recv/main.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,19 @@ int main(int argc, char** argv) {
135135
receiver_config.default_session.target_latency);
136136
}
137137

138+
if (args.prebuf_len_given) {
139+
core::nanoseconds_t prebuf_len = 0;
140+
if (!core::parse_duration(args.prebuf_len_arg, prebuf_len)) {
141+
roc_log(LogError, "invalid --prebuf-len");
142+
return 1;
143+
}
144+
receiver_config.default_session.prebuf_len =
145+
(core::nanoseconds_t)args.prebuf_len_arg;
146+
} else {
147+
receiver_config.default_session.prebuf_len =
148+
receiver_config.default_session.target_latency;
149+
}
150+
138151
if (args.choppy_play_timeout_given) {
139152
if (!core::parse_duration(
140153
args.choppy_play_timeout_arg,

0 commit comments

Comments
 (0)