Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

#pragma once

#include <algorithm>
#include <functional>
#include <vector>

namespace facebook::react {

constexpr int DEFAULT_MAX_SIZE = 1024;

/**
* A container for storing entries of type T, with the following properties:
* - It can only grow up to a specified max size
* - It's a circular buffer (the oldest elements are dropped if reached max
* size and adding a new element)
* - The entries can be "consumed" (once), which from the point of view of
* the consumer effectively clears the buffer
* - Even after the entries are consumed, all of the non-overwritten entries
* can still be independently retrieved an arbitrary amount of times
*/
template <class T>
class BoundedConsumableBuffer {
public:
/**
* Status of the add/push operation for the `BoundedConsumableBuffer`
* container
*/
enum class PushStatus {
// There was free space in the buffer, element was successfully pushed:
OK = 0,

// Element was pushed, but had to overwrite some already consumed elements:
OVERWRITE = 1,

// Element wasn't pushed, as buffer size limit has been reached and it's
// not possible to overwrite already consumed elements anymore:
DROP = 2,
};

BoundedConsumableBuffer(int maxSize = DEFAULT_MAX_SIZE) : maxSize_(maxSize) {}

/**
* Adds (pushes) element into the buffer. Returns the result/status of the
* operation, which will depend on whether the buffer reached the max allowed
* size and how many are there unconsumed elements.
*/
PushStatus add(const T &&el) {
if (entries_.size() < maxSize_) {
Copy link
Contributor

@Pranav-yadav Pranav-yadav Mar 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This overall is an awesome implementation. 🙌
Before proceeding, I'm just here to learn.
I had a question though; specifically with this method (add);

  • What if the caller want's the value being overwritten (in that case), to be returned from add?

  • I.e. Is it possible and correct; that we could add some gracefulness when overwriting?

  • I know the *getNextOverwriteCandidate() is there for the same purpose, if explicitly called.

  • If I understood the code correctly, obviously there will be no way to retrieve the overwritten entry if the caller doesn't explicitly call the the method provided.

  • I get that it's up-to the caller to take care of that, as size and other sufficient methods are provided. But, "basically I thought is it possible to take care of that; implicitly in the add itself".

  • POC:

//...
std::tuple<PushStatus, T*> add(const T &&el){
    if (entries_.size() < maxSize_) {
      // Haven't reached max buffer size yet, just add and grow the buffer
      entries_.emplace_back(el);
      cursorEnd_++;
      numToConsume_++;
      return std::make_tuple(PushStatus::OK, nullptr);
    } else if (numToConsume_ == maxSize_) {
      // Drop the oldest (yet unconsumed) element in the buffer
      auto entryToBeOverwritten = entries_[position_];
      entries_[position_] = el;
      cursorEnd_ = (cursorEnd_ + 1) % maxSize_;
      position_ = (position_ + 1) % maxSize_;
      cursorStart_ = position_;
      return std::make_tuple(PushStatus::DROP, &entryToBeOverwritten);
    } else {
      // Overwrite the oldest (but already consumed) element in the buffer
      auto entryToBeOverwritten = entries_[position_];
      entries_[position_] = el;
      position_ = (position_ + 1) % entries_.size();
      cursorEnd_ = position_;
      numToConsume_++;
      return std::make_tuple(PushStatus::OVERWRITE, &entryToBeOverwritten);
    }
  }
//...
  • At caller's side:
//...
PushStatus pushStatus;
T* mayBeOverwrittenEntry;
std::tie(pushStatus, mayBeOverwrittenEntry) = bcf.add(newElement);
//...

This does not seem to have significant overhead, and I agree with that; it may affect DX :)

WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the feedback!

Yes, it's a good suggestion.

I initially did it this way, in fact, but then decided against it, in order to keep the API simpler.

We only need to know which element we'll overwrite in some limited use cases (in particular, for the case of mark performance entries, which also additionally need a separate lookup-by-name registry).

So my idea was to kind of keep simple things simple, and complex ones possible, in this case :)

But your suggestion would definitely work great as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's a good suggestion.

Thanks :)

I initially did it this way, in fact,

👍 This means I'm also on the right path of learning.

We only need to know which element we'll overwrite in some limited use cases (in particular, for the case of mark performance entries, which also additionally need a separate lookup-by-name registry).

Definitely. Btw, I wasn't aware of these specifics, thanks.

So my idea was to kind of keep simple things simple, and complex ones possible, in this case :)

For sure.

But your suggestion would definitely work great as well.

Again, thanks for validating my suggestions. I did learn something new here :)

