@@ -166,7 +166,8 @@ class DataQueueImpl final : public DataQueue,
166166 uint64_t capped_size;
167167 uint64_t size;
168168 if (capped_size_.To (&capped_size) && size_.To (&size)) {
169- return capped_size > size ? Just (capped_size - size) : Just<uint64_t >(0UL );
169+ return capped_size > size ? Just (capped_size - size)
170+ : Just<uint64_t >(0UL );
170171 }
171172 return Nothing<uint64_t >();
172173 }
@@ -175,7 +176,7 @@ class DataQueueImpl final : public DataQueue,
175176 tracker->TrackField (" entries" , entries_);
176177 }
177178
178- std::unique_ptr <Reader> getReader () override ;
179+ std::shared_ptr <Reader> getReader () override ;
179180 SET_MEMORY_INFO_NAME (DataQueue);
180181 SET_SELF_SIZE (DataQueueImpl);
181182
@@ -195,7 +196,9 @@ class DataQueueImpl final : public DataQueue,
195196// DataQueue with which it is associated, and always from the beginning.
196197// Reads are non-destructive, meaning that the state of the DataQueue
197198// will not and cannot be changed.
198- class IdempotentDataQueueReader final : public DataQueue::Reader {
199+ class IdempotentDataQueueReader final
200+ : public DataQueue::Reader,
201+ public std::enable_shared_from_this<DataQueue::Reader> {
199202 public:
200203 IdempotentDataQueueReader (std::shared_ptr<DataQueueImpl> data_queue)
201204 : data_queue_(std::move(data_queue)) {
@@ -212,8 +215,10 @@ class IdempotentDataQueueReader final : public DataQueue::Reader {
212215 int Pull (Next next,
213216 int options,
214217 DataQueue::Vec* data,
215- uint64_t count,
216- uint64_t max_count_hint = bob::kMaxCountHint ) override {
218+ size_t count,
219+ size_t max_count_hint = bob::kMaxCountHint ) override {
220+ std::shared_ptr<DataQueue::Reader> self = shared_from_this ();
221+
217222 // If ended is true, this reader has already reached the end and cannot
218223 // provide any more data.
219224 if (ended_) {
@@ -358,7 +363,7 @@ class IdempotentDataQueueReader final : public DataQueue::Reader {
358363 private:
359364 std::shared_ptr<DataQueueImpl> data_queue_;
360365 Maybe<uint32_t > current_index_ = Nothing<uint32_t >();
361- std::unique_ptr <DataQueue::Reader> current_reader_ = nullptr ;
366+ std::shared_ptr <DataQueue::Reader> current_reader_ = nullptr ;
362367 bool ended_ = false ;
363368 bool pull_pending_ = false ;
364369 int last_status_ = 0 ;
@@ -368,7 +373,9 @@ class IdempotentDataQueueReader final : public DataQueue::Reader {
368373// and removes those entries from the queue as they are fully consumed.
369374// This means that reads are destructive and the state of the DataQueue
370375// is mutated as the read proceeds.
371- class NonIdempotentDataQueueReader final : public DataQueue::Reader {
376+ class NonIdempotentDataQueueReader final
377+ : public DataQueue::Reader,
378+ public std::enable_shared_from_this<NonIdempotentDataQueueReader> {
372379 public:
373380 NonIdempotentDataQueueReader (std::shared_ptr<DataQueueImpl> data_queue)
374381 : data_queue_(std::move(data_queue)) {
@@ -386,8 +393,10 @@ class NonIdempotentDataQueueReader final : public DataQueue::Reader {
386393 int Pull (Next next,
387394 int options,
388395 DataQueue::Vec* data,
389- uint64_t count,
390- uint64_t max_count_hint = bob::kMaxCountHint ) override {
396+ size_t count,
397+ size_t max_count_hint = bob::kMaxCountHint ) override {
398+ std::shared_ptr<DataQueue::Reader> self = shared_from_this ();
399+
391400 // If ended is true, this reader has already reached the end and cannot
392401 // provide any more data.
393402 if (ended_) {
@@ -417,7 +426,8 @@ class NonIdempotentDataQueueReader final : public DataQueue::Reader {
417426 // still might get more data. We just don't know exactly when that'll
418427 // come, so let's return a blocked status.
419428 if (size < data_queue_->capped_size_ .FromJust ()) {
420- std::move (next)(bob::Status::STATUS_BLOCK, nullptr , 0 , [](uint64_t ) {});
429+ std::move (next)(
430+ bob::Status::STATUS_BLOCK, nullptr , 0 , [](uint64_t ) {});
421431 return bob::STATUS_BLOCK;
422432 }
423433
@@ -540,21 +550,21 @@ class NonIdempotentDataQueueReader final : public DataQueue::Reader {
540550
541551 private:
542552 std::shared_ptr<DataQueueImpl> data_queue_;
543- std::unique_ptr <DataQueue::Reader> current_reader_ = nullptr ;
553+ std::shared_ptr <DataQueue::Reader> current_reader_ = nullptr ;
544554 bool ended_ = false ;
545555 bool pull_pending_ = false ;
546556 int last_status_ = 0 ;
547557};
548558
549- std::unique_ptr <DataQueue::Reader> DataQueueImpl::getReader () {
559+ std::shared_ptr <DataQueue::Reader> DataQueueImpl::getReader () {
550560 if (isIdempotent ()) {
551- return std::make_unique <IdempotentDataQueueReader>(shared_from_this ());
561+ return std::make_shared <IdempotentDataQueueReader>(shared_from_this ());
552562 }
553563
554564 if (lockedToReader_) return nullptr ;
555565 lockedToReader_ = true ;
556566
557- return std::make_unique <NonIdempotentDataQueueReader>(shared_from_this ());
567+ return std::make_shared <NonIdempotentDataQueueReader>(shared_from_this ());
558568}
559569
560570// ============================================================================
@@ -567,8 +577,8 @@ class EmptyEntry final : public EntryBase {
567577 int Pull (Next next,
568578 int options,
569579 DataQueue::Vec* data,
570- uint64_t count,
571- uint64_t max_count_hint = bob::kMaxCountHint ) override {
580+ size_t count,
581+ size_t max_count_hint = bob::kMaxCountHint ) override {
572582 if (ended_) {
573583 std::move (next)(bob::Status::STATUS_EOS, nullptr , 0 , [](uint64_t ) {});
574584 return bob::Status::STATUS_EOS;
@@ -636,8 +646,8 @@ class InMemoryEntry final : public EntryBase {
636646 int Pull (Next next,
637647 int options,
638648 DataQueue::Vec* data,
639- uint64_t count,
640- uint64_t max_count_hint = bob::kMaxCountHint ) override {
649+ size_t count,
650+ size_t max_count_hint = bob::kMaxCountHint ) override {
641651 if (ended_) {
642652 std::move (next)(bob::Status::STATUS_EOS, nullptr , 0 , [](uint64_t ) {});
643653 return bob::Status::STATUS_EOS;
@@ -752,11 +762,11 @@ class DataQueueEntry : public EntryBase {
752762 DataQueueEntry& operator =(DataQueueEntry&&) = delete ;
753763
754764 std::unique_ptr<DataQueue::Reader> getReader () override {
755- return data_queue_->getReader ();
765+ return std::make_unique<ReaderImpl>( data_queue_->getReader () );
756766 }
757767
758- std::unique_ptr<Entry> slice (uint64_t start,
759- Maybe<uint64_t > end = Nothing<uint64_t >()) override {
768+ std::unique_ptr<Entry> slice (
769+ uint64_t start, Maybe<uint64_t > end = Nothing<uint64_t >()) override {
760770 std::shared_ptr<DataQueue> sliced = data_queue_->slice (start, end);
761771 if (!sliced) return nullptr ;
762772
@@ -790,6 +800,28 @@ class DataQueueEntry : public EntryBase {
790800
791801 private:
792802 std::shared_ptr<DataQueue> data_queue_;
803+
804+ class ReaderImpl : public DataQueue ::Reader {
805+ public:
806+ explicit ReaderImpl (std::shared_ptr<DataQueue::Reader> inner)
807+ : inner_(std::move(inner)) {}
808+
809+ int Pull (DataQueue::Reader::Next next,
810+ int options,
811+ DataQueue::Vec* data,
812+ size_t count,
813+ size_t max_count_hint) override {
814+ return inner_->Pull (
815+ std::move (next), options, data, count, max_count_hint);
816+ }
817+
818+ SET_NO_MEMORY_INFO ()
819+ SET_MEMORY_INFO_NAME (ReaderImpl)
820+ SET_SELF_SIZE (ReaderImpl)
821+
822+ private:
823+ std::shared_ptr<DataQueue::Reader> inner_;
824+ };
793825};
794826
795827// ============================================================================
@@ -851,8 +883,8 @@ class FdEntry final : public EntryBase {
851883 return std::make_unique<Reader>(this );
852884 }
853885
854- std::unique_ptr<Entry> slice (uint64_t start,
855- Maybe<uint64_t > end = Nothing<uint64_t >()) override {
886+ std::unique_ptr<Entry> slice (
887+ uint64_t start, Maybe<uint64_t > end = Nothing<uint64_t >()) override {
856888 uint64_t new_start = start_ + start;
857889 uint64_t new_end = end_;
858890 if (end.IsJust ()) {
@@ -918,10 +950,11 @@ class FdEntry final : public EntryBase {
918950 registry->Register (New);
919951 }
920952
921- static BaseObjectPtr<Wrap> Create (Environment* env,
922- int fd,
923- uint64_t start = 0 ,
924- Maybe<uint64_t > end = Nothing<uint64_t >()) {
953+ static BaseObjectPtr<Wrap> Create (
954+ Environment* env,
955+ int fd,
956+ uint64_t start = 0 ,
957+ Maybe<uint64_t > end = Nothing<uint64_t >()) {
925958 Local<Object> obj;
926959 if (!GetConstructorTemplate (env)
927960 ->InstanceTemplate ()
@@ -980,8 +1013,8 @@ class FdEntry final : public EntryBase {
9801013 int Pull (Next next,
9811014 int options,
9821015 DataQueue::Vec* data,
983- uint64_t count,
984- uint64_t max_count_hint = bob::kMaxCountHint ) override {
1016+ size_t count,
1017+ size_t max_count_hint = bob::kMaxCountHint ) override {
9851018 // TODO(@jasnell): For now, we're going to ignore data and count.
9861019 // Later, we can support these to allow the caller to allocate the
9871020 // buffers we read into. To keep things easier for now, we're going
0 commit comments