Skip to content

Commit e6e0e7d

Browse files
Implement flush policy
Introduce a configurable FlushPolicy to control when the TLS layer flushes its underlying transport. This change adds a FlushPolicy enum (Relaxed or Strict) and wires it into TlsConnection and TlsWriter. The policy determines whether the transport’s flush() is called after writing a TLS record. With Strict (the default), the transport is always flushed, ensuring that data is acknowledged or committed before continuing. This mode is compatible with existing behavior. With Relaxed, the TLS layer closes the record and hands bytes to the transport without forcing a flush, allowing buffered writes to improve performance and reduce latency. This gives callers explicit control over how aggressively the transport flushes data. It's particularly important for transports like embassy-net TCP, where flush() blocks waiting for ACKs. The default remains Strict to preserve compatibility with embedded-tls 0.17.0, while the Relaxed mode can be selected together with larger socket window for improved throughput.
1 parent c5fa732 commit e6e0e7d

File tree

4 files changed

+142
-12
lines changed

4 files changed

+142
-12
lines changed

src/asynch.rs

Lines changed: 52 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use crate::TlsError;
44
use crate::common::decrypted_buffer_info::DecryptedBufferInfo;
55
use crate::common::decrypted_read_handler::DecryptedReadHandler;
66
use crate::connection::{Handshake, State, decrypt_record};
7+
use crate::flush_policy::FlushPolicy;
78
use crate::key_schedule::KeySchedule;
89
use crate::key_schedule::{ReadKeySchedule, WriteKeySchedule};
910
use crate::read_buffer::ReadBuffer;
@@ -30,6 +31,7 @@ where
3031
record_reader: RecordReader<'a>,
3132
record_write_buf: WriteBuffer<'a>,
3233
decrypted: DecryptedBufferInfo,
34+
flush_policy: FlushPolicy,
3335
}
3436

3537
impl<'a, Socket, CipherSuite> TlsConnection<'a, Socket, CipherSuite>
@@ -63,9 +65,28 @@ where
6365
record_reader: RecordReader::new(record_read_buf),
6466
record_write_buf: WriteBuffer::new(record_write_buf),
6567
decrypted: DecryptedBufferInfo::default(),
68+
flush_policy: FlushPolicy::default(),
6669
}
6770
}
6871

72+
/// Returns a reference to the current flush policy.
73+
///
74+
/// The flush policy controls whether the underlying transport is flushed
75+
/// (via its `flush()` method) after writing a TLS record.
76+
#[inline]
77+
pub fn flush_policy(&self) -> FlushPolicy {
78+
self.flush_policy
79+
}
80+
81+
/// Replace the current flush policy with the provided one.
82+
///
83+
/// This sets how and when the connection will call `flush()` on the
84+
/// underlying transport after writing records.
85+
#[inline]
86+
pub fn set_flush_policy(&mut self, policy: FlushPolicy) {
87+
self.flush_policy = policy;
88+
}
89+
6990
/// Open a TLS connection, performing the handshake with the configuration provided when
7091
/// creating the connection instance.
7192
///
@@ -152,15 +173,22 @@ where
152173

153174
key_schedule.increment_counter();
154175

155-
self.delegate
156-
.flush()
157-
.await
158-
.map_err(|e| TlsError::Io(e.kind()))?;
176+
if self.flush_policy.flush_transport() {
177+
self.flush_transport().await?;
178+
}
159179
}
160180

161181
Ok(())
162182
}
163183

184+
#[inline]
185+
async fn flush_transport(&mut self) -> Result<(), TlsError> {
186+
self.delegate
187+
.flush()
188+
.await
189+
.map_err(|e| TlsError::Io(e.kind()))
190+
}
191+
164192
fn create_read_buffer(&mut self) -> ReadBuffer {
165193
self.decrypted.create_read_buffer(self.record_reader.buf)
166194
}
@@ -231,7 +259,7 @@ where
231259

232260
self.key_schedule.write_state().increment_counter();
233261

234-
self.flush().await
262+
self.flush_transport().await
235263
}
236264

