Skip to content

Commit 442b871

Browse files
committed
wip
1 parent 2cf2b0c commit 442b871

File tree

13 files changed

+1066
-133
lines changed

13 files changed

+1066
-133
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ extend-ignore = [
7979
'S101', # assert is fine
8080
]
8181
flake8-quotes = { inline-quotes = 'single', multiline-quotes = 'double' }
82-
mccabe = { max-complexity = 16 }
82+
mccabe = { max-complexity = 22 }
8383

8484
[tool.ruff.format]
8585
quote-style = 'single'

rloop/_rloop.pyi

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,3 +47,12 @@ class EventLoop:
4747
def _ssock_del(self, fd): ...
4848
def call_soon(self, callback, *args, context=None) -> CBHandle: ...
4949
def call_soon_threadsafe(self, callback, *args, context=None) -> CBHandle: ...
50+
51+
class Server:
52+
_loop: Any
53+
_sff: Any
54+
55+
def _add_waiter(self, waiter): ...
56+
def _start_serving(self): ...
57+
def close(self): ...
58+
def is_serving(self) -> bool: ...

rloop/loop.py

Lines changed: 110 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio as __asyncio
22
import errno
3+
import os
34
import signal
45
import socket
56
import subprocess
@@ -12,6 +13,7 @@
1213
from asyncio.tasks import Task as _Task, ensure_future as _ensure_future, gather as _gather
1314
from concurrent.futures import ThreadPoolExecutor
1415
from contextvars import copy_context as _copy_context
16+
from itertools import chain as _iterchain
1517
from typing import Union
1618

1719
from ._compat import _PY_311, _PYV
@@ -288,10 +290,10 @@ def set_default_executor(self, executor):
288290

289291
#: network I/O methods
290292
async def getaddrinfo(self, host, port, *, family=0, type=0, proto=0, flags=0):
291-
raise NotImplementedError
293+
return await self.run_in_executor(None, socket.getaddrinfo, host, port, family, type, proto, flags)
292294

293295
async def getnameinfo(self, sockaddr, flags=0):
294-
raise NotImplementedError
296+
return await self.run_in_executor(None, socket.getnameinfo, sockaddr, flags)
295297

