Commit 5b52717

Merge 'Expell net::packet from output_stream API stack' from Pavel Emelyanov
The existing data_sink_impl API has three put() virtual overloads:

* put(net::packet) -- the pure virtual one
* put(temporary_buffer) -- the implementation creates a net::packet out of the buffer
* put(vector<temporary_buffer>) -- similarly, the implementation converts the vector to a packet

There is also a fallback_put(net::packet) method for those implementations that don't want to deal with packets and prefer to convert the packet back to buffers.

This API was presumably driven by the output_stream zero-copy buffers extension, which maintains a net::packet on the output stream itself, accumulates written buffers in it and then put()-s them into the sink. The fallback_put() appeared later to make sink implementations easier.

Maintaining a net::packet as the zero-copy buffers on output_stream in turn makes the stream work in two (and a half) modes -- users can either buffer data, then flush, or append zero-copy buffers, then, again, flush. There is also a semi-mixed mode, where zero-copy buffers may come after a bunch of buffered writes. Mixed mode must still be used with care -- after zero-copy writes and flush(), starting buffered writes can trip an assertion if the stream is batch-flushed.

Also, the need to implement the put(net::packet) overload is a pretty harsh requirement: sinks that are not network sockets plug this implementation with abort() and require callers not to perform zero-copy writes into such streams.

This PR eliminates net::packet from the output_stream+data_sink layer and leaves it in the socket sink implementations only. For that, both output_stream and data_sink are changed.

First, the data_sink_impl. The new API (backward incompatible and thus placed under a new API level) has just one put() that accepts std::span<temporary_buffer>. It is a pure virtual method; implementations must grab the buffers before returning (even if the returned future is unresolved). The data_sink class has the put(span) overload as well as the put(temporary_buffer) and put(vector<temporary_buffer>) ones, for convenience.

* The file sink benefits from that change by just dropping the plugged put(packet) overload, and that's mostly it. To submit buffers from the span it picks them one by one, but later it should be tuned to submit an iovec request.
* Network sinks immediately convert the span of buffers into a net::packet; no other changes are made.
* The HTTP chunked encoding sink converts each buffer from the span into a "chunk"; no other changes to the current implementation are made. When we have mixed mode (not this PR) it will be able to relax it to zero-copy buffers.
* The content-length sink zero-copy forwards the span to the lower output_stream with the help of ... (see next paragraph)

The output_stream change is as follows: the _zc_bufs member is changed from net::packet to std::vector<temporary_buffer>, and the zero-copy write()-s accepting packet and scattered_message are removed in the new API level (they could be deprecated instead, but that would delay the full expelling of packet from the streams API). The write of a single temporary buffer is preserved. A new write(span<temporary_buffer>) overload is added.

After the PR, the existing _buf for buffered writes and _zc_bufs with zero-copied buffers still co-exist on output_stream, and the old non-mixed behavior is preserved. However, this opens a way to implement the fully-mixed mode itself 🤞 eventually.

Closes #2937

* github.com:scylladb/seastar:
  code: Introduce new API level
  iostream: Remove write()-s of packet/scattered_message from new API level
  iostream: Convert output_stream::_zc_bufs to vector of buffers
  code: Add data_sink_impl::put(std::span<temporary_buffer>) method
  code: Prepare some data_sink_impl::do_put(temporary_buffer) methods
  iostream: Introduce output_stream::write(span<temporary_buffer>) overload
  packet: Add packet(std::span<temporary_buffer>) constructor
  temporary_buffer: Add detach_front() helper
2 parents 8549271 + cdd7d7c commit 5b52717

26 files changed: 428 additions & 49 deletions

CMakeLists.txt

Lines changed: 2 additions & 2 deletions
@@ -70,10 +70,10 @@ option (Seastar_DEPRECATED_OSTREAM_FORMATTERS
   ON)

 set (Seastar_API_LEVEL
-  "8"
+  "9"
   CACHE
   STRING
-  "Seastar compatibility API level (7=unified CPU/IO scheduling groups, 8=noncopyable function in json_return_type")
+  "Seastar compatibility API level (7=unified CPU/IO scheduling groups, 8=noncopyable function in json_return_type, 9=new sink API")

 set_property (CACHE Seastar_API_LEVEL
   PROPERTY

apps/memcached/memcache.cc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -917,7 +917,8 @@ class ascii_protocol {
917917
scattered_message<char> msg;
918918
this_type::append_item<WithVersion>(msg, std::move(item));
919919
msg.append_static(msg_end);
920-
return out.write(std::move(msg));
920+
std::vector<temporary_buffer<char>> bufs = std::move(msg).release().release();
921+
return out.write(std::span<temporary_buffer<char>>(bufs));
921922
});
922923
} else {
923924
_items.clear();
@@ -931,7 +932,8 @@ class ascii_protocol {
931932
append_item<WithVersion>(msg, std::move(item));
932933
}
933934
msg.append_static(msg_end);
934-
return out.write(std::move(msg));
935+
std::vector<temporary_buffer<char>> bufs = std::move(msg).release().release();
936+
return out.write(std::span<temporary_buffer<char>>(bufs));
935937
});
936938
}
937939
}

configure.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ def standard_supported(standard, compiler='g++'):
7474
help='Extra flags for the linker')
7575
arg_parser.add_argument('--optflags', action='store', dest='user_optflags', default='',
7676
help='Extra optimization flags for the release mode')
77-
arg_parser.add_argument('--api-level', action='store', dest='api_level', default='8',
77+
arg_parser.add_argument('--api-level', action='store', dest='api_level', default='9',
7878
help='Compatibility API level (8=latest)')
7979
arg_parser.add_argument('--compiler', action='store', dest='cxx', default='g++',
8080
help='C++ compiler path')

doc/compatibility.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ versions of the API. For example.
7676
"while at it" file_impl API is forced to accept io_intent argument
7777
- Seastar_API_LEVEL=8 changes json_return_type to hold a noncopyable function
7878
and become a move-only type
79+
- Seastar_API_LEVEL=9 defines the data_sink_impl::put(span<temporary_buffer>)
80+
as the new and only method to be implemented
7981

8082
Applications can use an old API_LEVEL during a transition
8183
period, fix their code, and move to the new API_LEVEL.
@@ -117,6 +119,7 @@ API Level History
117119
| 6 | 2020-09 | 2023-03 | future<T> instead of future<T...> |
118120
| 7 | 2023-05 | 2024-09 | unified CPU/IO scheduling groups |
119121
| 8 | 2025-08 | | noncopyable function in json_return_type |
122+
| 9 | 2025-08 | | data_sink_impl new API |
120123

121124

122125
Note: The "mandatory" column indicates when backwards compatibility

include/seastar/core/internal/api-level.hh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
// For IDEs that don't see SEASTAR_API_LEVEL, generate a nice default
2525
#ifndef SEASTAR_API_LEVEL
26-
#define SEASTAR_API_LEVEL 8
26+
#define SEASTAR_API_LEVEL 9
2727
#endif
2828

2929
#if SEASTAR_API_LEVEL == 8

include/seastar/core/iostream-impl.hh

Lines changed: 41 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
#pragma once
2525

26+
#include <numeric>
2627
#include <seastar/core/coroutine.hh>
2728
#include <seastar/core/do_with.hh>
2829
#include <seastar/core/loop.hh>
@@ -72,62 +73,59 @@ future<> output_stream<CharType>::write(const std::basic_string<CharType>& s) no
7273

7374
template<typename CharType>
7475
future<>
75-
output_stream<CharType>::zero_copy_put(net::packet p) noexcept {
76+
output_stream<CharType>::zero_copy_put(std::vector<temporary_buffer<CharType>> b) noexcept {
7677
// if flush is scheduled, disable it, so it will not try to write in parallel
7778
_flush = false;
7879
if (_flushing) {
7980
// flush in progress, wait for it to end before continuing
80-
return _in_batch.value().get_future().then([this, p = std::move(p)] () mutable {
81-
return _fd.put(std::move(p));
81+
return _in_batch.value().get_future().then([this, b = std::move(b)] () mutable {
82+
return _fd.put(std::move(b));
8283
});
8384
} else {
84-
return _fd.put(std::move(p));
85+
return _fd.put(std::move(b));
8586
}
8687
}
8788

8889
// Writes @p in chunks of _size length. The last chunk is buffered if smaller.
8990
template <typename CharType>
9091
future<>
91-
output_stream<CharType>::zero_copy_split_and_put(net::packet p) noexcept {
92-
return repeat([this, p = std::move(p)] () mutable {
93-
if (p.len() < _size) {
94-
if (p.len()) {
95-
_zc_bufs = std::move(p);
96-
} else {
97-
_zc_bufs = net::packet::make_null_packet();
98-
}
92+
output_stream<CharType>::zero_copy_split_and_put(std::vector<temporary_buffer<CharType>> b, size_t len) noexcept {
93+
return repeat([this, b = std::move(b), len] () mutable {
94+
if (len < _size) {
95+
_zc_bufs = std::move(b);
96+
_zc_len = len;
9997
return make_ready_future<stop_iteration>(stop_iteration::yes);
10098
}
101-
auto chunk = p.share(0, _size);
102-
p.trim_front(_size);
99+
auto chunk = internal::detach_front(b, _size);
100+
len -= _size;
103101
return zero_copy_put(std::move(chunk)).then([] {
104102
return stop_iteration::no;
105103
});
106104
});
107105
}
108106

109107
template<typename CharType>
110-
future<> output_stream<CharType>::write(net::packet p) noexcept {
108+
future<> output_stream<CharType>::write(std::span<temporary_buffer<CharType>> bufs) noexcept {
111109
static_assert(std::is_same_v<CharType, char>, "packet works on char");
112110
try {
113-
if (p.len() != 0) {
111+
size_t size = std::accumulate(bufs.begin(), bufs.end(), size_t(0), [] (size_t s, const auto& b) { return s + b.size(); });
112+
if (size != 0) {
114113
if (_end) {
114+
SEASTAR_ASSERT(_zc_bufs.empty());
115115
_buf.trim(_end);
116+
_zc_len = _end;
116117
_end = 0;
117-
SEASTAR_ASSERT(!_zc_bufs);
118-
_zc_bufs = net::packet(std::move(_buf));
119-
}
120-
121-
if (_zc_bufs) {
122-
_zc_bufs.append(std::move(p));
123-
} else {
124-
_zc_bufs = std::move(p);
118+
_zc_bufs.reserve(bufs.size() + 1);
119+
_zc_bufs.emplace_back(std::move(_buf));
125120
}
126121

127-
if (_zc_bufs.len() >= _size) {
122+
_zc_len += size;
123+
_zc_bufs.insert(_zc_bufs.end(), std::make_move_iterator(bufs.begin()), std::make_move_iterator(bufs.end()));
124+
if (_zc_len >= _size) {
128125
if (_trim_to_size) {
129-
return zero_copy_split_and_put(std::move(_zc_bufs));
126+
return zero_copy_split_and_put(std::move(_zc_bufs), std::exchange(_zc_len, 0));
130127
} else {
128+
_zc_len = 0;
131129
return zero_copy_put(std::move(_zc_bufs));
132130
}
133131
}
@@ -141,16 +139,28 @@ future<> output_stream<CharType>::write(net::packet p) noexcept {
141139
template<typename CharType>
142140
future<> output_stream<CharType>::write(temporary_buffer<CharType> p) noexcept {
143141
try {
144-
return write(net::packet(std::move(p)));
142+
return write(std::span<temporary_buffer<CharType>>(&p, 1));
145143
} catch (...) {
146144
return current_exception_as_future();
147145
}
148146
}
149147

148+
#if SEASTAR_API_LEVEL < 9
149+
template<typename CharType>
150+
future<> output_stream<CharType>::write(net::packet p) noexcept {
151+
try {
152+
std::vector<temporary_buffer<CharType>> bufs = std::move(p).release();
153+
return write(std::span<temporary_buffer<CharType>>(bufs));
154+
} catch (...) {
155+
return current_exception_as_future();
156+
}
157+
}
158+
150159
template<typename CharType>
151160
future<> output_stream<CharType>::write(scattered_message<CharType> msg) noexcept {
152161
return write(std::move(msg).release());
153162
}
163+
#endif
154164

155165
template <typename CharType>
156166
future<temporary_buffer<CharType>>
@@ -360,7 +370,7 @@ template <typename CharType>
360370
future<>
361371
output_stream<CharType>::slow_write(const char_type* buf, size_t n) noexcept {
362372
try {
363-
SEASTAR_ASSERT(!_zc_bufs && "Mixing buffered writes and zero-copy writes not supported yet");
373+
SEASTAR_ASSERT(_zc_bufs.empty() && "Mixing buffered writes and zero-copy writes not supported yet");
364374
auto bulk_threshold = _end ? (2 * _size - _end) : _size;
365375
if (n >= bulk_threshold) {
366376
if (_end) {
@@ -422,7 +432,8 @@ future<> output_stream<CharType>::do_flush() noexcept {
422432
return _fd.put(std::move(_buf)).then([this] {
423433
return _fd.flush();
424434
});
425-
} else if (_zc_bufs) {
435+
} else if (!_zc_bufs.empty()) {
436+
_zc_len = 0;
426437
return _fd.put(std::move(_zc_bufs)).then([this] {
427438
return _fd.flush();
428439
});
@@ -515,7 +526,7 @@ output_stream<CharType>::close() noexcept {
515526
template <typename CharType>
516527
data_sink
517528
output_stream<CharType>::detach() && {
518-
if (_buf || _zc_bufs) {
529+
if (_buf || !_zc_bufs.empty()) {
519530
throw std::logic_error("detach() called on a used output_stream");
520531
}
521532
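The chunking done by zero_copy_split_and_put() can be illustrated with a standalone model. The real internal::detach_front() is not part of the diff shown here, so `detach_front_model` below is a hypothetical re-creation of the idea, with std::string standing in for temporary_buffer<char> and `split_chunks` as an illustrative driver; it should be read as a sketch of the splitting logic, not as the Seastar implementation.

```cpp
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Stand-in for temporary_buffer<char>; an assumption made for this sketch.
using buffer = std::string;

// Hypothetical model of a detach-front helper: removes the first `n` bytes
// from `bufs` and returns them as their own list of buffers.
// Precondition: `bufs` holds at least `n` bytes in total.
std::vector<buffer> detach_front_model(std::vector<buffer>& bufs, std::size_t n) {
    std::vector<buffer> front;
    std::size_t taken = 0;  // number of whole buffers moved into `front`
    while (n > 0 && taken < bufs.size()) {
        buffer& b = bufs[taken];
        if (b.size() <= n) {
            // The whole buffer fits into the chunk: move it over.
            n -= b.size();
            front.push_back(std::move(b));
            ++taken;
        } else {
            // Split the buffer: the head joins the chunk, the tail stays.
            front.push_back(b.substr(0, n));
            b.erase(0, n);
            n = 0;
        }
    }
    bufs.erase(bufs.begin(), bufs.begin() + static_cast<std::ptrdiff_t>(taken));
    return front;
}

// Cuts `bufs` into chunks of exactly `chunk_size` bytes. Whatever is left
// (fewer than `chunk_size` bytes) stays in `bufs`, mirroring how the stream
// keeps the short tail buffered for a later write or flush.
std::vector<std::vector<buffer>> split_chunks(std::vector<buffer>& bufs, std::size_t chunk_size) {
    std::vector<std::vector<buffer>> chunks;
    if (chunk_size == 0) {
        return chunks;  // guard against an endless loop
    }
    std::size_t len = 0;
    for (const buffer& b : bufs) {
        len += b.size();
    }
    while (len >= chunk_size) {
        chunks.push_back(detach_front_model(bufs, chunk_size));
        len -= chunk_size;
    }
    return chunks;
}
```

Tracking the remaining length in a separate counter, as the diff does with `len` and `_zc_len`, avoids re-summing buffer sizes on every iteration now that a plain vector no longer offers a packet-style len().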

include/seastar/core/iostream.hh

Lines changed: 55 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
#include <seastar/util/modules.hh>
4545
#ifndef SEASTAR_MODULE
4646
#include <boost/intrusive/slist.hpp>
47+
#include <ranges>
4748
#include <algorithm>
4849
#include <memory>
4950
#include <optional>
@@ -112,6 +113,12 @@ public:
112113
virtual temporary_buffer<char> allocate_buffer(size_t size) {
113114
return temporary_buffer<char>(size);
114115
}
116+
#if SEASTAR_API_LEVEL >= 9
117+
// The caller assumes that the storage that backs this span can be released
118+
// once this method returns, so implementations should move the buffers into
119+
// stable storage on their own early, before the returned future resolves.
120+
virtual future<> put(std::span<temporary_buffer<char>> data) = 0;
121+
#else
115122
virtual future<> put(net::packet data) = 0;
116123
virtual future<> put(std::vector<temporary_buffer<char>> data) {
117124
net::packet p;
@@ -124,6 +131,7 @@ public:
124131
virtual future<> put(temporary_buffer<char> buf) {
125132
return put(net::packet(net::fragment{buf.get_write(), buf.size()}, buf.release()));
126133
}
134+
#endif
127135
virtual future<> flush() {
128136
return make_ready_future<>();
129137
}
@@ -151,6 +159,26 @@ public:
151159
}
152160

153161
protected:
162+
#if SEASTAR_API_LEVEL >= 9
163+
// A helper function that class that inhrerit from data_sink_impl
164+
// can use to create a future chain holding buffers from the span
165+
// to sequentially put them with the help of fn function
166+
template <typename Fn>
167+
requires std::is_invocable_r_v<future<>, Fn, temporary_buffer<char>&&>
168+
static future<> fallback_put(std::span<temporary_buffer<char>> bufs, Fn fn) {
169+
if (bufs.size() == 1) [[likely]] {
170+
return fn(std::move(bufs.front()));
171+
}
172+
173+
auto f = fn(std::move(bufs.front()));
174+
for (auto&& buf : bufs.subspan(1)) {
175+
f = std::move(f).then([fn, buf = std::move(buf)] () mutable {
176+
return fn(std::move(buf));
177+
});
178+
}
179+
return f;
180+
}
181+
#else
154182
// This is a helper function that classes that inherit from data_sink_impl
155183
// can use to implement the put overload for net::packet.
156184
// Unfortunately, we currently cannot define this function as
@@ -163,6 +191,7 @@ protected:
163191
co_await this->put(std::move(buf));
164192
}
165193
}
194+
#endif
166195
};
167196

168197
class data_sink {
@@ -175,6 +204,21 @@ public:
175204
temporary_buffer<char> allocate_buffer(size_t size) {
176205
return _dsi->allocate_buffer(size);
177206
}
207+
#if SEASTAR_API_LEVEL >= 9
208+
future<> put(std::span<temporary_buffer<char>> data) noexcept {
209+
try {
210+
return _dsi->put(data);
211+
} catch (...) {
212+
return current_exception_as_future();
213+
}
214+
}
215+
future<> put(std::vector<temporary_buffer<char>> data) noexcept {
216+
return put(std::span<temporary_buffer<char>>(data));
217+
}
218+
future<> put(temporary_buffer<char> data) noexcept {
219+
return put(std::span<temporary_buffer<char>>(&data, 1));
220+
}
221+
#else
178222
future<> put(std::vector<temporary_buffer<char>> data) noexcept {
179223
try {
180224
return _dsi->put(std::move(data));
@@ -196,6 +240,7 @@ public:
196240
return current_exception_as_future();
197241
}
198242
}
243+
#endif
199244
future<> flush() noexcept {
200245
try {
201246
return _dsi->flush();
@@ -420,9 +465,10 @@ class output_stream final {
420465
static_assert(sizeof(CharType) == 1, "must buffer stream of bytes");
421466
data_sink _fd;
422467
temporary_buffer<CharType> _buf;
423-
net::packet _zc_bufs = net::packet::make_null_packet(); //zero copy buffers
468+
std::vector<temporary_buffer<CharType>> _zc_bufs; // zero copy buffers
424469
size_t _size = 0;
425470
size_t _end = 0;
471+
size_t _zc_len = 0;
426472
bool _trim_to_size = false;
427473
bool _batch_flushes = false;
428474
std::optional<promise<>> _in_batch;
@@ -436,8 +482,8 @@ private:
436482
future<> put(temporary_buffer<CharType> buf) noexcept;
437483
void poll_flush() noexcept;
438484
future<> do_flush() noexcept;
439-
future<> zero_copy_put(net::packet p) noexcept;
440-
future<> zero_copy_split_and_put(net::packet p) noexcept;
485+
future<> zero_copy_put(std::vector<temporary_buffer<CharType>> b) noexcept;
486+
future<> zero_copy_split_and_put(std::vector<temporary_buffer<CharType>> b, size_t len) noexcept;
441487
[[gnu::noinline]]
442488
future<> slow_write(const CharType* buf, size_t n) noexcept;
443489
public:
@@ -453,7 +499,7 @@ public:
453499
if (_batch_flushes) {
454500
SEASTAR_ASSERT(!_in_batch && "Was this stream properly closed?");
455501
} else {
456-
SEASTAR_ASSERT(!_end && !_zc_bufs && "Was this stream properly closed?");
502+
SEASTAR_ASSERT(!_end && !_zc_len && "Was this stream properly closed?");
457503
}
458504
}
459505
/// Writes n bytes from the memory pointed by buf into the buffer
@@ -466,12 +512,17 @@ public:
466512
/// Writes the given string into the buffer
467513
future<> write(const std::basic_string<char_type>& s) noexcept;
468514

515+
#if SEASTAR_API_LEVEL < 9
469516
/// Appends the packet as zero-copy buffer
470517
future<> write(net::packet p) noexcept;
471518
/// Appends the scattered message as zero-copy buffer
472519
future<> write(scattered_message<char_type> msg) noexcept;
520+
#endif
521+
473522
/// Appends the temporary buffer as zero-copy buffer
474523
future<> write(temporary_buffer<char_type>) noexcept;
524+
/// Appends a bunch of buffers as zero-copy
525+
future<> write(std::span<temporary_buffer<char_type>>) noexcept;
475526

476527
future<> flush() noexcept;
477528