237265
/// Close a connection instance, returning the ownership of the async I/O provider.
@@ -265,6 +293,7 @@ where
265293
delegate: self.delegate.clone(),
266294
key_schedule: wks,
267295
record_write_buf: self.record_write_buf.reborrow_mut(),
296+
flush_policy: self.flush_policy,
268297
};
269298

270299
(reader, writer)
@@ -391,6 +420,21 @@ where
391420
delegate: Socket,
392421
key_schedule: &'a mut WriteKeySchedule<CipherSuite>,
393422
record_write_buf: WriteBufferBorrowMut<'a>,
423+
flush_policy: FlushPolicy,
424+
}
425+
426+
impl<'a, Socket, CipherSuite> TlsWriter<'a, Socket, CipherSuite>
427+
where
428+
Socket: AsyncWrite + 'a,
429+
CipherSuite: TlsCipherSuite + 'static,
430+
{
431+
#[inline]
432+
async fn flush_transport(&mut self) -> Result<(), TlsError> {
433+
self.delegate
434+
.flush()
435+
.await
436+
.map_err(|e| TlsError::Io(e.kind()))
437+
}
394438
}
395439

396440
impl<Socket, CipherSuite> AsRef<Socket> for TlsWriter<'_, Socket, CipherSuite>
@@ -487,10 +531,9 @@ where
487531

488532
self.key_schedule.increment_counter();
489533

490-
self.delegate
491-
.flush()
492-
.await
493-
.map_err(|e| TlsError::Io(e.kind()))?;
534+
if self.flush_policy.flush_transport() {
535+
self.flush_transport().await?;
536+
}
494537
}
495538

496539
Ok(())

src/blocking.rs

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use core::sync::atomic::Ordering;
33
use crate::common::decrypted_buffer_info::DecryptedBufferInfo;
44
use crate::common::decrypted_read_handler::DecryptedReadHandler;
55
use crate::connection::{Handshake, State, decrypt_record};
6+
use crate::flush_policy::FlushPolicy;
67
use crate::key_schedule::KeySchedule;
78
use crate::key_schedule::{ReadKeySchedule, WriteKeySchedule};
89
use crate::read_buffer::ReadBuffer;
@@ -30,6 +31,7 @@ where
3031
record_reader: RecordReader<'a>,
3132
record_write_buf: WriteBuffer<'a>,
3233
decrypted: DecryptedBufferInfo,
34+
flush_policy: FlushPolicy,
3335
}
3436

3537
impl<'a, Socket, CipherSuite> TlsConnection<'a, Socket, CipherSuite>
@@ -64,9 +66,28 @@ where
6466
record_reader: RecordReader::new(record_read_buf),
6567
record_write_buf: WriteBuffer::new(record_write_buf),
6668
decrypted: DecryptedBufferInfo::default(),
69+
flush_policy: FlushPolicy::default(),
6770
}
6871
}
6972

73+
/// Returns a reference to the current flush policy.
74+
///
75+
/// The flush policy controls whether the underlying transport is flushed
76+
/// (via its `flush()` method) after writing a TLS record.
77+
#[inline]
78+
pub fn flush_policy(&self) -> FlushPolicy {
79+
self.flush_policy
80+
}
81+
82+
/// Replace the current flush policy with the provided one.
83+
///
84+
/// This sets how and when the connection will call `flush()` on the
85+
/// underlying transport after writing records.
86+
#[inline]
87+
pub fn set_flush_policy(&mut self, policy: FlushPolicy) {
88+
self.flush_policy = policy;
89+
}
90+
7091
/// Open a TLS connection, performing the handshake with the configuration provided when
7192
/// creating the connection instance.
7293
///
@@ -147,12 +168,21 @@ where
147168

148169
key_schedule.increment_counter();
149170

150-
self.delegate.flush().map_err(|e| TlsError::Io(e.kind()))?;
171+
if self.flush_policy.flush_transport() {
172+
self.flush_transport()?;
173+
}
151174
}
152175

153176
Ok(())
154177
}
155178

