Skip to content

Commit 1122d74

Browse files
Pick changes onto develop branch (did not touch logic)
1 parent b2798ab commit 1122d74

File tree

8 files changed

+150
-2
lines changed

8 files changed

+150
-2
lines changed

src/internal_modules/roc_pipeline/config.cpp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,8 @@ bool ReceiverCommonConfig::deduce_defaults(audio::ProcessorMap& processor_map) {
6666
// ReceiverSessionConfig
6767

6868
ReceiverSessionConfig::ReceiverSessionConfig()
69-
: payload_type(0) {
69+
: payload_type(0)
70+
, prebuf_len(0) {
7071
}
7172

7273
bool ReceiverSessionConfig::deduce_defaults(audio::ProcessorMap& processor_map) {
@@ -95,12 +96,17 @@ bool ReceiverSessionConfig::deduce_defaults(audio::ProcessorMap& processor_map)
9596
return false;
9697
}
9798

99+
if(!prebuf_len) {
100+
prebuf_len = latency.target_latency;
101+
}
102+
98103
return true;
99104
}
100105

101106
// ReceiverSourceConfig
102107

103-
ReceiverSourceConfig::ReceiverSourceConfig() {
108+
ReceiverSourceConfig::ReceiverSourceConfig()
109+
: max_session_packets(0) {
104110
}
105111

106112
bool ReceiverSourceConfig::deduce_defaults(audio::ProcessorMap& processor_map) {

src/internal_modules/roc_pipeline/config.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,9 @@ struct ReceiverSessionConfig {
186186
//! Watchdog parameters.
187187
audio::WatchdogConfig watchdog;
188188

189+
//! Packet prebuffer length, nanoseconds.
190+
core::nanoseconds_t prebuf_len;
191+
189192
//! Initialize config.
190193
ReceiverSessionConfig();
191194

@@ -205,6 +208,9 @@ struct ReceiverSourceConfig {
205208
//! Default parameters for a session.
206209
ReceiverSessionConfig session_defaults;
207210

211+
//! Maximum number of packets per session.
212+
size_t max_session_packets;
213+
208214
//! Initialize config.
209215
ReceiverSourceConfig();
210216

src/internal_modules/roc_pipeline/receiver_session_group.cpp

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,7 @@ ReceiverSessionGroup::route_transport_packet_(const packet::PacketPtr& packet) {
331331
}
332332

333333
if (sess) {
334+
enqueue_prebuf_packet_(packet);
334335
// Session found, route packet to it.
335336
return sess->route_packet(packet);
336337
}
@@ -434,6 +435,8 @@ ReceiverSessionGroup::create_session_(const packet::PacketPtr& packet) {
434435
sessions_.push_back(*sess);
435436
state_tracker_.register_session();
436437

438+
dequeue_prebuf_packet_(*sess);
439+
437440
return status::StatusOK;
438441
}
439442

@@ -478,5 +481,47 @@ ReceiverSessionGroup::make_session_config_(const packet::PacketPtr& packet) cons
478481
return config;
479482
}
480483

484+
void ReceiverSessionGroup::enqueue_prebuf_packet_(const packet::PacketPtr& packet) {
485+
prebuf_packets_.push_back(*packet.get());
486+
487+
core::nanoseconds_t now = core::timestamp(core::ClockMonotonic);
488+
489+
while (prebuf_packets_.size() > 0) {
490+
core::nanoseconds_t received = prebuf_packets_.front()->udp()->receive_timestamp;
491+
if (now - received > source_config_.session_defaults.prebuf_len) {
492+
prebuf_packets_.remove(*prebuf_packets_.front());
493+
} else {
494+
break;
495+
}
496+
}
497+
}
498+
499+
void ReceiverSessionGroup::dequeue_prebuf_packet_(ReceiverSession& sess) {
500+
packet::PacketPtr curr, next;
501+
502+
if (prebuf_packets_.size() == 0) {
503+
return;
504+
}
505+
506+
core::nanoseconds_t now = core::timestamp(core::ClockMonotonic);
507+
508+
for (curr = prebuf_packets_.front(); curr; curr = next) {
509+
next = prebuf_packets_.nextof(*curr);
510+
511+
// if packet is too old, remote it from the queue
512+
core::nanoseconds_t received = curr->udp()->receive_timestamp;
513+
if (now - received > source_config_.session_defaults.prebuf_len) {
514+
prebuf_packets_.remove(*curr);
515+
continue;
516+
}
517+
518+
// if session handles the packet, remove it from the queue
519+
const status::StatusCode code = sess.route_packet(curr);
520+
if (code == status::StatusOK) {
521+
prebuf_packets_.remove(*curr);
522+
}
523+
}
524+
}
525+
481526
} // namespace pipeline
482527
} // namespace roc

src/internal_modules/roc_pipeline/receiver_session_group.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include "roc_core/list.h"
2020
#include "roc_core/noncopyable.h"
2121
#include "roc_dbgio/csv_dumper.h"
22+
#include "roc_packet/packet.h"
2223
#include "roc_packet/packet_factory.h"
2324
#include "roc_pipeline/metrics.h"
2425
#include "roc_pipeline/receiver_endpoint.h"
@@ -131,6 +132,8 @@ class ReceiverSessionGroup : public core::NonCopyable<>, private rtcp::IParticip
131132
status::StatusCode route_transport_packet_(const packet::PacketPtr& packet);
132133
status::StatusCode route_control_packet_(const packet::PacketPtr& packet,
133134
core::nanoseconds_t current_time);
135+
void enqueue_prebuf_packet_(const packet::PacketPtr& packet);
136+
void dequeue_prebuf_packet_(ReceiverSession& sess);
134137

135138
bool can_create_session_(const packet::PacketPtr& packet);
136139

@@ -165,6 +168,8 @@ class ReceiverSessionGroup : public core::NonCopyable<>, private rtcp::IParticip
165168

166169
dbgio::CsvDumper* dumper_;
167170

171+
core::List<packet::Packet> prebuf_packets_;
172+
168173
status::StatusCode init_status_;
169174
};
170175

src/public_api/include/roc/config.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1090,6 +1090,14 @@ typedef struct roc_receiver_config {
10901090
* If zero, default value is used. If negative, the check is disabled.
10911091
*/
10921092
long long choppy_playback_timeout;
1093+
1094+
/** Packet prebuffer length, in nanoseconds.
1095+
* Packets received for sessions that have not yet been created
1096+
* will be buffered. Any packets older than the prebuf_len
1097+
* will be discarded.
1098+
* If zero, default value is used.
1099+
*/
1100+
unsigned long long prebuf_len;
10931101
} roc_receiver_config;
10941102

10951103
/** Interface configuration.

src/tests/roc_pipeline/test_receiver_source.cpp

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3399,6 +3399,69 @@ TEST(receiver_source, timestamp_mapping_remixing) {
33993399
CHECK(first_ts);
34003400
}
34013401

3402+
TEST(receiver_source, packet_buffer) {
3403+
enum {
3404+
OutputRate = 48000,
3405+
PacketRate = 44100,
3406+
OutputChans = Chans_Stereo,
3407+
PacketChans = Chans_Mono
3408+
};
3409+
3410+
const audio::PcmSubformat OutputFormat = Format_S16_Ne;
3411+
const audio::PcmSubformat PacketFormat = Format_S16_Be;
3412+
3413+
init_with_specs(OutputRate, OutputChans, OutputFormat, PacketRate, PacketChans,
3414+
PacketFormat);
3415+
3416+
ReceiverSource receiver(make_default_config(), processor_map, encoding_map,
3417+
packet_pool, packet_buffer_pool, frame_pool,
3418+
frame_buffer_pool, arena);
3419+
LONGS_EQUAL(status::StatusOK, receiver.init_status());
3420+
3421+
ReceiverSlot* slot = create_slot(receiver);
3422+
CHECK(slot);
3423+
3424+
packet::FifoQueue queue;
3425+
packet::FifoQueue source_queue;
3426+
packet::FifoQueue repair_queue;
3427+
3428+
packet::IWriter* source_endpoint_writer = create_transport_endpoint(
3429+
slot, address::Iface_AudioSource, address::Proto_RTP_RS8M_Source, dst_addr1);
3430+
3431+
packet::IWriter* repair_endpoint_writer = create_transport_endpoint(
3432+
slot, address::Iface_AudioRepair, address::Proto_RS8M_Repair, dst_addr2);
3433+
3434+
fec::BlockWriterConfig fec_config;
3435+
3436+
test::PacketWriter packet_writer(
3437+
arena, *source_endpoint_writer, *repair_endpoint_writer, encoding_map,
3438+
packet_factory, src_id1, src_addr1, dst_addr1, dst_addr2, PayloadType_Ch2,
3439+
packet::FEC_ReedSolomon_M8, fec_config);
3440+
3441+
// setup reader
3442+
test::FrameReader frame_reader(receiver, frame_factory);
3443+
3444+
packet_writer.write_packets(fec_config.n_source_packets, SamplesPerPacket,
3445+
output_sample_spec);
3446+
3447+
for (int i = 0; i < ManyPackets; ++i) {
3448+
packet::PacketPtr pp;
3449+
LONGS_EQUAL(status::StatusOK, queue.read(pp, packet::ModeFetch));
3450+
CHECK(pp);
3451+
3452+
if (pp->flags() & packet::Packet::FlagAudio) {
3453+
UNSIGNED_LONGS_EQUAL(status::StatusOK, source_queue.write(pp));
3454+
}
3455+
if (pp->flags() & packet::Packet::FlagRepair) {
3456+
UNSIGNED_LONGS_EQUAL(status::StatusOK, repair_queue.write(pp));
3457+
}
3458+
}
3459+
3460+
LONGS_EQUAL(status::StatusOK, receiver.refresh(frame_reader.refresh_ts(), NULL));
3461+
frame_reader.read_nonzero_samples(SamplesPerFrame, output_sample_spec);
3462+
UNSIGNED_LONGS_EQUAL(1, receiver.num_sessions());
3463+
}
3464+
34023465
// Set high jitter, wait until latency increases and stabilizes.
34033466
TEST(receiver_source, adaptive_latency_increase) {
34043467
const size_t stabilization_window = JitterMeterWindow * 5;

src/tools/roc_recv/cmdline.ggo

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ section "Timeout options"
7676

7777
option "no-play-timeout" - "No-playback timeout, TIME units"
7878
typestr="TIME" string optional
79+
option "prebuf-len" - "Length of packet prebuffer, TIME units"
80+
string optional
7981
option "choppy-play-timeout" - "Choppy playback timeout, TIME units"
8082
typestr="TIME" string optional
8183

src/tools/roc_recv/main.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,19 @@ bool build_receiver_config(const gengetopt_args_info& args,
313313
}
314314
}
315315

316+
if (args.prebuf_len_given) {
317+
if (!core::parse_duration(
318+
args.prebuf_len_arg,
319+
receiver_config.session_defaults.prebuf_len)) {
320+
roc_log(LogError, "invalid --prebuf-len: bad format");
321+
return false;
322+
}
323+
if (receiver_config.session_defaults.prebuf_len) {
324+
roc_log(LogError, "invalid --prebuf-len: should be > 0");
325+
return false;
326+
}
327+
}
328+
316329
if (args.choppy_play_timeout_given) {
317330
if (!core::parse_duration(
318331
args.choppy_play_timeout_arg,

0 commit comments

Comments
 (0)