// Haven't reached max buffer size yet, just add and grow the buffer
entries_.emplace_back(el);
cursorEnd_++;
numToConsume_++;
return PushStatus::OK;
} else if (numToConsume_ == maxSize_) {
// Drop the oldest (yet unconsumed) element in the buffer
entries_[position_] = el;
cursorEnd_ = (cursorEnd_ + 1) % maxSize_;
position_ = (position_ + 1) % maxSize_;
cursorStart_ = position_;
return PushStatus::DROP;
} else {
// Overwrite the oldest (but already consumed) element in the buffer
entries_[position_] = el;
position_ = (position_ + 1) % entries_.size();
cursorEnd_ = position_;
numToConsume_++;
return PushStatus::OVERWRITE;
}
}

/**
* Returns pointer to next entry which would be overwritten or dropped if
* added a new element. Null if no entry will be dropped.
*/
const T *getNextOverwriteCandidate() const {
if (entries_.size() < maxSize_) {
return nullptr;
} else {
return &entries_[position_];
}
}

T &operator[](size_t idx) {
return entries_[(position_ + idx) % entries_.size()];
}

/**
* Returns reference to the last unconsumed element
*/
T &back() {
return entries_[(cursorEnd_ - 1 + entries_.size()) % entries_.size()];
}

size_t size() const {
return entries_.size();
}

size_t getNumToConsume() const {
return numToConsume_;
}

void clear() {
entries_.clear();
position_ = 0;
cursorStart_ = 0;
cursorEnd_ = 0;
numToConsume_ = 0;
}

/**
* Clears buffer entries by predicate
*/
void clear(std::function<bool(const T &)> predicate) {
int pos = cursorStart_;
std::vector<T> entries;
int numToConsume = 0;
int i;
for (i = 0; i < numToConsume_; i++) {
if (!predicate(entries_[pos])) {
entries.push_back(entries_[pos]);
numToConsume++;
}
pos = (pos + 1) % entries_.size();
}
cursorEnd_ = entries.size();

for (; i < entries_.size(); i++) {
if (!predicate(entries_[pos])) {
entries.push_back(entries_[pos]);
}
pos = (pos + 1) % entries_.size();
}

numToConsume_ = numToConsume;
cursorStart_ = 0;
cursorEnd_ = numToConsume_;
position_ = numToConsume_;
entries.swap(entries_);
}

/**
* Retrieves buffer entries, whether consumed or not
*/
std::vector<T> getEntries() const {
std::vector<T> res;
getEntries(res);
return res;
}

/**
* Retrieves buffer entries, whether consumed or not, with predicate
*/
std::vector<T> getEntries(std::function<bool(const T &)> predicate) const {
std::vector<T> res;
getEntries(res, predicate);
return res;
}

void getEntries(std::vector<T> &res) const {
const size_t oldSize = res.size();
res.resize(oldSize + entries_.size());
std::copy(
entries_.begin() + position_, entries_.end(), res.begin() + oldSize);
std::copy(
entries_.begin(),
entries_.begin() + position_,
res.begin() + oldSize + entries_.size() - position_);
}

void getEntries(std::vector<T> &res, std::function<bool(const T &)> predicate)
const {
for (int i = 0; i < entries_.size(); i++) {
const T &el = entries_[(i + position_) % entries_.size()];
if (predicate(el)) {
res.push_back(el);
}
}
}

/**
* "Consumes" all the currently unconsumed entries in the buffer and returns
* these entries. Note that even if the buffer may not have unconsumed
* elements currently, it's still possible to retrieve all buffer elements
* via `getEntries`.
*/
std::vector<T> consume() {
std::vector<T> res;
consume(res);
return res;
}

void consume(std::vector<T> &res) {
if (numToConsume_ == 0) {
return;
}

const size_t resStart = res.size();
res.resize(res.size() + numToConsume_);
if (cursorEnd_ > cursorStart_) {
std::copy(
entries_.begin() + cursorStart_,
entries_.begin() + cursorEnd_,
res.begin() + resStart);
} else {
std::copy(
entries_.begin() + cursorStart_,
entries_.end(),
res.begin() + resStart);
std::copy(
entries_.begin(),
entries_.begin() + cursorEnd_,
res.begin() + resStart + static_cast<int>(entries_.size()) -
cursorStart_);
}

cursorStart_ = cursorEnd_;
numToConsume_ = 0;
}

private:
std::vector<T> entries_;

const int maxSize_;

// Current starting position in the circular buffer:
int position_{0};

// Current "cursor" - positions of the firsst and after last unconsumed
// element, relative to the starting position:
int cursorStart_{0};
int cursorEnd_{0};

// Number of currently unconsumed elements:
int numToConsume_{0};
};

} // namespace facebook::react
Loading