Check failed: (vals->size()) == (total_val) in KVWorker<Val>::Pull_ #100

@SmartAir

Hello, I store some push requests from a worker in a list and do not let the server handle them until a certain requirement is satisfied (i.e. the server does not handle the present pull request immediately, but blocks it until a time I set). However, I ran into the error below:

[03:19:47] /home/xiongzi/mxnet/dmlc-core/include/dmlc/logging.h:235: [03:19:47] /home/xiongzi/mxnet/ps-lite/include/ps/kv_app.h:579: Check failed: (vals->size()) == (total_val)
terminate called after throwing an instance of 'dmlc::Error'
what(): [03:19:47] /home/xiongzi/mxnet/ps-lite/include/ps/kv_app.h:579: Check failed: (vals->size()) == (total_val)

I am puzzled by the line that raises the error in KVWorker<Val>::Pull_:
CHECK_EQ(vals->size(), total_val);

Could someone please explain this line of code for me?
As I understand it, vals->size() is the size of the values buffer for the present pull request, and total_val is the total size of all values received for that pull request's timestamp. (Please point out my mistake if I have said something wrong.)
So what is the purpose of checking that vals->size() and total_val are equal? And what might cause the error mentioned above?
Thanks a lot!

For reference, in case more context is needed, here is the function that raises the error, from ps-lite/include/ps/kv_app.h:

template <typename Val>
template <typename C, typename D>
int KVWorker<Val>::Pull_(
    const SArray<Key>& keys, C* vals, D* lens, int cmd, const Callback& cb) {
  int ts = obj_->NewRequest(kServerGroup);
  AddCallback(ts, [this, ts, keys, vals, lens, cb]() mutable {
      mu_.lock();
      auto& kvs = recv_kvs_[ts];
      mu_.unlock();

      // do check
      size_t total_key = 0, total_val = 0;
      for (const auto& s : kvs) {
        Range range = FindRange(keys, s.keys.front(), s.keys.back()+1);
        CHECK_EQ(range.size(), s.keys.size())
            << "unmatched keys size from one server";
        if (lens) CHECK_EQ(s.lens.size(), s.keys.size());
        total_key += s.keys.size();
        total_val += s.vals.size();
      }
      CHECK_EQ(total_key, keys.size()) << "lost some servers?";

      // fill vals and lens
      std::sort(kvs.begin(), kvs.end(), [](
          const KVPairs<Val>& a, const KVPairs<Val>& b) {
                  return a.keys.front() < b.keys.front();
        });
      CHECK_NOTNULL(vals);
      if (vals->empty()) {
        vals->resize(total_val);
      } else {
        CHECK_EQ(vals->size(), total_val);
      }
      Val* p_vals = vals->data();
      int *p_lens = nullptr;
      if (lens) {
        if (lens->empty()) {
          lens->resize(keys.size());
        } else {
          CHECK_EQ(lens->size(), keys.size());
        }
        p_lens = lens->data();
      }
      for (const auto& s : kvs) {
        memcpy(p_vals, s.vals.data(), s.vals.size() * sizeof(Val));
        p_vals += s.vals.size();
        if (p_lens) {
          memcpy(p_lens, s.lens.data(), s.lens.size() * sizeof(int));
          p_lens += s.lens.size();
        }
      }

      mu_.lock();
      recv_kvs_.erase(ts);
      mu_.unlock();
      if (cb) cb();
    });

  KVPairs<Val> kvs; kvs.keys = keys;
  Send(ts, false, cmd, kvs);
  return ts;
}
