Skip to content

Commit 442d972

Browse files
committed
[fix] BrokenpipeError in socket
1 parent e7cf5b8 commit 442d972

File tree

2 files changed

+76
-65
lines changed

2 files changed

+76
-65
lines changed

skyplane/gateway/operators/gateway_operator.py

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,23 @@ def make_socket(self, dst_host):
261261
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
262262
return sock
263263

264+
def send_data(self, dst_host, header, data):
265+
# contact server to set up socket connection
266+
if self.destination_ports.get(dst_host) is None:
267+
self.destination_sockets[dst_host] = self.make_socket(dst_host)
268+
sock = self.destination_sockets[dst_host]
269+
270+
try:
271+
header.to_socket(sock)
272+
sock.sendall(data)
273+
except socket.error as e:
274+
print(e)
275+
del self.destination_ports[dst_host]
276+
return False
277+
# if successful, return True
278+
return True
279+
280+
264281
# send chunks to other instances
265282
def process(self, chunk_req: ChunkRequest, dst_host: str):
266283
"""Send list of chunks to gateway server, pipelining small chunks together into a single socket stream."""
@@ -307,15 +324,6 @@ def process(self, chunk_req: ChunkRequest, dst_host: str):
307324
print(f"[{self.handle}:{self.worker_id}] Error registering chunks {chunk_ids} to {dst_host}: {e}")
308325
raise e
309326

310-
# contact server to set up socket connection
311-
if self.destination_ports.get(dst_host) is None:
312-
print(f"[sender-{self.worker_id}]:{chunk_ids} creating new socket")
313-
self.destination_sockets[dst_host] = retry_backoff(
314-
partial(self.make_socket, dst_host), max_retries=3, exception_class=socket.timeout
315-
)
316-
print(f"[sender-{self.worker_id}]:{chunk_ids} created new socket")
317-
sock = self.destination_sockets[dst_host]
318-
319327
# TODO: cleanup so this isn't a loop
320328
for idx, chunk_req in enumerate(chunk_reqs):
321329
# self.chunk_store.state_start_upload(chunk_id, f"sender:{self.worker_id}")
@@ -347,16 +355,13 @@ def process(self, chunk_req: ChunkRequest, dst_host: str):
347355
raw_wire_length=raw_wire_length,
348356
is_compressed=(compressed_length is not None),
349357
)
350-
# print(f"[sender-{self.worker_id}]:{chunk_id} sending chunk header {header}")
351-
header.to_socket(sock)
352-
# print(f"[sender-{self.worker_id}]:{chunk_id} sent chunk header")
353-
354358
# send chunk data
355359
assert chunk_file_path.exists(), f"chunk file {chunk_file_path} does not exist"
356-
# file_size = os.path.getsize(chunk_file_path)
357-
358-
with Timer() as t:
359-
sock.sendall(data)
360+
361+
while True:
362+
with Timer() as t:
363+
is_suc = self.send_data(dst_host=dst_host, header=header, data=data)
364+
if is_suc: break
360365

361366
# logger.debug(f"[sender:{self.worker_id}]:{chunk_id} sent at {chunk.chunk_length_bytes * 8 / t.elapsed / MB:.2f}Mbps")
362367
print(f"[sender:{self.worker_id}]:{chunk_id} sent at {wire_length * 8 / t.elapsed / MB:.2f}Mbps")

skyplane/gateway/operators/gateway_receiver.py

