Skip to content
82 changes: 50 additions & 32 deletions pybit/_websocket_stream.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
import os

import websocket
import threading
import time
import json
from ._http_manager import generate_signature
import logging
import copy
import sys
from uuid import uuid4
from . import _helpers


logger = logging.getLogger(__name__)

from websocket._exceptions import WebSocketConnectionClosedException

SUBDOMAIN_TESTNET = "stream-testnet"
SUBDOMAIN_MAINNET = "stream"
Expand All @@ -22,26 +24,27 @@

class _WebSocketManager:
def __init__(
self,
callback_function,
ws_name,
testnet,
domain="",
demo=False,
rsa_authentication=False,
api_key=None,
api_secret=None,
ping_interval=20,
ping_timeout=10,
retries=10,
restart_on_error=True,
trace_logging=False,
private_auth_expire=1,
self,
callback_function,
ws_name,
testnet,
domain="",
demo=False,
rsa_authentication=False,
api_key=None,
api_secret=None,
ping_interval=20,
ping_timeout=10,
retries=10,
restart_on_error=True,
trace_logging=False,
private_auth_expire=1,
):
self.testnet = testnet
self.domain = domain
self.rsa_authentication = rsa_authentication
self.demo = demo
self.terminate = False
# Set API keys.
self.api_key = api_key
self.api_secret = api_secret
Expand All @@ -50,7 +53,7 @@ def __init__(
self.ws_name = ws_name
if api_key:
self.ws_name += " (Auth)"

# Delta time for private auth expiration in seconds
self.private_auth_expire = private_auth_expire

Expand Down Expand Up @@ -142,7 +145,7 @@ def resubscribe_to_topics():
infinitely_reconnect = False

while (
infinitely_reconnect or retries > 0
infinitely_reconnect or retries > 0
) and not self.is_connected():
logger.info(f"WebSocket {self.ws_name} attempting connection...")
self.ws = websocket.WebSocketApp(
Expand Down Expand Up @@ -172,13 +175,17 @@ def resubscribe_to_topics():
break

# If connection was not successful, raise error.
if not infinitely_reconnect and retries <= 0:
try:
if not infinitely_reconnect and retries <= 0:
raise websocket.WebSocketTimeoutException(
f"WebSocket {self.ws_name} ({self.endpoint}) connection "
f"failed. Too many connection attempts. pybit will no "
f"longer try to reconnect."
)
except websocket.WebSocketTimeoutException as error:
logger.error(error)
self.terminate = True
self.exit()
raise websocket.WebSocketTimeoutException(
f"WebSocket {self.ws_name} ({self.endpoint}) connection "
f"failed. Too many connection attempts. pybit will no "
f"longer try to reconnect."
)

logger.info(f"WebSocket {self.ws_name} connected")

Expand Down Expand Up @@ -255,7 +262,13 @@ def _on_pong(self):
self._send_custom_ping()

def _send_custom_ping(self):
self.ws.send(self.custom_ping_message)
try:
self.ws.send(self.custom_ping_message)
except WebSocketConnectionClosedException as error:
# Logging error and exiting hanging, non-reposing app (Let it fall).
logger.error(f"WebSocket {self.ws_name} not responding, error: {error}")
self.terminate = True
self.exit()

def _send_initial_ping(self):
"""https://github.com/bybit-exchange/pybit/issues/164"""
Expand Down Expand Up @@ -289,6 +302,11 @@ def exit(self):
while self.ws.sock:
continue
self.exited = True
if self.terminate:
import signal
p_id = os.getpid()
logger.error("Forcing kill after receiving critical error")
os.kill(1, signal.SIGKILL)


class _V5WebSocketManager(_WebSocketManager):
Expand Down Expand Up @@ -368,8 +386,8 @@ def _process_delta_orderbook(self, message, topic):

# Make updates according to delta response.
book_sides = {"b": message["data"]["b"], "a": message["data"]["a"]}
self.data[topic]["u"]=message["data"]["u"]
self.data[topic]["seq"]=message["data"]["seq"]
self.data[topic]["u"] = message["data"]["u"]
self.data[topic]["seq"] = message["data"]["seq"]

for side, entries in book_sides.items():
for entry in entries:
Expand Down Expand Up @@ -463,17 +481,17 @@ def _process_normal_message(self, message):
def _handle_incoming_message(self, message):
def is_auth_message():
if (
message.get("op") == "auth"
or message.get("type") == "AUTH_RESP"
message.get("op") == "auth"
or message.get("type") == "AUTH_RESP"
):
return True
else:
return False

def is_subscription_message():
if (
message.get("op") == "subscribe"
or message.get("type") == "COMMAND_RESP"
message.get("op") == "subscribe"
or message.get("type") == "COMMAND_RESP"
):
return True
else:
Expand Down