Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ let rcl = {
node.init(nodeName, namespace);
debug('Finish initializing node, name = %s and namespace = %s.', nodeName, namespace);
node.handle = handle;
node.context = context;
this._nodes.push(node);
return node;
},
Expand Down
50 changes: 50 additions & 0 deletions lib/guard_condition.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright (c) 2017 Intel Corporation. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

'use strict';

const rclnodejs = require('bindings')('rclnodejs');
const Entity = require('./entity.js');
const Context = require('./context.js');

/**
* @class - Class representing a guard condition in ROS
* @hideconstructor
*/

class GuardCondition extends Entity {
constructor(handle, callback) {
super(handle, null, null);

this._callback = callback;

// True when the executor sees this has been triggered but has not yet been handled
this._executorTriggered = false;
}

get callback() {
return this._callback;
}

static createGuardCondition(callback, context = Context.defaultContext()) {
let handle = rclnodejs.createGuardCondition(context.handle());
return new GuardCondition(handle, callback);
}

trigger() {
rclnodejs.triggerGuardCondition(this.handle);
}
}

module.exports = GuardCondition;
67 changes: 62 additions & 5 deletions lib/node.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const QoS = require('./qos.js');
const debug = require('debug')('rclnodejs:node');
const loader = require('./interface_loader.js');
const Context = require('./context.js');
const GuardCondition = require('./guard_condition.js');
/**
* @class - Class representing a Node in ROS
* @hideconstructor
Expand All @@ -36,6 +37,7 @@ class Node {
this._clients = [];
this._services = [];
this._timers = [];
this._guards = [];
this._name = name;

if (namespace.length === 0) {
Expand All @@ -47,15 +49,32 @@ class Node {
this.spinning = false;
}

execute() {
this._timers.forEach((timer) => {
execute(handles) {
let timersReady = this._timers.filter((timer) => handles.indexOf(timer.handle) !== -1);
let guardsReady = this._guards.filter((guard) => handles.indexOf(guard.handle) !== -1);
let subscriptionsReady = this._subscriptions.filter((subscription) =>
handles.indexOf(subscription.handle) !== -1);
let clientsReady = this._clients.filter((client) => handles.indexOf(client.handle) !== -1);
let servicesReady = this._services.filter((service) => handles.indexOf(service.handle) !== -1);

// Retrigger a guard condition that was triggered but not handled
this._guards.forEach((guard) => {
if (guard._executorTriggered) {
guard.trigger();
}
});

// Mark all guards as triggered before processing any handlers since they're auto-taken
guardsReady.forEach((guard) => guard._executorTriggered = true);

timersReady.forEach((timer) => {
if (timer.isReady()) {
rclnodejs.callTimer(timer.handle);
timer.callback();
}
});

this._subscriptions.forEach((subscription) => {
subscriptionsReady.forEach((subscription) => {
let Message = subscription.typeClass;
let msg = new Message();
let success = rclnodejs.rclTake(subscription.handle, msg.toRawROS());
Expand All @@ -65,7 +84,14 @@ class Node {
Message.destoryRawROS(msg);
});

this._clients.forEach((client) => {
guardsReady.forEach((guard) => {
if (guard._executorTriggered) {
guard._executorTriggered = false;
guard.callback();
}
});

clientsReady.forEach((client) => {
let Response = client.typeClass.Response;
let response = new Response();
let success = rclnodejs.rclTakeResponse(client.handle, client.sequenceNumber, response.toRawROS());
Expand All @@ -75,7 +101,7 @@ class Node {
Response.destoryRawROS(response);
});

this._services.forEach((service) => {
servicesReady.forEach((service) => {
let Request = service.typeClass.Request;
let request = new Request();
let header = rclnodejs.rclTakeRequest(service.handle, this.handle, request.toRawROS());
Expand Down Expand Up @@ -313,6 +339,24 @@ class Node {
return service;
}

/**
* Create a guard condition.
* @param {Function} callback - The callback to be called when the guard condition is triggered.
* @return {GuardCondition} - An instance of GuardCondition.
*/
createGuardCondition(callback) {
if (typeof (callback) !== 'function') {
throw new TypeError('Invalid argument');
}

let guard = GuardCondition.createGuardCondition(callback, this.context);
debug('Finish creating guard condition');
this._guards.push(guard);
this.syncHandles();

return guard;
}

