Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions openc3/python/openc3/interfaces/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from openc3.utilities.logger import Logger
from openc3.logs.stream_log_pair import StreamLogPair

# TODO:
# require 'openc3/api/api'
# require 'openc3/utilities/secrets'

Expand Down
3 changes: 0 additions & 3 deletions openc3/python/openc3/interfaces/protocols/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@
# Base class for all OpenC3 protocols which defines a framework which must be
# implemented by a subclass.
class Protocol:
# attr_accessor :interface
# attr_accessor :allow_empty_data

# self.param allow_empty_data [True/False/None] Whether or not this protocol will allow an empty string
# to be passed down to later Protocols (instead of returning 'STOP'). Can be True, False, or None, where
# None is interpreted as True if not the Protocol is the last Protocol of the chain.
Expand Down
74 changes: 74 additions & 0 deletions openc3/python/openc3/interfaces/stream_interface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# Copyright 2023 OpenC3, Inc.
# All Rights Reserved.
#
# This program is free software; you can modify and/or redistribute it
# under the terms of the GNU Affero General Public License
# as published by the Free Software Foundation; version 3 with
# attribution addendums as found in the LICENSE.txt
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# This file may also be used under the terms of a commercial license
# if purchased from OpenC3, Inc.

from openc3.interfaces.interface import Interface
from openc3.config.config_parser import ConfigParser
from openc3.utilities.logger import Logger


# Base class for interfaces that act read and write from a stream
class StreamInterface(Interface):
def __init__(self, protocol_type=None, protocol_args=[]):
super().__init__()
self.stream = None
self.protocol_type = ConfigParser.handle_none(protocol_type)
self.protocol_args = protocol_args
if self.protocol_type:
str(protocol_type).capitalize() + "Protocol"
# klass = OpenC3.require_class(class_name_to_filename(protocol_class_name))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to actually do the dynamic import here.

# self.add_protocol(klass, protocol_args, "PARAMS")

def connect(self):
super()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be super().connect()

if self.stream:
self.stream.connect()

def connected(self):
if self.stream:
return self.stream.connected
else:
return False

def disconnect(self):
if self.stream:
self.stream.disconnect()
super().disconnect()

def read_interface(self):
timeout = False
try:
data = self.stream.read()
except TimeoutError:
Logger.error(f"{self.name}: Timeout waiting for data to be read")
timeout = True
data = None
if data is None or len(data) <= 0:
if data is None and not timeout:
Logger.info(
f"{self.name}: {self.stream.__class__.__name__} read returned None"
)
if data is not None and len(data) <= 0:
Logger.info(
f"{self.name}: {self.stream.__class__.__name__} read returned 0 bytes (stream closed)"
)
return None

self.read_interface_base(data)
return data

def write_interface(self, data):
self.write_interface_base(data)
self.stream.write(data)
66 changes: 66 additions & 0 deletions openc3/python/openc3/interfaces/tcpip_client_interface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Copyright 2023 OpenC3, Inc.
# All Rights Reserved.
#
# This program is free software; you can modify and/or redistribute it
# under the terms of the GNU Affero General Public License
# as published by the Free Software Foundation; version 3 with
# attribution addendums as found in the LICENSE.txt
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# This file may also be used under the terms of a commercial license
# if purchased from OpenC3, Inc.

from openc3.interfaces.stream_interface import StreamInterface
from openc3.streams.tcpip_client_stream import TcpipClientStream
from openc3.config.config_parser import ConfigParser


# Base class for interfaces that act as a TCP/IP client
class TcpipClientInterface(StreamInterface):
# self.param hostname [String] Machine to connect to
# self.param write_port [Integer] Port to write commands to
# self.param read_port [Integer] Port to read telemetry from
# self.param write_timeout [Float] Seconds to wait before aborting writes
# self.param read_timeout [Float|None] Seconds to wait before aborting reads.
# Pass None to block until the read is complete.
# self.param protocol_type [String] Name of the protocol to use
# with this interface
# self.param protocol_args [Array<String>] Arguments to pass to the protocol
def __init__(
self,
hostname,
write_port,
read_port,
write_timeout,
read_timeout,
protocol_type=None,
*protocol_args
):
super().__init__(protocol_type, protocol_args)
self.hostname = hostname
self.write_port = ConfigParser.handle_none(write_port)
self.read_port = ConfigParser.handle_none(read_port)
self.write_timeout = write_timeout
self.read_timeout = read_timeout
if not self.read_port:
self.read_allowed = False
if not self.write_port:
self.write_allowed = False
if not self.write_port:
self.write_raw_allowed = False

