Skip to content

Commit a47e6c0

Browse files
ForeverASilvergavv
authored andcommitted
Rebase ring buffer branch to develop
1 parent f8dab55 commit a47e6c0

File tree

8 files changed

+141
-1
lines changed

8 files changed

+141
-1
lines changed

src/internal_modules/roc_pipeline/config.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,16 +46,21 @@ void ReceiverCommonConfig::deduce_defaults() {
4646

4747
ReceiverSessionConfig::ReceiverSessionConfig()
4848
: payload_type(0)
49+
, prebuf_len(0)
4950
, enable_beeping(false) {
5051
}
5152

5253
void ReceiverSessionConfig::deduce_defaults() {
54+
if (prebuf_len == 0) {
55+
prebuf_len = latency.target_latency;
56+
}
5357
latency.deduce_defaults(DefaultLatency, true);
5458
watchdog.deduce_defaults(latency.target_latency);
5559
resampler.deduce_defaults(latency.tuner_backend, latency.tuner_profile);
5660
}
5761

58-
ReceiverSourceConfig::ReceiverSourceConfig() {
62+
ReceiverSourceConfig::ReceiverSourceConfig()
63+
: max_session_packets(0) {
5964
}
6065

6166
void ReceiverSourceConfig::deduce_defaults() {

src/internal_modules/roc_pipeline/config.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,9 @@ struct ReceiverSessionConfig {
155155
//! Packet payload type.
156156
unsigned int payload_type;
157157

158+
//! Packet prebuffer length, nanoseconds.
159+
core::nanoseconds_t prebuf_len;
160+
158161
//! FEC reader parameters.
159162
fec::ReaderConfig fec_reader;
160163

@@ -191,6 +194,9 @@ struct ReceiverSourceConfig {
191194
//! Default parameters for a session.
192195
ReceiverSessionConfig session_defaults;
193196

197+
//! Maximum number of packets per session.
198+
size_t max_session_packets;
199+
194200
//! Initialize config.
195201
ReceiverSourceConfig();
196202

src/internal_modules/roc_pipeline/receiver_session_group.cpp

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,7 @@ ReceiverSessionGroup::route_transport_packet_(const packet::PacketPtr& packet) {
314314
}
315315

316316
if (sess) {
317+
enqueue_prebuf_packet_(packet);
317318
// Session found, route packet to it.
318319
return sess->route_packet(packet);
319320
}
@@ -327,6 +328,48 @@ ReceiverSessionGroup::route_transport_packet_(const packet::PacketPtr& packet) {
327328
return status::StatusOK;
328329
}
329330

331+
void ReceiverSessionGroup::enqueue_prebuf_packet_(const packet::PacketPtr& packet_ptr) {
332+
prebuf_packets_.push_back(*packet_ptr.get());
333+
334+
core::nanoseconds_t now = core::timestamp(core::ClockMonotonic);
335+
336+
while (prebuf_packets_.size() > 0) {
337+
core::nanoseconds_t received = prebuf_packets_.front()->udp()->receive_timestamp;
338+
if (now - received > receiver_config_.default_session.prebuf_len) {
339+
prebuf_packets_.remove(*prebuf_packets_.front());
340+
} else {
341+
break;
342+
}
343+
}
344+
}
345+
346+
void ReceiverSessionGroup::dequeue_prebuf_packets_(ReceiverSession& sess) {
347+
packet::PacketPtr curr, next;
348+
349+
if (prebuf_packets_.size() == 0) {
350+
return;
351+
}
352+
353+
core::nanoseconds_t now = core::timestamp(core::ClockMonotonic);
354+
355+
for (curr = prebuf_packets_.front(); curr; curr = next) {
356+
next = prebuf_packets_.nextof(*curr);
357+
358+
// if packet is too old, remove it from the queue
359+
core::nanoseconds_t received = curr->udp()->receive_timestamp;
360+
if (now - received > receiver_config_.default_session.prebuf_len) {
361+
prebuf_packets_.remove(*curr);
362+
continue;
363+
}
364+
365+
// if session handles the packet, remove it from the queue
366+
const status::StatusCode code = sess.route(curr);
367+
if (code == status::StatusOK) {
368+
prebuf_packets_.remove(*curr);
369+
}
370+
}
371+
}
372+
330373
status::StatusCode
331374
ReceiverSessionGroup::route_control_packet_(const packet::PacketPtr& packet,
332375
core::nanoseconds_t current_time) {
@@ -409,6 +452,8 @@ ReceiverSessionGroup::create_session_(const packet::PacketPtr& packet) {
409452

410453
state_tracker_.add_active_sessions(+1);
411454

455+
dequeue_prebuf_packets_(*sess);
456+
412457
return status::StatusOK;
413458
}
414459

src/internal_modules/roc_pipeline/receiver_session_group.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,8 @@ class ReceiverSessionGroup : public core::NonCopyable<>, private rtcp::IParticip
124124
status::StatusCode route_transport_packet_(const packet::PacketPtr& packet);
125125
status::StatusCode route_control_packet_(const packet::PacketPtr& packet,
126126
core::nanoseconds_t current_time);
127+
void enqueue_prebuf_packet_(const packet::PacketPtr& packet);
128+
void dequeue_prebuf_packets_(ReceiverSession& sess);
127129

128130
bool can_create_session_(const packet::PacketPtr& packet);
129131

@@ -153,6 +155,8 @@ class ReceiverSessionGroup : public core::NonCopyable<>, private rtcp::IParticip
153155
core::List<ReceiverSession> sessions_;
154156
ReceiverSessionRouter session_router_;
155157

158+
core::List<packet::Packet> prebuf_packets_;
159+
156160
bool valid_;
157161
};
158162

src/public_api/include/roc/config.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -961,6 +961,14 @@ typedef struct roc_receiver_config {
961961
* If zero, default value is used. If negative, the check is disabled.
962962
*/
963963
long long choppy_playback_timeout;
964+
965+
/** Packet prebuffer length, in nanoseconds.
966+
* Packets received for sessions that have not yet been created
967+
* will be buffered. Any packets older than the prebuf_len
968+
* will be discarded.
969+
* If zero, default value is used.
970+
*/
971+
unsigned long long prebuf_len;
964972
} roc_receiver_config;
965973

966974
/** Interface configuration.

src/tests/roc_pipeline/test_receiver_source.cpp

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2108,6 +2108,62 @@ TEST(receiver_source, timestamp_mapping_remixing) {
21082108
CHECK(first_ts);
21092109
}
21102110

2111+
TEST(receiver_source, packet_buffer) {
2112+
enum { Rate = SampleRate, Chans = Chans_Stereo, MaxPackets = 10 };
2113+
2114+
init(Rate, Chans, Rate, Chans);
2115+
2116+
ReceiverConfig config = make_config();
2117+
config.default_session.prebuf_len = 0;
2118+
ReceiverSource receiver(config, format_map, packet_factory, byte_buffer_factory,
2119+
sample_buffer_factory, arena);
2120+
CHECK(receiver.is_valid());
2121+
2122+
ReceiverSlot* slot = create_slot(receiver);
2123+
CHECK(slot);
2124+
2125+
packet::Queue queue;
2126+
packet::Queue source_queue;
2127+
packet::Queue repair_queue;
2128+
2129+
packet::IWriter* source_endpoint_writer =
2130+
create_endpoint(slot, address::Iface_AudioSource, address::Proto_RTP_RS8M_Source);
2131+
CHECK(source_endpoint_writer);
2132+
2133+
packet::IWriter* repair_endpoint_writer =
2134+
create_endpoint(slot, address::Iface_AudioRepair, address::Proto_RS8M_Repair);
2135+
CHECK(repair_endpoint_writer);
2136+
2137+
fec::WriterConfig fec_config;
2138+
2139+
test::PacketWriter packet_writer(
2140+
arena, queue, queue, format_map, packet_factory, byte_buffer_factory, src1, dst1,
2141+
dst2, PayloadType_Ch2, packet::FEC_ReedSolomon_M8, fec_config);
2142+
2143+
// setup reader
2144+
test::FrameReader frame_reader(receiver, sample_buffer_factory);
2145+
2146+
packet_writer.write_packets(fec_config.n_source_packets, SamplesPerPacket,
2147+
output_sample_spec);
2148+
2149+
for (int i = 0; i < ManyPackets; ++i) {
2150+
packet::PacketPtr pp;
2151+
UNSIGNED_LONGS_EQUAL(status::StatusOK, queue.read(pp));
2152+
CHECK(pp);
2153+
2154+
if (pp->flags() & packet::Packet::FlagAudio) {
2155+
UNSIGNED_LONGS_EQUAL(status::StatusOK, source_queue.write(pp));
2156+
}
2157+
if (pp->flags() & packet::Packet::FlagRepair) {
2158+
UNSIGNED_LONGS_EQUAL(status::StatusOK, repair_queue.write(pp));
2159+
}
2160+
}
2161+
2162+
receiver.refresh(frame_reader.refresh_ts());
2163+
frame_reader.read_nonzero_samples(SamplesPerFrame, output_sample_spec);
2164+
UNSIGNED_LONGS_EQUAL(1, receiver.num_sessions());
2165+
}
2166+
21112167
// Check receiver metrics for multiple remote participants (senders).
21122168
TEST(receiver_source, metrics_participants) {
21132169
enum { Rate = SampleRate, Chans = Chans_Stereo, MaxParties = 10 };

src/tools/roc_recv/cmdline.ggo

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ section "Options"
4444
option "no-play-timeout" - "No playback timeout, TIME units"
4545
string optional
4646

47+
option "prebuf-len" - "Length of packet prebuffer, TIME units"
48+
string optional
49+
4750
option "choppy-play-timeout" - "Choppy playback timeout, TIME units"
4851
string optional
4952

src/tools/roc_recv/main.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,19 @@ int main(int argc, char** argv) {
153153
}
154154
}
155155

156+
if (args.prebuf_len_given) {
157+
core::nanoseconds_t prebuf_len = 0;
158+
if (!core::parse_duration(args.prebuf_len_arg, prebuf_len)) {
159+
roc_log(LogError, "invalid --prebuf-len");
160+
return 1;
161+
}
162+
receiver_config.default_session.prebuf_len =
163+
(core::nanoseconds_t)args.prebuf_len_arg;
164+
} else {
165+
receiver_config.default_session.prebuf_len =
166+
receiver_config.default_session.target_latency;
167+
}
168+
156169
if (args.choppy_play_timeout_given) {
157170
if (!core::parse_duration(
158171
args.choppy_play_timeout_arg,

0 commit comments

Comments
 (0)