Skip to content

Commit 300ef1a

Browse files
committed
fix: a bunch of selectors
1 parent 1ab10f5 commit 300ef1a

File tree

8 files changed

+110
-6
lines changed

8 files changed

+110
-6
lines changed

src/Std/Internal/Async/Signal.lean

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -242,19 +242,21 @@ Create a `Selector` that resolves once `s` has received the signal. Note that ca
242242
if it hasn't already started.
243243
-/
244244
def selector (s : Signal.Waiter) : IO (Selector Unit) := do
245-
let signalWaiter ← s.wait
246245
return {
247246
tryFn := do
247+
let signalWaiter : AsyncTask _ ← async s.wait
248248
if ← IO.hasFinished signalWaiter then
249249
return some ()
250250
else
251251
return none
252252

253253
registerFn waiter := do
254+
let signalWaiter ← s.wait
254255
discard <| AsyncTask.mapIO (x := signalWaiter) fun _ => do
255256
let lose := return ()
256257
let win promise := promise.resolve (.ok ())
257258
waiter.race lose win
258259

259-
unregisterFn := s.stop
260+
unregisterFn := s.native.cancel
261+
260262
}

src/Std/Internal/Async/TCP.lean

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,40 @@ def accept (s : Server) : Async Client := do
7070
|> Async.ofPromise
7171
|>.map Client.ofNative
7272

73+
/--
74+
Tries to accepts an incoming connection.
75+
-/
76+
@[inline]
77+
def tryAccept (s : Server) : Async (Option Client) := do
78+
let res ← s.native.tryAccept
79+
let socket ← Async.ofExcept res
80+
return Client.ofNative <$> socket
81+
82+
/--
83+
Creates a `Selector` that resolves once `s` has a connetion available. Calling this function
84+
does not starts the connection wait, so it must not be called in parallel with `accept`.
85+
-/
86+
def acceptSelector (s : TCP.Socket.Server) : Async (Selector Client) := do
87+
return {
88+
tryFn := s.tryAccept
89+
registerFn waiter := do
90+
let task ← s.native.accept
91+
-- If we get cancelled the promise will be dropped so prepare for that
92+
IO.chainTask (t := task.result?) fun res => do
93+
match res with
94+
| none => return ()
95+
| some res =>
96+
let lose := return ()
97+
let win promise := do
98+
try
99+
let result ← IO.ofExcept res
100+
promise.resolve (.ok (Client.ofNative result))
101+
catch e =>
102+
promise.resolve (.error e)
103+
waiter.race lose win
104+
unregisterFn := s.native.cancelAccept
105+
}
106+
73107
/--
74108
Gets the local address of the server socket.
75109
-/

src/Std/Internal/UV/Signal.lean

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,15 @@ This function has different behavior depending on the state of the `Signal`:
7878
@[extern "lean_uv_signal_stop"]
7979
opaque stop (signal : @& Signal) : IO Unit
8080

81+
/--
82+
This function has different behavior depending on the state of the `Signal`:
83+
- If it is initial or finished this is a no-op.
84+
- If it's running then it drops the accept promise and if it's not repeatable it finishes
85+
the execution of the timer.
86+
-/
87+
@[extern "lean_uv_signal_cancel"]
88+
opaque cancel (signal : @& Signal) : IO Unit
89+
8190
end Signal
8291

8392
end UV

src/Std/Internal/UV/TCP.lean

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,18 @@ Accepts an incoming connection on a listening TCP socket.
9696
@[extern "lean_uv_tcp_accept"]
9797
opaque accept (socket : @& Socket) : IO (IO.Promise (Except IO.Error Socket))
9898

99+
/--
100+
Tries to accepts an incoming connection on a listening TCP socket.
101+
-/
102+
@[extern "lean_uv_tcp_try_accept"]
103+
opaque tryAccept (socket : @& Socket) : IO ((Except IO.Error (Option Socket)))
104+
105+
/--
106+
Cancels the accept request of a socket.
107+
-/
108+
@[extern "lean_uv_tcp_cancel_accept"]
109+
opaque cancelAccept (socket : @& Socket) : IO Unit
110+
99111
/--
100112
Shuts down an incoming connection on a listening TCP socket.
101113
-/

src/runtime/uv/signal.cpp

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,8 +219,8 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_signal_stop(b_obj_arg obj, obj_arg /
219219
}
220220
}
221221

222-
/* Std.Internal.UV.Signal.cancelNext (signal : @& Signal) : IO Unit */
223-
extern "C" LEAN_EXPORT lean_obj_res lean_uv_signal_cancel_next(b_obj_arg obj, obj_arg /* w */) {
222+
/* Std.Internal.UV.Signal.cancel (signal : @& Signal) : IO Unit */
223+
extern "C" LEAN_EXPORT lean_obj_res lean_uv_signal_cancel(b_obj_arg obj, obj_arg /* w */) {
224224
lean_uv_signal_object * signal = lean_to_uv_signal(obj);
225225

226226
// It's locking here to avoid changing the state during other operations.
@@ -267,6 +267,13 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_signal_stop(b_obj_arg signal, obj_ar
267267
);
268268
}
269269

270+
/* Std.Internal.UV.Signal.cancel (signal : @& Signal) : IO Unit */
271+
extern "C" LEAN_EXPORT lean_obj_res lean_uv_signal_cancel(b_obj_arg obj, obj_arg /* w */) {
272+
lean_always_assert(
273+
false && ("Please build a version of Lean4 with libuv to invoke this.")
274+
);
275+
}
276+
270277
#endif
271278

272279
}

