@@ -154,7 +154,7 @@ void SliceSnapshot::IterateBucketsFb(bool send_full_sync_cut) {
154
154
stats_.keys_total += db_slice_->DbSize (db_indx);
155
155
}
156
156
157
- const uint64_t kCyclesPerYield = base::CycleClock::Frequency () >> 16 ; // ~15usec.
157
+ const uint64_t kCyclesPerJiffy = base::CycleClock::Frequency () >> 16 ; // ~15usec.
158
158
159
159
for (DbIndex db_indx = 0 ; db_indx < db_array_.size (); ++db_indx) {
160
160
if (!cntx_->IsRunning ())
@@ -166,7 +166,6 @@ void SliceSnapshot::IterateBucketsFb(bool send_full_sync_cut) {
166
166
PrimeTable* pt = &db_array_[db_indx]->prime ;
167
167
VLOG (1 ) << " Start traversing " << pt->size () << " items for index " << db_indx;
168
168
169
- uint64_t start_cycles = base::CycleClock::Now ();
170
169
do {
171
170
if (!cntx_->IsRunning ()) {
172
171
return ;
@@ -175,13 +174,14 @@ void SliceSnapshot::IterateBucketsFb(bool send_full_sync_cut) {
175
174
PrimeTable::Cursor next = pt->TraverseBuckets (
176
175
cursor, [this , &db_indx](auto it) { return BucketSaveCb (db_indx, it); });
177
176
cursor = next;
178
- uint64_t now = base::CycleClock::Now ();
179
- if (now > start_cycles + kCyclesPerYield ) {
180
- ThisFiber::Yield ();
181
- start_cycles = base::CycleClock::Now ();
182
- }
183
177
184
- PushSerialized (false );
178
+ // If we do not flush the data, and have not preempted,
179
+ // we may need to yield to other fibers to avoid grabbind the CPU for too long.
180
+ if (!PushSerialized (false )) {
181
+ if (ThisFiber::GetRunningTimeCycles () > kCyclesPerJiffy ) {
182
+ ThisFiber::Yield ();
183
+ }
184
+ }
185
185
} while (cursor);
186
186
187
187
DVLOG (2 ) << " after loop " << ThisFiber::GetName ();
@@ -309,7 +309,6 @@ void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const Pr
309
309
310
310
size_t SliceSnapshot::FlushSerialized (SerializerBase::FlushState flush_state) {
311
311
io::StringFile sfile;
312
- uint64_t start = base::CycleClock::Now ();
313
312
error_code ec = serializer_->FlushToSink (&sfile, flush_state);
314
313
CHECK (!ec); // always succeeds
315
314
@@ -320,16 +319,8 @@ size_t SliceSnapshot::FlushSerialized(SerializerBase::FlushState flush_state) {
320
319
uint64_t id = rec_id_++;
321
320
DVLOG (2 ) << " Pushing " << id;
322
321
323
- // FlushToSink can be quite slow for large values or due compression.
324
- // In this case we sleep to avoid starvation of other fibers.
325
- // Please note that we allocate the record id before we preempt in order to preserve ordering
326
- // guarantees.
327
- const uint64_t kSleepThreshold = base::CycleClock::Frequency () >> 15 ; // ~30 usec
328
- uint64_t elapsed = base::CycleClock::Now () - start;
329
- if (elapsed > kSleepThreshold ) {
330
- // Balance the load between fibers.
331
- ThisFiber::SleepFor (chrono::microseconds (200 ));
332
- }
322
+ const uint64_t kLongRunThresholdCycles = base::CycleClock::Frequency () >> 15 ; // ~30 usec
323
+ uint64_t running_cycles = ThisFiber::GetRunningTimeCycles ();
333
324
334
325
fb2::NoOpLock lk;
335
326
@@ -349,6 +340,14 @@ size_t SliceSnapshot::FlushSerialized(SerializerBase::FlushState flush_state) {
349
340
350
341
VLOG (2 ) << " Pushed with Serialize() " << serialized;
351
342
343
+ // FlushToSink can be quite slow for large values or due compression.
344
+ // In this case we counter-balance CPU over-usage by forcing sleep.
345
+ // We measure running_cycles before the preemption points because they reset the counter.
346
+ if (running_cycles > kLongRunThresholdCycles ) {
347
+ // Balance the load between fibers.
348
+ ThisFiber::SleepFor (chrono::microseconds (200 ));
349
+ }
350
+
352
351
return serialized;
353
352
}
354
353
0 commit comments