# Connects the {TcpipClientStream} by passing the
# initialization parameters to the {TcpipClientStream}.
def connect(self):
self.stream = TcpipClientStream(
self.hostname,
self.write_port,
self.read_port,
self.write_timeout,
self.read_timeout,
)
super().connect()
14 changes: 5 additions & 9 deletions openc3/python/openc3/script/stream.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
#!/usr/bin/env python3
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
# -*- coding: latin-1 -*-
"""
stream.py
"""

# Copyright 2022 Ball Aerospace & Technologies Corp.
# All Rights Reserved.
#
Expand Down Expand Up @@ -67,7 +60,7 @@ def __init__(
self._loop = asyncio.new_event_loop()
self._stop_event = asyncio.Event()

if (schema == "http"):
if schema == "http":
self._url = f"ws://{hostname}:{port}"
else:
self._url = f"wss://{hostname}:{port}"
Expand Down Expand Up @@ -157,7 +150,10 @@ async def _listen(self, endpoint, sub_msg, callback):
"""
url = f"{self._url}{endpoint}"
try:
ws = await websockets.connect(f"{url}?scope={OPENC3_SCOPE}&authorization={self.auth.get()}", loop=self._loop)
ws = await websockets.connect(
f"{url}?scope={OPENC3_SCOPE}&authorization={self.auth.get()}",
loop=self._loop,
)
await self._welcome(ws)
await self._confirm(ws, sub_msg)
await self._handle(endpoint, ws, callback)
Expand Down
Empty file.
48 changes: 48 additions & 0 deletions openc3/python/openc3/streams/stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Copyright 2023 OpenC3, Inc.
# All Rights Reserved.
#
# This program is free software; you can modify and/or redistribute it
# under the terms of the GNU Affero General Public License
# as published by the Free Software Foundation; version 3 with
# attribution addendums as found in the LICENSE.txt
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# This file may also be used under the terms of a commercial license
# if purchased from OpenC3, Inc.


# Class that implments the following methods= read, write(data),
# connect, connected? and disconnect. Streams are simply data sources which
# {Protocol} classes read and write to. This separation of concerns
# allows Streams to simply focus on getting and sending raw data while the
# higher level processing occurs in {Protocol}.
class Stream:
# Expected to return any amount of data on success, or a blank string on
# closed/EOF, and may raise Timeout='E'rror, or other errors
def read(self):
raise RuntimeError("read not defined by Stream")

# Expected to always return immediately with data if available or an empty string
# Should not raise errors
def read_nonblock(self):
raise RuntimeError("read_nonblock not defined by Stream")

# Expected to write complete set of data. May raise TimeoutError
# or other errors.
#
# self.param data [String] Binary data to write to the stream
def write(self, data):
raise RuntimeError("write not defined by Stream")

# Connects the stream
def connect(self):
raise RuntimeError("connect not defined by Stream")

# Disconnects the stream
# Note that streams are not designed to be reconnected and must be recreated
def disconnect(self):
raise RuntimeError("disconnect not defined by Stream")
146 changes: 146 additions & 0 deletions openc3/python/openc3/streams/tcpip_client_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
# Copyright 2023 OpenC3, Inc.
# All Rights Reserved.
#
# This program is free software; you can modify and/or redistribute it
# under the terms of the GNU Affero General Public License
# as published by the Free Software Foundation; version 3 with
# attribution addendums as found in the LICENSE.txt
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# This file may also be used under the terms of a commercial license
# if purchased from OpenC3, Inc.

import socket
from openc3.streams.tcpip_socket_stream import TcpipSocketStream
from openc3.config.config_parser import ConfigParser


# Data {Stream} which reads and writes to TCPIP sockets. This class creates
# the actual sockets based on the constructor parameters. The rest of the
# interface is implemented by the super class {TcpipSocketStream}.
class TcpipClientStream(TcpipSocketStream):
# self.param hostname [String] The host to connect to
# self.param write_port [Integer|None] The port to write. Pass None to make this
# a read only stream.
# self.param read_port [Integer|None] The port to read. Pass None to make this
# a write only stream.
# self.param write_timeout [Float] Seconds to wait before aborting writes
# self.param read_timeout [Float|None] Seconds to wait before aborting reads.
# Pass None to block until the read is complete.
# self.param connect_timeout [Float|None] Seconds to wait before aborting connect.
# Pass None to block until the connection is complete.
def __init__(
self,
hostname,
write_port,
read_port,
write_timeout,
read_timeout,
connect_timeout=5.0,
):
try:
socket.gethostbyname(hostname)
except socket.gaierror as error:
raise RuntimeError(f"Invalid hostname {hostname}") from error
self.hostname = hostname
if str(hostname).upper() == "LOCALHOST":
self.hostname = "127.0.0.1"
self.write_port = ConfigParser.handle_none(write_port)
if self.write_port:
self.write_port = int(write_port)
self.read_port = ConfigParser.handle_none(read_port)
if self.read_port:
self.read_port = int(read_port)

# @write_addr = nil
# @read_addr = nil
# begin
# @write_addr = Socket.pack_sockaddr_in(@write_port, @hostname) if @write_port
# @read_addr = Socket.pack_sockaddr_in(@read_port, @hostname) if @read_port
# rescue => error
# if /getaddrinfo/.match?(error.message)
# raise "Invalid hostname: #{@hostname}"
# else
# raise error
# end
# end

# self.write_addr = None
# self.read_addr = None
# # try:
# if self.write_port:
# self.write_addr = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# self.write_addr.hostname = self.hostname
# self.write_addr.write_port = self.write_port
# if self.read_port:
# self.read_addr =socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# self.read_addr.hostname = self.hostname
# self.read_addr.write_port = self.read_port
# except:
# if /getaddrinfo/.match?(error.message):
# raise "Invalid hostname= {self.hostname}"
# else:
# raise error

write_socket = None
if self.write_port:
write_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
write_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
write_socket.setblocking(False)
read_socket = None
if self.read_port:
if self.write_port != self.read_port:
read_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
read_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
read_socket.setblocking(False)
else:
read_socket = write_socket

self.connect_timeout = ConfigParser.handle_none(connect_timeout)
if self.connect_timeout:
self.connect_timeout = float(connect_timeout)

super().__init__(write_socket, read_socket, write_timeout, read_timeout)

# Connect the socket(s)
def connect(self):
if self.write_socket:
self._connect(self.write_socket, self.hostname, self.write_port)
if self.read_socket and self.read_socket != self.write_socket:
self._connect(self.read_socket, self.hostname, self.read_port)
super().connect()

def _connect(self, socket, hostname, port):
while True:
try:
socket.connect((hostname, port))
except BlockingIOError:
# This is not an error condition
continue
except OSError as error:
if error.errno == 56: # [Errno 56] Socket is already connected
break
else:
raise error

# except:
# try:
# _, sockets, _ = IO.select(None, [socket], None, self.connect_timeout) # wait 3-way handshake completion
# except IOError, Errno='ENOTSOCK':
# raise "Connect canceled"
# if sockets and !sockets.empty?:
# try:
# socket.connect_nonblock(addr) # check connection failure
# except IOError, Errno='ENOTSOCK':
# raise "Connect canceled"
# except Errno='EINPROGRESS':
# retry
# except Errno='EISCONN', Errno='EALREADY':
# else:
# raise "Connect timeout"
# except IOError, Errno='ENOTSOCK':
# raise "Connect canceled"
Loading