Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions cpp/csp/python/cspimpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,16 @@ static PyObject *_set_capture_cpp_backtrace( PyObject *, PyObject *args )
CSP_RETURN_NONE;
}

static PyObject * _csp_in_realtime( PyObject*, PyObject * nodeptr )
{
CSP_BEGIN_METHOD;

csp::Node * node = reinterpret_cast<csp::Node *>( fromPython<uint64_t>( nodeptr ) );
return toPython( node -> rootEngine() -> inRealtime() );

CSP_RETURN_NULL;
}


static PyMethodDef _cspimpl_methods[] = {
{"_csp_now", (PyCFunction) _csp_now, METH_O, "current engine time"},
Expand All @@ -125,6 +135,7 @@ static PyMethodDef _cspimpl_methods[] = {
{"create_traceback", (PyCFunction) _create_traceback, METH_VARARGS, "internal"},
{"_csp_engine_stats", (PyCFunction) _engine_stats, METH_O, "engine statistics"},
{"set_capture_cpp_backtrace", (PyCFunction) _set_capture_cpp_backtrace, METH_VARARGS, "internal"},
{"_csp_in_realtime", (PyCFunction) _csp_in_realtime, METH_O, "is running engine in realtime mode"},
{nullptr}
};

Expand Down
6 changes: 6 additions & 0 deletions csp/impl/builtin_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,12 @@ def remove_dynamic_key(basket: GenericTSTypes["T"].TS_TYPE, key: Any):
)


@csp_builtin
def in_realtime() -> bool:
"""Returns whether the running engine is in realtime or sim mode"""
raise RuntimeError("Unexpected use of csp.in_realtime, csp.in_realtime can only be used inside a node")


@csp_builtin
def engine_start_time() -> datetime:
"""Returns the engine run start time (can be used both in nodes and graphs)"""
Expand Down
10 changes: 10 additions & 0 deletions csp/impl/wiring/node_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,14 @@ class NodeParser(BaseParser):
_CSP_ENGINE_STATS_FUNC = "_csp_engine_stats"

_CSP_STOP_ENGINE_FUNC = "_csp_stop_engine"
_CSP_IN_REALTIME_FUNC = "_csp_in_realtime"
_LOCAL_METHODS = {
_CSP_NOW_FUNC: _cspimpl._csp_now,
_CSP_ENGINE_START_TIME_FUNC: _cspimpl._csp_engine_start_time,
_CSP_ENGINE_END_TIME_FUNC: _cspimpl._csp_engine_end_time,
_CSP_STOP_ENGINE_FUNC: _cspimpl._csp_stop_engine,
_CSP_ENGINE_STATS_FUNC: _cspimpl._csp_engine_stats,
_CSP_IN_REALTIME_FUNC: _cspimpl._csp_in_realtime,
}

_SPECIAL_BLOCKS_METH = {"alarms", "state", "start", "stop", "outputs"}
Expand Down Expand Up @@ -586,6 +588,13 @@ def _parse_now(self, node):
func=ast.Name(id=self._CSP_NOW_FUNC, ctx=ast.Load()), args=[self._node_proxy_expr()], keywords=[]
)

def _parse_in_realtime(self, node):
if len(node.args) or len(node.keywords):
raise CspParseError("csp.in_realtime takes no arguments", node.lineno)
return ast.Call(
func=ast.Name(id=self._CSP_IN_REALTIME_FUNC, ctx=ast.Load()), args=[self._node_proxy_expr()], keywords=[]
)

def _parse_stop_engine(self, node):
args = [self._node_proxy_expr()] + node.args
return ast.Call(func=ast.Name(id=self._CSP_STOP_ENGINE_FUNC, ctx=ast.Load()), args=args, keywords=node.keywords)
Expand Down Expand Up @@ -888,6 +897,7 @@ def _init_internal_maps(cls):
"csp.make_passive": cls._make_single_proxy_arg_func_resolver(builtin_functions.make_passive),
"csp.make_active": cls._make_single_proxy_arg_func_resolver(builtin_functions.make_active),
"csp.remove_dynamic_key": cls._parse_remove_dynamic_key,
"csp.in_realtime": cls._parse_in_realtime,
# omit this as its handled in a special case
# 'csp.alarm': cls._parse_alarm,
"csp.schedule_alarm": cls._parse_schedule_alarm,
Expand Down
70 changes: 70 additions & 0 deletions csp/tests/test_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import numpy as np
import psutil
import pytest
from pytz import UTC

import csp
from csp import PushMode, ts
Expand Down Expand Up @@ -415,6 +416,75 @@ def times(x: ts[bool]) -> ts[datetime]:
result = csp.run(times(x), starttime=datetime(2020, 2, 7, 9), endtime=timedelta(seconds=10))[0]
self.assertEqual([v[0] for v in result], [v[1] for v in result])

def test_csp_in_realtime__historical(self):
@csp.node
def is_in_realtime(x: ts[bool]) -> ts[bool]:
if csp.ticked(x):
return csp.in_realtime()

@csp.graph
def g() -> ts[bool]:
# Symbolic curve of expected booleans
times = csp.curve(
bool,
[(datetime(2025, 12, 24), False)],
)
csp.add_graph_output("in_realtime", is_in_realtime(times))

outputs = csp.run(g, starttime=datetime(2025, 12, 24), endtime=datetime(2025, 12, 25), realtime=False)
assert outputs
assert outputs["in_realtime"][0][1] is False

def test_csp_in_realtime__realtime(self):
def utc_now() -> datetime:
return datetime.now(UTC)

@csp.node
def is_in_realtime(x: ts[bool]) -> ts[bool]:
if csp.ticked(x):
return csp.in_realtime()

@csp.graph
def g() -> ts[bool]:
# Symbolic curve of expected booleans
times = csp.curve(
bool,
[(timedelta(seconds=10), True)],
)
csp.add_graph_output("in_realtime", is_in_realtime(times))
csp.stop_engine(times)

outputs = csp.run(g, starttime=utc_now(), endtime=utc_now() + timedelta(seconds=30), realtime=True)
assert outputs
assert outputs["in_realtime"][0][1] is True

@pytest.mark.skipif(not os.environ.get("CSP_ENGINE_FLAKY_TESTS"), reason="potentially flaky test")
def test_csp_in_realtime(self):
def utc_now() -> datetime:
return datetime.now(UTC)

@csp.node
def is_in_realtime(x: ts[bool]) -> ts[bool]:
if csp.ticked(x):
assert csp.in_realtime() == x
return csp.in_realtime()

@csp.graph
def g() -> ts[bool]:
# Symbolic curve of expected booleans
times = csp.curve(
bool,
[(utc_now() - timedelta(seconds=3), False), (utc_now() + timedelta(seconds=3), True)],
)
csp.add_graph_output("in_realtime", is_in_realtime(times))

outputs = csp.run(
g, starttime=utc_now() - timedelta(seconds=5), endtime=utc_now() + timedelta(seconds=10), realtime=True
)
assert outputs
assert outputs["in_realtime"][0][1] is False
assert outputs["in_realtime"][1][1] is True

def test_stop_engine(self):
@csp.node
def stop(x: ts[bool]) -> ts[bool]:
Expand Down
Loading