Skip to content

Commit 43d8b61

Browse files
committed
Additional checked queue cleanups/improvements
1 parent 827b74f commit 43d8b61

File tree

7 files changed

+31
-23
lines changed

7 files changed

+31
-23
lines changed

src/workerd/api/streams/internal.c++

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2024,9 +2024,8 @@ void WritableStreamInternalController::drain(jsg::Lock& js, v8::Local<v8::Value>
20242024
}
20252025

20262026
void WritableStreamInternalController::visitForGc(jsg::GcVisitor& visitor) {
2027-
queue.forEach([&](const WriteEvent& event) {
2028-
WriteEvent& e = const_cast<WriteEvent&>(event);
2029-
KJ_SWITCH_ONEOF(e.event) {
2027+
queue.forEach([&](auto& event) {
2028+
KJ_SWITCH_ONEOF(event.event) {
20302029
KJ_CASE_ONEOF(write, Write) {
20312030
visitor.visit(write.promise);
20322031
}

src/workerd/api/streams/queue.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -835,14 +835,14 @@ class ByteQueue final {
835835

836836
JSG_MEMORY_INFO(ByteQueue::ByobRequest) {}
837837

838+
inline ReadRequest& getRequest() {
839+
return KJ_ASSERT_NONNULL(request->tryGet(), "The ByobRequest has been invalidated.");
840+
}
841+
838842
private:
839843
kj::Own<WeakRef<ReadRequest>> request;
840844
ConsumerImpl& consumer;
841845
QueueImpl& queue;
842-
843-
inline ReadRequest& getRequest() {
844-
return KJ_ASSERT_NONNULL(request->tryGet(), "The ByobRequest has been invalidated.");
845-
}
846846
};
847847

848848
struct State {

src/workerd/api/streams/standard.c++

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1556,11 +1556,7 @@ void WritableImpl<Self>::visitForGc(jsg::GcVisitor& visitor) {
15561556
KJ_IF_SOME(pendingAbort, maybePendingAbort) {
15571557
visitor.visit(*pendingAbort);
15581558
}
1559-
writeRequests.forEach([&](const WriteRequest& write) {
1560-
// const_cast is ugly here but GcVisitor only supports non-const references.
1561-
auto& w = const_cast<WriteRequest&>(write);
1562-
visitor.visit(w.resolver, w.value);
1563-
});
1559+
writeRequests.forEach([&](auto& write) { visitor.visit(write.resolver, write.value); });
15641560
}
15651561

15661562
template <typename Self>
@@ -3971,7 +3967,7 @@ void WritableImpl<Self>::jsgGetMemoryInfo(jsg::MemoryTracker& tracker) const {
39713967
tracker.trackField("writeAlgorithm", algorithms.write);
39723968
tracker.trackField("sizeAlgorithm", algorithms.size);
39733969

3974-
writeRequests.forEach([&](const auto& req) { tracker.trackField("writeRequest", req); });
3970+
writeRequests.forEach([&](auto& req) { tracker.trackField("writeRequest", req); });
39753971

39763972
tracker.trackField("inFlightWrite", inFlightWrite);
39773973
tracker.trackField("inFlightClose", inFlightClose);

src/workerd/jsg/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ wd_cc_library(
4545
":jsg-core",
4646
":memory-tracker",
4747
"//src/workerd/util",
48+
"//src/workerd/util:checked-queue",
4849
"//src/workerd/util:sentry",
4950
"//src/workerd/util:thread-scopes",
5051
"@capnp-cpp//src/kj",

src/workerd/jsg/iterator.c++

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,24 +7,19 @@
77
namespace workerd::jsg {
88

99
kj::Maybe<jsg::Promise<void>&> AsyncIteratorImpl::maybeCurrent() {
10-
if (!pendingStack.empty()) {
11-
return pendingStack.back();
12-
}
13-
return kj::none;
10+
return pendingStack.peekBack();
1411
}
1512

1613
void AsyncIteratorImpl::pushCurrent(Promise<void> promise) {
17-
pendingStack.push_back(kj::mv(promise));
14+
pendingStack.push(kj::mv(promise));
1815
}
1916

2017
void AsyncIteratorImpl::popCurrent() {
21-
if (!pendingStack.empty()) {
22-
pendingStack.pop_front();
23-
}
18+
auto dropped KJ_UNUSED = KJ_ASSERT_NONNULL(pendingStack.pop());
2419
}
2520

2621
void AsyncIteratorImpl::visitForGc(jsg::GcVisitor& visitor) {
27-
visitor.visitAll(pendingStack);
22+
pendingStack.forEach([&](auto& p) { visitor.visit(p); });
2823
}
2924

3025
} // namespace workerd::jsg

src/workerd/jsg/iterator.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include <workerd/jsg/jsg.h>
88
#include <workerd/jsg/memory.h>
99
#include <workerd/jsg/struct.h>
10+
#include <workerd/util/checked-queue.h>
1011

1112
#include <concepts>
1213
#include <list>
@@ -800,7 +801,7 @@ class AsyncIteratorImpl {
800801
}
801802

802803
private:
803-
std::list<Promise<void>> pendingStack;
804+
workerd::util::Queue<Promise<void>> pendingStack;
804805
};
805806

806807
// Provides the base implementation of JSG_ASYNC_ITERATOR types. See the documentation

src/workerd/util/checked-queue.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,22 @@ class Queue final {
141141
return count;
142142
}
143143

144+
// Applies the callback to each element in the queue.
145+
// Returns the number of elements processed.
146+
// If the callback returns false, the iteration stops.
147+
inline size_t forEach(auto callback) {
148+
size_t count = 0;
149+
for (auto& item: inner) {
150+
count++;
151+
if constexpr (std::is_void_v<decltype(callback(item))>) {
152+
callback(item);
153+
} else {
154+
if (!callback(item)) break;
155+
}
156+
}
157+
return count;
158+
}
159+
144160
// Checks if the queue is empty.
145161
inline bool empty() const {
146162
return inner.empty();

0 commit comments

Comments
 (0)