src/runtime/uv/signal.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,5 +51,6 @@ static inline lean_uv_signal_object* lean_to_uv_signal(lean_object * o) { return
5151
extern "C" LEAN_EXPORT lean_obj_res lean_uv_signal_mk(uint32_t signum_obj, uint8_t repeating, obj_arg /* w */);
5252
extern "C" LEAN_EXPORT lean_obj_res lean_uv_signal_next(b_obj_arg signal, obj_arg /* w */);
5353
extern "C" LEAN_EXPORT lean_obj_res lean_uv_signal_stop(b_obj_arg signal, obj_arg /* w */);
54+
extern "C" LEAN_EXPORT lean_obj_res lean_uv_signal_cancel(b_obj_arg obj, obj_arg /* w */);
5455

5556
}

src/runtime/uv/tcp.cpp

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_send(b_obj_arg socket, obj_arg d
231231
lean_dec(socket);
232232
lean_dec(data_array);
233233
free(bufs);
234-
234+
235235
free(write_uv->data);
236236
free(write_uv);
237237

@@ -524,6 +524,37 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_accept(b_obj_arg socket, obj_arg
524524
return lean_io_result_mk_ok(promise);
525525
}
526526

527+
/* Std.Internal.UV.TCP.Socket.tryAccept (socket : @& Socket) : IO (Except IO.Error (Option Socket)) */
528+
extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_try_accept(b_obj_arg socket, obj_arg /* w */) {
529+
lean_uv_tcp_socket_object* tcp_socket = lean_to_uv_tcp_socket(socket);
530+
531+
// Locking early prevents potential parallelism issues setting m_promise_accept.
532+
event_loop_lock(&global_ev);
533+
534+
if (tcp_socket->m_promise_accept != nullptr) {
535+
return lean_io_result_mk_error(lean_decode_uv_error(UV_EALREADY, mk_string("parallel accept is not allowed! consider binding multiple sockets to the same address and accepting on them instead")));
536+
}
537+
538+
lean_object* client = lean_io_result_take_value(lean_uv_tcp_new(lean_box(0)));
539+
lean_uv_tcp_socket_object* client_socket = lean_to_uv_tcp_socket(client);
540+
541+
int result = uv_accept((uv_stream_t*)tcp_socket->m_uv_tcp, (uv_stream_t*)client_socket->m_uv_tcp);
542+
543+
if (result < 0 && result != UV_EAGAIN) {
544+
event_loop_unlock(&global_ev);
545+
lean_dec(client);
546+
return lean_io_result_mk_error(lean_decode_uv_error(result, NULL));
547+
} else if (result >= 0) {
548+
event_loop_unlock(&global_ev);
549+
return lean_io_result_mk_ok(mk_except_ok(lean::mk_option_some(client)));
550+
} else {
551+
event_loop_unlock(&global_ev);
552+
lean_dec(client);
553+
return lean_io_result_mk_ok(mk_except_ok(lean::mk_option_none()));
554+
}
555+
}
556+
557+
527558

528559
/* Std.Internal.UV.TCP.Socket.cancelAccept (socket : @& Socket) : IO Unit */
529560
extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_cancel_accept(b_obj_arg socket, obj_arg /* w */) {
@@ -718,6 +749,12 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_listen(b_obj_arg socket, int32_t
718749
);
719750
}
720751

752+
extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_cancel_accept(b_obj_arg socket, obj_arg /* w */) {
753+
lean_always_assert(
754+
false && ("Please build a version of Lean4 with libuv to invoke this.")
755+
);
756+
}
757+
721758
extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_accept(b_obj_arg socket, obj_arg /* w */) {
722759
lean_always_assert(
723760
false && ("Please build a version of Lean4 with libuv to invoke this.")

src/runtime/uv/tcp.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ typedef struct {
3535
// Tcp socket object manipulation functions.
3636
static inline lean_object* lean_uv_tcp_socket_new(lean_uv_tcp_socket_object* s) { return lean_alloc_external(g_uv_tcp_socket_external_class, s); }
3737
static inline lean_uv_tcp_socket_object* lean_to_uv_tcp_socket(lean_object* o) { return (lean_uv_tcp_socket_object*)(lean_get_external_data(o)); }
38-
3938
#endif
4039

4140
// =======================================
@@ -50,7 +49,10 @@ extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_cancel_recv(b_obj_arg socket, ob
5049
extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_bind(b_obj_arg socket, b_obj_arg addr, obj_arg /* w */);
5150
extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_listen(b_obj_arg socket, int32_t backlog, obj_arg /* w */);
5251
extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_accept(b_obj_arg socket, obj_arg /* w */);
52+
extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_cancel_accept(b_obj_arg socket, obj_arg /* w */);
53+
extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_try_accept(b_obj_arg socket, obj_arg /* w */);
5354
extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_shutdown(b_obj_arg socket, obj_arg /* w */);
55+
extern "C" LEAN_EXPORT lean_obj_res lean_uv_tcp_try_accept(b_obj_arg socket, obj_arg /* w */);
5456

5557
// =======================================
5658
// TCP Socket Utility Functions

0 commit comments

Comments
 (0)