Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions src/Std/Internal/Async/Signal.lean
Original file line number Diff line number Diff line change
Expand Up @@ -238,23 +238,26 @@ def stop (s : Signal.Waiter) : IO Unit :=
s.native.stop

/--
Create a `Selector` that resolves once `s` has received the signal. Note that calling this function starts `s`
if it hasn't already started.
Create a `Selector` that resolves once `s` has received the signal. Note that calling this function
does not start the signal waiter.
-/
def selector (s : Signal.Waiter) : IO (Selector Unit) := do
let signalWaiter ← s.wait
return {
def selector (s : Signal.Waiter) : Selector Unit :=
{
tryFn := do
let signalWaiter : AsyncTask _ ← async s.wait
if ← IO.hasFinished signalWaiter then
return some ()
else
s.native.cancel
return none

registerFn waiter := do
let signalWaiter ← s.wait
discard <| AsyncTask.mapIO (x := signalWaiter) fun _ => do
let lose := return ()
let win promise := promise.resolve (.ok ())
waiter.race lose win

unregisterFn := s.stop
unregisterFn := s.native.cancel

}
52 changes: 48 additions & 4 deletions src/Std/Internal/Async/TCP.lean
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,44 @@ def accept (s : Server) : Async Client := do
|> Async.ofPromise
|>.map Client.ofNative

/--
Tries to accept an incoming connection.
-/
@[inline]
def tryAccept (s : Server) : IO (Option Client) := do
let res ← s.native.tryAccept
let socket ← IO.ofExcept res
return Client.ofNative <$> socket

/--
Creates a `Selector` that resolves once `s` has a connection available. Calling this function
does not start the connection wait, so it must not be called in parallel with `accept`.
-/
def acceptSelector (s : TCP.Socket.Server) : Selector Client :=
{
tryFn :=
s.tryAccept

registerFn waiter := do
let task ← s.native.accept

-- If we get cancelled the promise will be dropped so prepare for that
IO.chainTask (t := task.result?) fun res => do
match res with
| none => return ()
| some res =>
let lose := return ()
let win promise := do
try
let result ← IO.ofExcept res
promise.resolve (.ok (Client.ofNative result))
catch e =>
promise.resolve (.error e)
waiter.race lose win

unregisterFn := s.native.cancelAccept
}

/--
Gets the local address of the server socket.
-/
Expand Down Expand Up @@ -144,20 +182,25 @@ def recv? (s : Client) (size : UInt64) : Async (Option ByteArray) :=

/--
Creates a `Selector` that resolves once `s` has data available, up to at most `size` bytes,
and provides that data. Calling this function starts the data wait, so it must not be called
and provides that data. Calling this function does not starts the data wait, so it must not be called
in parallel with `recv?`.
-/
def recvSelector (s : TCP.Socket.Client) (size : UInt64) : Async (Selector (Option ByteArray)) := do
let readableWaiter ← s.native.waitReadable
return {
def recvSelector (s : TCP.Socket.Client) (size : UInt64) : Selector (Option ByteArray) :=
{
tryFn := do
let readableWaiter ← s.native.waitReadable

if ← readableWaiter.isResolved then
-- We know that this read should not block
let res ← (s.recv? size).block
return some res
else
s.native.cancelRecv
return none

registerFn waiter := do
let readableWaiter ← s.native.waitReadable

-- If we get cancelled the promise will be dropped so prepare for that
discard <| IO.mapTask (t := readableWaiter.result?) fun res => do
match res with
Expand All @@ -173,6 +216,7 @@ def recvSelector (s : TCP.Socket.Client) (size : UInt64) : Async (Selector (Opti
catch e =>
promise.resolve (.error e)
waiter.race lose win

unregisterFn := s.native.cancelRecv
}

Expand Down
11 changes: 6 additions & 5 deletions src/Std/Internal/Async/Timer.lean
Original file line number Diff line number Diff line change
Expand Up @@ -69,17 +69,18 @@ def stop (s : Sleep) : IO Unit :=
s.native.stop

/--
Create a `Selector` that resolves once `s` has finished. Note that calling this function starts `s`
if it hasn't already started.
Create a `Selector` that resolves once `s` has finished. `s` only starts when it runs inside of a Selectable.
-/
def selector (s : Sleep) : Async (Selector Unit) := do
return {
def selector (s : Sleep) : Selector Unit :=
{
tryFn := do
let sleepWaiter ← s.native.next
if ← sleepWaiter.isResolved then
return some ()
else
s.native.cancel
return none

registerFn waiter := do
let sleepWaiter ← s.native.next
BaseIO.chainTask sleepWaiter.result? fun
Expand Down Expand Up @@ -107,7 +108,7 @@ Return a `Selector` that completes after `duration`.
-/
def Selector.sleep (duration : Std.Time.Millisecond.Offset) : Async (Selector Unit) := do
let sleeper ← Sleep.mk duration
sleeper.selector
return sleeper.selector

/--
`Interval` can be used to repeatedly wait for some duration like a clock.
Expand Down
15 changes: 10 additions & 5 deletions src/Std/Internal/Async/UDP.lean
Original file line number Diff line number Diff line change
Expand Up @@ -92,20 +92,24 @@ def recv (s : Socket) (size : UInt64) : Async (ByteArray × Option SocketAddress
Creates a `Selector` that resolves once `s` has data available, up to at most `size` bytes,
and provides that data. If the socket has not been previously bound with `bind`, it is
automatically bound to `0.0.0.0` (all interfaces) with a random port.
Calling this function starts the data wait, so it must not be called in parallel with `recv`.
Calling this function does starts the data wait, only when it's used with `Selectable.one` or `combine`.
It must not be called in parallel with `recv`.
-/
def recvSelector (s : Socket) (size : UInt64) :
Async (Selector (ByteArray × Option SocketAddress)) := do
let readableWaiter ← s.native.waitReadable
return {
def recvSelector (s : Socket) (size : UInt64) : Selector (ByteArray × Option SocketAddress) :=
{
tryFn := do
let readableWaiter ← s.native.waitReadable

if ← readableWaiter.isResolved then
-- We know that this read should not block
let res ← (s.recv size).block
return some res
else
s.native.cancelRecv
return none
registerFn waiter := do
let readableWaiter ← s.native.waitReadable

-- If we get cancelled the promise will be dropped so prepare for that
discard <| IO.mapTask (t := readableWaiter.result?) fun res => do
match res with
Expand All @@ -121,6 +125,7 @@ def recvSelector (s : Socket) (size : UInt64) :
catch e =>
promise.resolve (.error e)
waiter.race lose win

unregisterFn := s.native.cancelRecv
}

Expand Down
9 changes: 9 additions & 0 deletions src/Std/Internal/UV/Signal.lean
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,15 @@ This function has different behavior depending on the state of the `Signal`:
@[extern "lean_uv_signal_stop"]
opaque stop (signal : @& Signal) : IO Unit

/--
This function has different behavior depending on the state of the `Signal`:
- If it is initial or finished this is a no-op.
- If it's running then it drops the accept promise and if it's not repeatable it sets
the signal handler to the initial state.
-/
@[extern "lean_uv_signal_cancel"]
opaque cancel (signal : @& Signal) : IO Unit

end Signal

end UV
Expand Down
12 changes: 12 additions & 0 deletions src/Std/Internal/UV/TCP.lean
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,18 @@ Accepts an incoming connection on a listening TCP socket.
@[extern "lean_uv_tcp_accept"]
opaque accept (socket : @& Socket) : IO (IO.Promise (Except IO.Error Socket))

/--
Tries to accept an incoming connection on a listening TCP socket.
-/
@[extern "lean_uv_tcp_try_accept"]
opaque tryAccept (socket : @& Socket) : IO (Except IO.Error (Option Socket))

/--
Cancels the accept request of a socket.
-/
@[extern "lean_uv_tcp_cancel_accept"]
opaque cancelAccept (socket : @& Socket) : IO Unit

/--
Shuts down an incoming connection on a listening TCP socket.
-/
Expand Down
2 changes: 1 addition & 1 deletion src/Std/Internal/UV/Timer.lean
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ opaque stop (timer : @& Timer) : IO Unit
This function has different behavior depending on the state of the `Timer`:
- If it is initial or finished this is a no-op.
- If it is running, the promise generated by the `next` function is dropped.
- If `repeating` is `false` then it sets the timer to the finished state.
- If `repeating` is `false` then it sets the timer to the initial state.
-/
@[extern "lean_uv_timer_cancel"]
opaque cancel (timer : @& Timer) : IO Unit
Expand Down
78 changes: 58 additions & 20 deletions src/runtime/uv/signal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,29 +39,29 @@ void initialize_libuv_signal() {
}

static bool signal_promise_is_finished(lean_uv_signal_object * signal) {
return lean_io_get_task_state_core((lean_object *)lean_to_promise(signal->m_promise)->m_result) == 2;
return signal->m_promise == NULL || lean_io_get_task_state_core((lean_object *)lean_to_promise(signal->m_promise)->m_result) == 2;
}

void handle_signal_event(uv_signal_t* handle, int signum) {
lean_object * obj = (lean_object*)handle->data;
lean_uv_signal_object * signal = lean_to_uv_signal(obj);

lean_assert(signal->m_state == SIGNAL_STATE_RUNNING);
lean_assert(signal->m_promise != NULL);

if (signal->m_repeating) {
if (!signal_promise_is_finished(signal)) {
lean_object* res = lean_io_promise_resolve(lean_box(signum), signal->m_promise, lean_io_mk_world());
lean_dec(res);
}
} else {
lean_assert(!signal_promise_is_finished(signal));
if (signal->m_promise != NULL) {
lean_object* res = lean_io_promise_resolve(lean_box(signum), signal->m_promise, lean_io_mk_world());
lean_dec(res);
}

uv_signal_stop(signal->m_uv_signal);
signal->m_state = SIGNAL_STATE_FINISHED;

lean_object* res = lean_io_promise_resolve(lean_box(signum), signal->m_promise, lean_io_mk_world());
lean_dec(res);

lean_dec(obj);
}
}
Expand Down Expand Up @@ -154,33 +154,37 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_signal_next(b_obj_arg obj, obj_arg /
}
case SIGNAL_STATE_RUNNING:
{
lean_assert(signal->m_promise != NULL);
// 2 indicates finished
if (signal_promise_is_finished(signal)) {
lean_dec(signal->m_promise);
if (signal->m_promise != NULL) {
lean_dec(signal->m_promise);
}

signal->m_promise = create_promise();
lean_inc(signal->m_promise);
return lean_io_result_mk_ok(signal->m_promise);
} else {
lean_inc(signal->m_promise);
return lean_io_result_mk_ok(signal->m_promise);
}

lean_inc(signal->m_promise);
return lean_io_result_mk_ok(signal->m_promise);
}
case SIGNAL_STATE_FINISHED:
{
lean_assert(signal->m_promise != NULL);
if (signal->m_promise == NULL) {
lean_object* finished_promise = create_promise();
return lean_io_result_mk_ok(finished_promise);
}

lean_inc(signal->m_promise);
return lean_io_result_mk_ok(signal->m_promise);
}
}
} else {
if (signal->m_state == SIGNAL_STATE_INITIAL) {
return setup_signal();
} else {
lean_assert(signal->m_promise != NULL);

} else if (signal->m_promise != NULL) {
lean_inc(signal->m_promise);
return lean_io_result_mk_ok(signal->m_promise);
} else {
lean_object* finished_promise = create_promise();
return lean_io_result_mk_ok(finished_promise);
}
}
}
Expand All @@ -190,12 +194,15 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_signal_stop(b_obj_arg obj, obj_arg /
lean_uv_signal_object * signal = lean_to_uv_signal(obj);

if (signal->m_state == SIGNAL_STATE_RUNNING) {
lean_assert(signal->m_promise != NULL);

event_loop_lock(&global_ev);
int result = uv_signal_stop(signal->m_uv_signal);
event_loop_unlock(&global_ev);

if (signal->m_promise != NULL) {
lean_dec(signal->m_promise);
signal->m_promise = NULL;
}

signal->m_state = SIGNAL_STATE_FINISHED;

// The loop does not need to keep the signal alive anymore.
Expand All @@ -211,6 +218,30 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_signal_stop(b_obj_arg obj, obj_arg /
}
}

/* Std.Internal.UV.Signal.cancel (signal : @& Signal) : IO Unit */
extern "C" LEAN_EXPORT lean_obj_res lean_uv_signal_cancel(b_obj_arg obj, obj_arg /* w */) {
lean_uv_signal_object * signal = lean_to_uv_signal(obj);

// It's locking here to avoid changing the state during other operations.
event_loop_lock(&global_ev);

if (signal->m_state == SIGNAL_STATE_RUNNING && signal->m_promise != NULL) {
if (signal->m_repeating) {
lean_dec(signal->m_promise);
signal->m_promise = NULL;
} else {
uv_signal_stop(signal->m_uv_signal);
lean_dec(signal->m_promise);
signal->m_promise = NULL;
signal->m_state = SIGNAL_STATE_INITIAL;

lean_dec(obj);
}
}

event_loop_unlock(&global_ev);
return lean_io_result_mk_ok(lean_box(0));
}

#else

Expand All @@ -235,6 +266,13 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_signal_stop(b_obj_arg signal, obj_ar
);
}

/* Std.Internal.UV.Signal.cancel (signal : @& Signal) : IO Unit */
extern "C" LEAN_EXPORT lean_obj_res lean_uv_signal_cancel(b_obj_arg obj, obj_arg /* w */) {
lean_always_assert(
false && ("Please build a version of Lean4 with libuv to invoke this.")
);
}

#endif

}
1 change: 1 addition & 0 deletions src/runtime/uv/signal.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,6 @@ static inline lean_uv_signal_object* lean_to_uv_signal(lean_object * o) { return
extern "C" LEAN_EXPORT lean_obj_res lean_uv_signal_mk(uint32_t signum_obj, uint8_t repeating, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_signal_next(b_obj_arg signal, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_signal_stop(b_obj_arg signal, obj_arg /* w */);
extern "C" LEAN_EXPORT lean_obj_res lean_uv_signal_cancel(b_obj_arg obj, obj_arg /* w */);

}
Loading
Loading