296298
async def create_connection(
297299
self,
@@ -331,7 +333,112 @@ async def create_server(
331333
ssl_shutdown_timeout=None,
332334
start_serving=True,
333335
):
334-
raise NotImplementedError
336+
if isinstance(ssl, bool):
337+
raise TypeError('ssl argument must be an SSLContext or None')
338+
339+
if ssl_handshake_timeout is not None and ssl is None:
340+
raise ValueError('ssl_handshake_timeout is only meaningful with ssl')
341+
342+
if ssl_shutdown_timeout is not None and ssl is None:
343+
raise ValueError('ssl_shutdown_timeout is only meaningful with ssl')
344+
345+
# TODO
346+
# if sock is not None:
347+
# _check_ssl_socket(sock)
348+
349+
if host is not None or port is not None:
350+
if sock is not None:
351+
raise ValueError('host/port and sock can not be specified at the same time')
352+
353+
if reuse_address is None:
354+
reuse_address = os.name == 'posix' and sys.platform != 'cygwin'
355+
356+
sockets = []
357+
if host == '':
358+
hosts = [None]
359+
elif isinstance(host, str) or not isinstance(host, (tuple, list)):
360+
hosts = [host]
361+
else:
362+
hosts = host
363+
364+
fs = [self._create_server_getaddrinfo(host, port, family=family, flags=flags) for host in hosts]
365+
infos = await _gather(*fs)
366+
infos = set(_iterchain.from_iterable(infos))
367+
368+
completed = False
369+
try:
370+
for res in infos:
371+
af, socktype, proto, canonname, sa = res
372+
try:
373+
sock = socket.socket(af, socktype, proto)
374+
except socket.error:
375+
# Assume it's a bad family/type/protocol combination.
376+
continue
377+
sockets.append(sock)
378+
if reuse_address:
379+
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
380+
# TODO
381+
# if reuse_port:
382+
# _set_reuseport(sock)
383+
if keep_alive:
384+
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, True)
385+
# Disable IPv4/IPv6 dual stack support (enabled by
386+
# default on Linux) which makes a single socket
387+
# listen on both address families.
388+
if _HAS_IPv6 and af == socket.AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'):
389+
sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, True)
390+
try:
391+
sock.bind(sa)
392+
except OSError as err:
393+
msg = 'error while attempting to bind on address %r: %s' % (sa, str(err).lower())
394+
if err.errno == errno.EADDRNOTAVAIL:
395+
# Assume the family is not enabled (bpo-30945)
396+
sockets.pop()
397+
sock.close()
398+
continue
399+
raise OSError(err.errno, msg) from None
400+
401+
if not sockets:
402+
raise OSError('could not bind on any address out of %r' % ([info[4] for info in infos],))
403+
404+
completed = True
405+
finally:
406+
if not completed:
407+
for sock in sockets:
408+
sock.close()
409+
else:
410+
if sock is None:
411+
raise ValueError('Neither host/port nor sock were specified')
412+
if sock.type != socket.SOCK_STREAM:
413+
raise ValueError(f'A Stream Socket was expected, got {sock!r}')
414+
sockets = [sock]
415+
416+
for sock in sockets:
417+
sock.setblocking(False)
418+
419+
# TODO
420+
# server = Server(self, sockets, protocol_factory,
421+
# ssl, backlog, ssl_handshake_timeout,
422+
# ssl_shutdown_timeout)
423+
server = self._tcp_server([sock.fileno() for sock in sockets], protocol_factory, backlog)
424+
425+
# TODO
426+
# if start_serving:
427+
# server._start_serving()
428+
# # Skip one loop iteration so that all 'loop.add_reader'
429+
# # go through.
430+
# await tasks.sleep(0)
431+
432+
return server
433+
434+
async def _create_server_getaddrinfo(self, host, port, family, flags):
435+
infos = await self._ensure_resolved(
436+
(host, port), family=family, type=socket.SOCK_STREAM, flags=flags, loop=self
437+
)
438+
if not infos:
439+
raise OSError(f'getaddrinfo({host!r}) returned empty list')
440+
441+
return infos
335442

336443
async def sendfile(self, transport, file, offset=0, count=None, *, fallback=True):
337444
raise NotImplementedError

rloop/server.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
import asyncio
2+
3+
from ._rloop import Server as _Server
4+
5+
6+
class Server(_Server):
7+
def get_loop(self):
8+
return self._loop
9+
10+
# TODO
11+
def close_clients(self):
12+
raise NotImplementedError
13+
14+
# TODO
15+
def abort_clients(self):
16+
raise NotImplementedError
17+
18+
async def start_serving(self):
19+
self._start_serving()
20+
21+
async def serve_forever(self):
22+
if self._sff is not None:
23+
raise RuntimeError(f'server {self!r} is already being awaited on serve_forever()')
24+
# if self._servers is None:
25+
# raise RuntimeError(f'server {self!r} is closed')
26+
27+
self._start_serving()
28+
self._sff = self._loop.create_future()
29+
30+
try:
31+
await self._sff
32+
except asyncio.CancelledError:
33+
try:
34+
self.close()
35+
await self.wait_closed()
36+
finally:
37+
raise
38+
finally:
39+
self._sff = None
40+
41+
async def wait_closed(self):
42+
# if self._servers is None or self._waiters is None:
43+
# return
44+
waiter = self._loop.create_future()
45+
self._add_waiter(waiter)
46+
# self._waiters.append(waiter)
47+
await waiter
48+
49+
async def __aenter__(self):
50+
return self
51+
52+
async def __aexit__(self, *exc):
53+
self.close()
54+
await self.wait_closed()
55+
56+
# TODO
57+
@property
58+
def sockets(self):
59+
raise NotImplementedError

0 commit comments

Comments
 (0)