/**
* Destroy all resource allocated by this node, including
* <code>Timer</code>s/<code>Publisher</code>s/<code>Subscription</code>s
Expand All @@ -330,6 +374,7 @@ class Node {
this._subscriptions = [];
this._clients = [];
this._services = [];
this._guards = [];
}

/**
Expand Down Expand Up @@ -392,6 +437,18 @@ class Node {
this._destroyEntity(timer, this._timers);
}

/**
* Destroy a guard condition.
* @param {GuardCondition} guard - The guard condition to be destroyed.
* @return {undefined}
*/
destroyGuardCondition(guard) {
if (!(guard instanceof GuardCondition)) {
throw new TypeError('Invalid argument');
}
this._destroyEntity(guard, this._guards);
}

/* Get the name of the node.
* @return {string}
*/
Expand Down
10 changes: 7 additions & 3 deletions src/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ void Executor::DoWork(uv_async_t* handle) {
rcl_reset_error();
g_exception_ptr = nullptr;
}
executor->delegate_->Execute();
executor->delegate_->Execute(
executor->handle_manager_->get_filtered_handles());
}
}

Expand Down Expand Up @@ -117,8 +118,8 @@ void Executor::Run(void* arg) {
continue;

if (rcl_wait_set_resize(&wait_set, handle_manager->subscription_count(),
// TODO(minggang): support guard conditions
1u, handle_manager->timer_count(),
handle_manager->guard_contition_count() + 1u,
handle_manager->timer_count(),
handle_manager->client_count(),
handle_manager->service_count(),
// TODO(minggang): support events.
Expand Down Expand Up @@ -147,6 +148,9 @@ void Executor::Run(void* arg) {
wait_set.guard_conditions[0]) {
executor->running_.store(false);
}

handle_manager->FilterHandles(&wait_set);

if (!uv_is_closing(
reinterpret_cast<uv_handle_t*>(executor->async_))) {
uv_async_send(executor->async_);
Expand Down
6 changes: 5 additions & 1 deletion src/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

#include <atomic>
#include <exception>
#include <vector>

#include "rcl_handle.hpp"

struct rcl_context_t;

Expand All @@ -30,7 +33,8 @@ class Executor {
public:
class Delegate {
public:
virtual void Execute() = 0;
virtual void Execute(
const std::vector<rclnodejs::RclHandle *> &handles) = 0;
virtual void CatchException(std::exception_ptr e_ptr) = 0;
};

Expand Down
75 changes: 65 additions & 10 deletions src/handle_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

#include <vector>

#include "rcl_handle.hpp"
#include "spdlog/spdlog.h"

namespace rclnodejs {
Expand Down Expand Up @@ -47,45 +46,102 @@ void HandleManager::CollectHandles(const v8::Local<v8::Object> node) {
Nan::Get(node, Nan::New("_clients").ToLocalChecked());
Nan::MaybeLocal<v8::Value> services =
Nan::Get(node, Nan::New("_services").ToLocalChecked());
Nan::MaybeLocal<v8::Value> guard_conditions =
Nan::Get(node, Nan::New("_guards").ToLocalChecked());

CollectHandlesByType(timers.ToLocalChecked()->ToObject(), &timers_);
CollectHandlesByType(subscriptions.ToLocalChecked()->ToObject(),
&subscriptions_);
CollectHandlesByType(clients.ToLocalChecked()->ToObject(), &clients_);
CollectHandlesByType(services.ToLocalChecked()->ToObject(), &services_);
CollectHandlesByType(guard_conditions.ToLocalChecked()->ToObject(),
&guard_conditions_);
}

is_synchronizing_.store(false);
uv_sem_post(&sem_);

SPDLOG_DEBUG(
spdlog::get("rclnodejs"),
"Add {0:d} timers, {1:d} subscriptions, {2:d} clients, {3:d} services.",
timers_.size(), subscriptions_.size(), clients_.size(), services_.size());
"Add {0:d} timers, {1:d} subscriptions, {2:d} clients, " +
"{3:d} services, {4:d} guards.",
timers_.size(),
subscriptions_.size(),
clients_.size(),
services_.size(),
guard_conditions_.size());
}

bool HandleManager::AddHandlesToWaitSet(rcl_wait_set_t* wait_set) {
for (auto& timer : timers_) {
if (rcl_wait_set_add_timer(wait_set, timer, nullptr) != RCL_RET_OK)
rcl_timer_t* rcl_timer = reinterpret_cast<rcl_timer_t*>(timer->ptr());
if (rcl_wait_set_add_timer(wait_set, rcl_timer, nullptr) != RCL_RET_OK)
return false;
}
for (auto& subscription : subscriptions_) {
if (rcl_wait_set_add_subscription(wait_set, subscription, nullptr) !=
rcl_subscription_t* rcl_subscription =
reinterpret_cast<rcl_subscription_t*>(subscription->ptr());
if (rcl_wait_set_add_subscription(wait_set, rcl_subscription, nullptr) !=
RCL_RET_OK)
return false;
}
for (auto& client : clients_) {
if (rcl_wait_set_add_client(wait_set, client, nullptr) != RCL_RET_OK)
rcl_client_t* rcl_client = reinterpret_cast<rcl_client_t*>(client->ptr());
if (rcl_wait_set_add_client(wait_set, rcl_client, nullptr) != RCL_RET_OK)
return false;
}
for (auto& service : services_) {
if (rcl_wait_set_add_service(wait_set, service, nullptr) != RCL_RET_OK)
rcl_service_t* rcl_service =
reinterpret_cast<rcl_service_t*>(service->ptr());
if (rcl_wait_set_add_service(wait_set, rcl_service, nullptr) != RCL_RET_OK)
return false;
}
for (auto& guard_condition : guard_conditions_) {
rcl_guard_condition_t* rcl_guard_condition =
reinterpret_cast<rcl_guard_condition_t*>(guard_condition->ptr());
if (rcl_wait_set_add_guard_condition(wait_set, rcl_guard_condition, nullptr)
!= RCL_RET_OK)
return false;
}

return true;
}

#define FILTER_READY_ENTITIES(ENTITY_TYPE) \
size_t idx; \
size_t idx_max; \
idx_max = wait_set->size_of_ ## ENTITY_TYPE ## s; \
const rcl_ ## ENTITY_TYPE ## _t ** struct_ptr = wait_set->ENTITY_TYPE ## s; \
for (idx = 0; idx < idx_max; idx ++) { \
if (struct_ptr[idx]) { \
for (auto& ENTITY_TYPE : ENTITY_TYPE ## s_) { \
if (struct_ptr[idx] == ENTITY_TYPE->ptr()) { \
filtered_handles_.push_back(ENTITY_TYPE); \
} \
} \
} \
}

void HandleManager::FilterHandles(rcl_wait_set_t* wait_set) {
filtered_handles_.clear();

{
FILTER_READY_ENTITIES(subscription)
}
{
FILTER_READY_ENTITIES(client)
}
{
FILTER_READY_ENTITIES(service)
}
{
FILTER_READY_ENTITIES(timer)
}
{
FILTER_READY_ENTITIES(guard_condition)
}
}

void HandleManager::ClearHandles() {
timers_.clear();
clients_.clear();
Expand All @@ -94,10 +150,9 @@ void HandleManager::ClearHandles() {
guard_conditions_.clear();
}

template <typename T>
void HandleManager::CollectHandlesByType(
const v8::Local<v8::Object>& typeObject,
std::vector<const T*>* vec) {
std::vector<rclnodejs::RclHandle*>* vec) {
Nan::HandleScope scope;

if (typeObject->IsArray()) {
Expand All @@ -112,7 +167,7 @@ void HandleManager::CollectHandlesByType(
rclnodejs::RclHandle* rcl_handle =
rclnodejs::RclHandle::Unwrap<rclnodejs::RclHandle>(
handle.ToLocalChecked()->ToObject());
vec->push_back(reinterpret_cast<T*>(rcl_handle->ptr()));
vec->push_back(rcl_handle);
}
}
}
Expand Down
Loading