20
20
21
21
from threading import Thread
22
22
from concurrent .futures import ThreadPoolExecutor
23
- from typing import List , Callable
23
+ from typing import List , Callable , Optional
24
24
import socket
25
25
import logging
26
26
27
27
__all__ = ['CushyTCPClient' , 'CushyTCPServer' ]
28
- logging .basicConfig (level = logging .DEBUG , format = '%(asctime)s - %(levelname)s - %(message)s' )
28
+ logger = logging .getLogger (__name__ )
29
+
30
+ enable_log = False
31
+ if enable_log :
32
+ logging .basicConfig (level = logging .DEBUG , format = '%(asctime)s - %(levelname)s - %(message)s' )
29
33
30
34
31
35
class CushyTCPClient :
32
36
def __init__ (self , host : str , port : int ):
33
- self .logger = logging . getLogger ( __name__ )
37
+ self .logger = logger
34
38
self .host = host
35
39
self .port = port
36
40
self .sock = socket .socket (socket .AF_INET , socket .SOCK_STREAM )
37
- self .callbacks : List [Callable ] = []
38
41
self .is_running = False
42
+ self .executor = ThreadPoolExecutor ()
43
+ self ._callbacks : List [Callable ] = []
44
+ self ._disconnected_callback : Optional [Callable ] = None
45
+ self ._connected_callback : Optional [Callable ] = None
39
46
40
47
def run (self ):
41
48
"""
42
49
startup CSTCP Client
43
50
"""
44
51
self .sock .connect ((self .host , self .port ))
45
52
self .is_running = True
53
+ if self ._connected_callback :
54
+ self .executor .submit (self ._connected_callback )
46
55
Thread (target = self ._recv_thread ).start ()
47
56
57
+ def _recv_thread (self ):
58
+ while True :
59
+ try :
60
+ msg = self .sock .recv (1024 ).decode ('utf-8' )
61
+ except Exception as e :
62
+ self .logger .error (f"[easy-socket] Error when receiving msg from server: { e } " )
63
+ break
64
+ if not msg :
65
+ self .logger .error ("[easy-socket] Server connection closed." )
66
+ break
67
+ self .logger .debug (f"[easy-socket] Received msg from server: { msg } " )
68
+ for callback in self ._callbacks :
69
+ self .executor .submit (callback , msg )
70
+
71
+ if self ._disconnected_callback :
72
+ self .executor .submit (self ._disconnected_callback )
73
+
48
74
def send (self , msg : str or bytes ):
49
75
if type (msg ) == str :
50
76
self ._send (msg .encode ('utf-8' ))
@@ -56,19 +82,8 @@ def send(self, msg: str or bytes):
56
82
def _send (self , msg : bytes ):
57
83
self .sock .sendall (msg )
58
84
59
- def _recv_thread (self ):
60
- while True :
61
- msg = self .sock .recv (1024 ).decode ('utf-8' )
62
- if not msg :
63
- self .logger .error ("[cushy-socket] Server connection closed." )
64
- break
65
- self .logger .info (f"[cushy-socket] Received msg from server: { msg } " )
66
-
67
- for callback in self .callbacks :
68
- callback (msg )
69
-
70
85
def listen (self , callback : Callable ):
71
- self .callbacks .append (callback )
86
+ self ._callbacks .append (callback )
72
87
73
88
def on_message (self ):
74
89
"""
@@ -79,37 +94,54 @@ def on_message(self):
79
94
----------------------------------------------------------------------
80
95
from cushy_socket.tcp import CushyTCPClient
81
96
82
- es_tcp_client = CushyTCPClient(host='localhost', port=7777)
83
- es_tcp_client .run()
97
+ cushy_tcp_client = CushyTCPClient(host='localhost', port=7777)
98
+ cushy_tcp_client .run()
84
99
85
100
86
- @es_tcp_client .on_message()
101
+ @cushy_tcp_client .on_message()
87
102
def handle_msg_from_server(msg: str):
88
- print(f"[client decorator callback] es_tcp_client rec msg: {msg}")
103
+ print(f"[client decorator callback] cushy_tcp_client rec msg: {msg}")
89
104
----------------------------------------------------------------------
90
105
"""
106
+
107
+ def decorator (func ):
108
+ self ._callbacks .append (func )
109
+ return func
110
+
111
+ return decorator
112
+
113
+ def on_connected (self ):
114
+ def decorator (func ):
115
+ self ._connected_callback = func
116
+ return func
117
+
118
+ return decorator
119
+
120
+ def on_disconnected (self ):
91
121
def decorator (func ):
92
- self .callbacks . append ( func )
122
+ self ._disconnected_callback = func
93
123
return func
94
124
95
125
return decorator
96
126
97
127
def close (self ):
128
+ self .sock .shutdown (2 )
98
129
self .sock .close ()
99
130
self .is_running = False
100
131
101
132
102
133
class CushyTCPServer :
103
134
def __init__ (self , host : str , port : int ):
104
- self .logger = logging . getLogger ( __name__ )
135
+ self .logger = logger
105
136
self .host = host
106
137
self .port = port
107
138
self .sock = socket .socket (socket .AF_INET , socket .SOCK_STREAM )
108
- # self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
109
139
self .clients = set ()
110
- self .callbacks : List [Callable ] = []
111
140
self .is_running = False
112
141
self .executor = ThreadPoolExecutor ()
142
+ self ._callbacks : List [Callable ] = []
143
+ self ._disconnected_callback : Optional [Callable ] = None
144
+ self ._connected_callback : Optional [Callable ] = None
113
145
114
146
def run (self ):
115
147
"""
@@ -118,14 +150,15 @@ def run(self):
118
150
self .sock .bind ((self .host , self .port ))
119
151
self .sock .listen ()
120
152
self .is_running = True
121
- # self.executor.submit(self._accept_thread)
122
153
Thread (target = self ._accept_thread ).start ()
123
154
124
155
def _accept_thread (self ):
125
156
while True :
126
157
client_sock , client_addr = self .sock .accept ()
127
- self .logger .info (f"[cushy-socket] New client connected: { client_addr } " )
158
+ self .logger .debug (f"[cushy-socket] New client connected: { client_addr } " )
128
159
self .clients .add (client_sock )
160
+ if self ._connected_callback :
161
+ self .executor .submit (self ._connected_callback , client_sock )
129
162
self .executor .submit (self ._recv_thread , client_sock )
130
163
131
164
def _recv_thread (self , sock : socket .socket ):
@@ -138,18 +171,21 @@ def _recv_thread(self, sock: socket.socket):
138
171
try :
139
172
msg = sock .recv (1024 ).decode ('utf-8' )
140
173
except Exception as e :
141
- self .logger .error (f"[cushy-socket] Error when receiving msg from client: { e } " )
142
- self .clients .remove (sock )
143
- sock .close ()
174
+ self ._client_close (sock , 'error' , f"[cushy-socket] Error when receiving msg from client: { e } " )
144
175
break
145
176
if not msg :
146
- self .logger .info ("[cushy-socket] Client connection closed." )
147
- self .clients .remove (sock )
148
- sock .close ()
177
+ self ._client_close (sock , 'info' , "[cushy-socket] Client connection closed." )
149
178
break
150
- self .logger .info (f"[cushy-socket] Received msg from client: { msg } " )
179
+ self .logger .debug (f"[cushy-socket] Received msg from client: { msg } " )
151
180
self .executor .submit (self ._callback_thread , msg )
152
181
182
+ def _client_close (self , sock : socket .socket , log_type : str , log_msg : str ):
183
+ self .logger .debug (log_msg ) if log_type == 'info' else self .logger .debug (log_msg )
184
+ if self ._disconnected_callback :
185
+ self .executor .submit (self ._disconnected_callback , sock )
186
+ self .clients .remove (sock )
187
+ sock .close ()
188
+
153
189
def send (self , msg : str or bytes , sock : socket .socket = None ):
154
190
"""
155
191
send message to connected socket. You can choose specify socket client send message
@@ -169,11 +205,11 @@ def _send(self, msg: str or bytes, sock: socket.socket):
169
205
raise Exception ("Incorrect data type" )
170
206
171
207
def _callback_thread (self , msg : str ):
172
- for callback in self .callbacks :
173
- callback ( msg )
208
+ for callback in self ._callbacks :
209
+ self . executor . submit ( callback , msg )
174
210
175
211
def listen (self , callback : Callable ):
176
- self .callbacks .append (callback )
212
+ self ._callbacks .append (callback )
177
213
178
214
def on_message (self ):
179
215
"""
@@ -184,18 +220,33 @@ def on_message(self):
184
220
----------------------------------------------------------------------
185
221
from cushy_socket.tcp import CushyTCPServer
186
222
187
- es_tcp_server = CushyTCPServer(host='localhost', port=7777)
188
- es_tcp_server .run()
223
+ cushy_tcp_server = CushyTCPServer(host='localhost', port=7777)
224
+ cushy_tcp_server .run()
189
225
190
226
191
- @es_tcp_server .on_message()
227
+ @cushy_tcp_server .on_message()
192
228
def handle_msg_from_client(msg: str):
193
- print(f"[server decorator callback] es_tcp_server rec msg: {msg}")
194
- es_tcp_server .send("hello, I am server")
229
+ print(f"[server decorator callback] cushy_tcp_server rec msg: {msg}")
230
+ cushy_tcp_server .send("hello, I am server")
195
231
----------------------------------------------------------------------
196
232
"""
233
+
234
+ def decorator (func ):
235
+ self ._callbacks .append (func )
236
+ return func
237
+
238
+ return decorator
239
+
240
+ def on_connected (self ):
241
+ def decorator (func ):
242
+ self ._connected_callback = func
243
+ return func
244
+
245
+ return decorator
246
+
247
+ def on_disconnected (self ):
197
248
def decorator (func ):
198
- self .callbacks . append ( func )
249
+ self ._disconnected_callback = func
199
250
return func
200
251
201
252
return decorator
0 commit comments