Lines changed: 54 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -145,31 +145,31 @@ def recv_chunks(self, conn: socket.socket, addr: Tuple[str, int]):
145145
init_space = self.chunk_store.remaining_bytes()
146146
print("Init space", init_space)
147147
while True:
148-
# receive header and write data to file
149-
logger.debug(f"[receiver:{server_port}] Blocking for next header")
150-
chunk_header = WireProtocolHeader.from_socket(conn)
151-
logger.debug(f"[receiver:{server_port}]:{chunk_header.chunk_id} Got chunk header {chunk_header}")
152-
153-
# TODO: this wont work
154-
# chunk_request = self.chunk_store.get_chunk_request(chunk_header.chunk_id)
155-
156-
should_decrypt = self.e2ee_secretbox is not None # and chunk_request.dst_region == self.region
157-
should_decompress = chunk_header.is_compressed # and chunk_request.dst_region == self.region
158-
159-
# wait for space
160-
# while self.chunk_store.remaining_bytes() < chunk_header.data_len * self.max_pending_chunks:
161-
# print(
162-
# f"[receiver:{server_port}]: No remaining space with bytes {self.chunk_store.remaining_bytes()} data len {chunk_header.data_len} max pending {self.max_pending_chunks}, total space {init_space}"
163-
# )
164-
# time.sleep(0.1)
165-
166-
# get data
167-
# self.chunk_store.state_queue_download(chunk_header.chunk_id)
168-
# self.chunk_store.state_start_download(chunk_header.chunk_id, f"receiver:{self.worker_id}")
169-
logger.debug(f"[receiver:{server_port}]:{chunk_header.chunk_id} wire header length {chunk_header.data_len}")
170-
with Timer() as t:
171-
fpath = self.chunk_store.get_chunk_file_path(chunk_header.chunk_id)
172-
with fpath.open("wb") as f:
148+
try:
149+
# receive header and write data to file
150+
logger.debug(f"[receiver:{server_port}] Blocking for next header")
151+
chunk_header = WireProtocolHeader.from_socket(conn)
152+
logger.debug(f"[receiver:{server_port}]:{chunk_header.chunk_id} Got chunk header {chunk_header}")
153+
154+
# TODO: this wont work
155+
# chunk_request = self.chunk_store.get_chunk_request(chunk_header.chunk_id)
156+
157+
should_decrypt = self.e2ee_secretbox is not None # and chunk_request.dst_region == self.region
158+
should_decompress = chunk_header.is_compressed # and chunk_request.dst_region == self.region
159+
160+
# wait for space
161+
# while self.chunk_store.remaining_bytes() < chunk_header.data_len * self.max_pending_chunks:
162+
# print(
163+
# f"[receiver:{server_port}]: No remaining space with bytes {self.chunk_store.remaining_bytes()} data len {chunk_header.data_len} max pending {self.max_pending_chunks}, total space {init_space}"
164+
# )
165+
# time.sleep(0.1)
166+
167+
# get data
168+
# self.chunk_store.state_queue_download(chunk_header.chunk_id)
169+
# self.chunk_store.state_start_download(chunk_header.chunk_id, f"receiver:{self.worker_id}")
170+
logger.debug(f"[receiver:{server_port}]:{chunk_header.chunk_id} wire header length {chunk_header.data_len}")
171+
with Timer() as t:
172+
173173
socket_data_len = chunk_header.data_len
174174
chunk_received_size, chunk_received_size_decompressed = 0, 0
175175
to_write = bytearray(socket_data_len)
@@ -199,29 +199,35 @@ def recv_chunks(self, conn: socket.socket, addr: Tuple[str, int]):
199199
print(
200200
f"[receiver:{server_port}]:{chunk_header.chunk_id} Decompressing {len(to_write)} bytes to {chunk_received_size_decompressed} bytes"
201201
)
202-
203-
# try to write data until successful
204-
while True:
205-
try:
206-
f.seek(0, 0)
207-
f.write(to_write)
208-
f.flush()
209-
210-
# check write succeeds
211-
assert os.path.exists(fpath)
212-
213-
# check size
214-
file_size = os.path.getsize(fpath)
215-
if file_size == chunk_header.raw_data_len:
216-
break
217-
elif file_size >= chunk_header.raw_data_len:
218-
raise ValueError(f"[Gateway] File size {file_size} greater than chunk size {chunk_header.raw_data_len}")
219-
except Exception as e:
220-
print(e)
221-
print(
222-
f"[receiver:{server_port}]: No remaining space with bytes {self.chunk_store.remaining_bytes()} data len {chunk_header.data_len} max pending {self.max_pending_chunks}, total space {init_space}"
223-
)
224-
time.sleep(1)
202+
except socket.error as e:
203+
print(e)
204+
# This may have pipeline broken error, if happened then restart receiver.
205+
continue
206+
207+
fpath = self.chunk_store.get_chunk_file_path(chunk_header.chunk_id)
208+
with fpath.open("wb") as f:
209+
# try to write data until successful
210+
while True:
211+
try:
212+
f.seek(0, 0)
213+
f.write(to_write)
214+
f.flush()
215+
216+
# check write succeeds
217+
assert os.path.exists(fpath)
218+
219+
# check size
220+
file_size = os.path.getsize(fpath)
221+
if file_size == chunk_header.raw_data_len:
222+
break
223+
elif file_size >= chunk_header.raw_data_len:
224+
raise ValueError(f"[Gateway] File size {file_size} greater than chunk size {chunk_header.raw_data_len}")
225+
except Exception as e:
226+
print(e)
227+
print(
228+
f"[receiver:{server_port}]: No remaining space with bytes {self.chunk_store.remaining_bytes()} data len {chunk_header.data_len} max pending {self.max_pending_chunks}, total space {init_space}"
229+
)
230+
time.sleep(1)
225231
assert (
226232
socket_data_len == 0 and chunk_received_size == chunk_header.data_len
227233
), f"Size mismatch: got {chunk_received_size} expected {chunk_header.data_len} and had {socket_data_len} bytes remaining"

0 commit comments

Comments
 (0)