179+
#[inline]
180+
fn flush_transport(&mut self) -> Result<(), TlsError> {
181+
self.delegate
182+
.flush()
183+
.map_err(|e| TlsError::Io(e.kind()))
184+
}
185+
156186
fn create_read_buffer(&mut self) -> ReadBuffer {
157187
self.decrypted.create_read_buffer(self.record_reader.buf)
158188
}
@@ -219,7 +249,7 @@ where
219249

220250
self.key_schedule.write_state().increment_counter();
221251

222-
self.flush()?;
252+
self.flush_transport()?;
223253

224254
Ok(())
225255
}
@@ -255,6 +285,7 @@ where
255285
delegate: self.delegate.clone(),
256286
key_schedule: wks,
257287
record_write_buf: self.record_write_buf.reborrow_mut(),
288+
flush_policy: self.flush_policy,
258289
};
259290

260291
(reader, writer)
@@ -380,6 +411,19 @@ where
380411
delegate: Socket,
381412
key_schedule: &'a mut WriteKeySchedule<CipherSuite>,
382413
record_write_buf: WriteBufferBorrowMut<'a>,
414+
flush_policy: FlushPolicy,
415+
}
416+
417+
impl<'a, Socket, CipherSuite> TlsWriter<'a, Socket, CipherSuite>
418+
where
419+
Socket: Write + 'a,
420+
CipherSuite: TlsCipherSuite + 'static,
421+
{
422+
fn flush_transport(&mut self) -> Result<(), TlsError> {
423+
self.delegate
424+
.flush()
425+
.map_err(|e| TlsError::Io(e.kind()))
426+
}
383427
}
384428

385429
impl<Socket, CipherSuite> AsRef<Socket> for TlsWriter<'_, Socket, CipherSuite>
@@ -475,7 +519,10 @@ where
475519

476520
self.key_schedule.increment_counter();
477521

478-
self.delegate.flush().map_err(|e| TlsError::Io(e.kind()))?;
522+
if self.flush_policy.flush_transport() {
523+
self.flush_transport()?;
524+
}
525+
479526
}
480527

481528
Ok(())

src/flush_policy.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
//! Flush policy for TLS sockets.
2+
//!
3+
//! Two strategies are provided:
4+
//! - `Relaxed`: close the TLS encryption buffer and hand the data to the transport
5+
//! delegate without forcing a transport-level flush.
6+
//! - `Strict`: in addition to handing the data to the transport delegate, also
7+
//! request a flush of the transport. For TCP transports this typically means
8+
//! waiting for an ACK (e.g. on embassy TCP sockets) before considering the
9+
//! data fully flushed.
10+
11+
/// Policy controlling how TLS layer flushes encrypted data to the transport.
12+
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
13+
pub enum FlushPolicy {
14+
/// Close the TLS encryption buffer and pass bytes to the transport delegate.
15+
/// Do not force a transport-level flush or wait for an ACK.
16+
Relaxed,
17+
18+
/// In addition to passing bytes to the transport delegate, request a
19+
/// transport-level flush and wait for confirmation (ACK) before returning.
20+
Strict,
21+
}
22+
23+
impl FlushPolicy {
24+
/// Returns true when the transport delegate should be explicitly flushed.
25+
///
26+
/// Relaxed -> false, Strict -> true.
27+
pub fn flush_transport(&self) -> bool {
28+
matches!(self, Self::Strict)
29+
}
30+
}
31+
32+
impl Default for FlushPolicy {
33+
/// Default to `Strict` for compatibility with embedded-tls 0.17.0.
34+
fn default() -> Self {
35+
FlushPolicy::Strict
36+
}
37+
}

src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ mod connection;
6363
mod content_types;
6464
mod crypto_engine;
6565
mod extensions;
66+
pub mod flush_policy;
6667
mod handshake;
6768
mod key_schedule;
6869
mod parse_buffer;
@@ -82,6 +83,8 @@ pub mod webpki;
8283
mod asynch;
8384
pub use asynch::*;
8485

86+
pub use flush_policy::*;
87+
8588
#[derive(Debug, Copy, Clone)]
8689
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
8790
pub enum TlsError {

0 commit comments

Comments
 (0)