Skip to content

Commit a561ce7

Browse files
committed
Add support for filelike objects in create_dataset
- Refactor generator definitions into generators subpackage
- Use singledispatch to read packet files in xarr.py
- Add singledispatch for setting up generator binary reader
- Clean up type hinting in definitions.py
- Add tests for generators module
1 parent f647e8c commit a561ce7

26 files changed

+782
-292
lines changed

examples/csv_to_xtce_conversion.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import warnings
1818
from pathlib import Path
1919

20-
from space_packet_parser import ccsds
20+
from space_packet_parser import generators
2121
from space_packet_parser.xtce import containers, definitions, encodings, parameter_types, parameters
2222

2323
# This regex is for detecting a dynamically sized field where its bit_length is
@@ -170,7 +170,7 @@ def convert_ccsdspy_to_xtce(csv_path: Path) -> definitions.XtcePacketDefinition:
170170

171171
packet_file = jpss_test_data_dir / "J01_G011_LZ_2021-04-09T00-00-00Z_V01.DAT1"
172172
with packet_file.open("rb") as packet_fh:
173-
ccsds_generator = ccsds.ccsds_generator(packet_fh)
173+
ccsds_generator = generators.ccsds_generator(packet_fh)
174174
packets = [xtce_definition.parse_bytes(binary_data) for binary_data in ccsds_generator]
175175

176176
assert len(packets) == 7200 # noqa S101

examples/parsing_and_plotting_idex_waveforms_from_socket.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
import matplotlib.pyplot as plt
2323

24-
from space_packet_parser import ccsds
24+
from space_packet_parser import generators
2525
from space_packet_parser.xtce import definitions
2626

2727

@@ -116,7 +116,7 @@ def plot_full_event(data: dict):
116116
p.start()
117117

118118
# Create a packet generator that listens to a socket
119-
idex_ccsds_generator = ccsds.ccsds_generator(receiver)
119+
idex_ccsds_generator = generators.ccsds_generator(receiver)
120120
# No data yet. We start recording data from an event when we encounter a packet with IDX__SCI0TYPE==1
121121
data: dict[int, bytes] = {}
122122
try:

space_packet_parser/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
from pathlib import Path
44
from typing import Union
55

6-
from space_packet_parser.ccsds import ccsds_generator
76
from space_packet_parser.common import SpacePacket
7+
from space_packet_parser.generators import ccsds_generator
88
from space_packet_parser.xtce.definitions import XtcePacketDefinition
99
from space_packet_parser.xtce.validation import validate_xtce
1010

@@ -20,6 +20,8 @@
2020
def load_xtce(filename: Union[str, Path]) -> XtcePacketDefinition:
2121
"""Create an XtcePacketDefinition object from an XTCE XML file
2222
23+
This is a shortcut for calling XtcePacketDefinition.from_xtce().
24+
2325
Parameters
2426
----------
2527
filename : Union[str, Path]

space_packet_parser/cli.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323
from rich.table import Table
2424
from rich.tree import Tree
2525

26-
from space_packet_parser import ccsds
27-
from space_packet_parser.ccsds import ccsds_generator
26+
from space_packet_parser import generators
27+
from space_packet_parser.generators import ccsds_generator
2828
from space_packet_parser.xtce.definitions import DEFAULT_ROOT_CONTAINER, XtcePacketDefinition
2929
from space_packet_parser.xtce.validation import validate_xtce
3030

@@ -198,7 +198,7 @@ def parse(
198198
logging.debug(f"Using packet definition file: {definition_file}")
199199
packet_definition = XtcePacketDefinition.from_xtce(definition_file)
200200
with open(packet_file, "rb") as f:
201-
ccsds_generator = ccsds.ccsds_generator(f, skip_header_bytes=skip_header_bytes)
201+
ccsds_generator = generators.ccsds_generator(f, skip_header_bytes=skip_header_bytes)
202202
packets = [packet_definition.parse_bytes(binary_data) for binary_data in ccsds_generator]
203203

204204
if packet is not None:

space_packet_parser/common.py

Lines changed: 1 addition & 162 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,10 @@
11
"""Common mixins"""
22

3-
import datetime as dt
43
import inspect
5-
import io
64
import logging
7-
import socket
8-
import time
95
import warnings
106
from abc import ABCMeta, abstractmethod
11-
from collections.abc import Iterator
12-
from typing import BinaryIO, Optional, Protocol, Union
7+
from typing import Optional, Protocol, Union
138

149
import lxml.etree as ElementTree
1510
from lxml.builder import ElementMaker
@@ -387,162 +382,6 @@ def _read_from_binary_as_int(self, nbits: int) -> int:
387382
return int_data
388383

389384

390-
def fixed_length_generator(
391-
binary_data: Union[BinaryIO, socket.socket, bytes],
392-
*,
393-
packet_length_bytes: int,
394-
buffer_read_size_bytes: Optional[int] = None,
395-
show_progress: bool = False,
396-
) -> Iterator[bytes]:
397-
"""A generator that yields fixed-length chunks from binary_data.
398-
399-
Parameters
400-
----------
401-
binary_data : Union[BinaryIO, socket.socket, bytes]
402-
Binary data source.
403-
packet_length_bytes : int
404-
Number of bytes per packet to yield.
405-
buffer_read_size_bytes : int, optional
406-
Number of bytes to read from the source per read.
407-
show_progress : bool
408-
If True, prints a status bar.
409-
410-
Yields
411-
------
412-
bytes
413-
Fixed-length packet bytes.
414-
"""
415-
n_bytes_parsed = 0 # Keep track of how many bytes we have parsed
416-
n_packets_parsed = 0 # Keep track of how many packets we have parsed
417-
read_buffer, total_length_bytes, read_bytes_from_source, buffer_read_size_bytes = _setup_binary_reader(
418-
binary_data, buffer_read_size_bytes
419-
)
420-
current_pos = 0 # Keep track of where we are in the buffer
421-
start_time = time.time_ns()
422-
while True:
423-
if total_length_bytes and n_bytes_parsed == total_length_bytes:
424-
break
425-
if show_progress:
426-
_print_progress(
427-
current_bytes=n_bytes_parsed,
428-
total_bytes=total_length_bytes,
429-
start_time_ns=start_time,
430-
current_packets=n_packets_parsed,
431-
)
432-
if current_pos > 20_000_000:
433-
# Only trim the buffer after 20 MB read to prevent modifying
434-
# the bitstream and trimming after every packet
435-
read_buffer = read_buffer[current_pos:]
436-
current_pos = 0
437-
while len(read_buffer) - current_pos < packet_length_bytes:
438-
result = read_bytes_from_source(buffer_read_size_bytes)
439-
if not result:
440-
break
441-
read_buffer += result
442-
packet_bytes = read_buffer[current_pos : current_pos + packet_length_bytes]
443-
current_pos += packet_length_bytes
444-
n_packets_parsed += 1
445-
n_bytes_parsed += packet_length_bytes
446-
yield packet_bytes
447-
if show_progress:
448-
_print_progress(
449-
current_bytes=n_bytes_parsed,
450-
total_bytes=total_length_bytes,
451-
start_time_ns=start_time,
452-
current_packets=n_packets_parsed,
453-
end="\n",
454-
log=True,
455-
)
456-
457-
458-
def _setup_binary_reader(binary_data, buffer_read_size_bytes=None):
459-
"""Helper to set up reading from binary_data (file, socket, bytes). Returns:
460-
read_buffer, total_length_bytes, read_bytes_from_source, buffer_read_size_bytes
461-
"""
462-
read_buffer = b""
463-
# ========
464-
# Set up the reader based on the type of binary_data
465-
# ========
466-
if isinstance(binary_data, io.BufferedIOBase):
467-
if buffer_read_size_bytes is None:
468-
# Default to a full read of the file
469-
buffer_read_size_bytes = -1
470-
total_length_bytes = binary_data.seek(0, io.SEEK_END) # This is probably preferable to len
471-
binary_data.seek(0, 0)
472-
logger.info(
473-
f"Creating packet generator from a filelike object, {binary_data}. "
474-
f"Total length is {total_length_bytes} bytes"
475-
)
476-
read_bytes_from_source = binary_data.read
477-
elif isinstance(binary_data, socket.socket): # It's a socket and we don't know how much data we will get
478-
logger.info("Creating packet generator to read from a socket. Total length to parse is unknown.")
479-
total_length_bytes = None # We don't know how long it is
480-
if buffer_read_size_bytes is None:
481-
# Default to 4096 bytes from a socket
482-
buffer_read_size_bytes = 4096
483-
read_bytes_from_source = binary_data.recv
484-
elif isinstance(binary_data, bytes):
485-
read_buffer = binary_data
486-
total_length_bytes = len(read_buffer)
487-
read_bytes_from_source = None # No data to read, we've filled the read_buffer already
488-
logger.info(f"Creating packet generator from a bytes object. Total length is {total_length_bytes} bytes")
489-
elif isinstance(binary_data, io.TextIOWrapper):
490-
raise OSError("Packet data file opened in TextIO mode. You must open packet data in binary mode.")
491-
else:
492-
raise OSError(f"Unrecognized data source: {binary_data}")
493-
return read_buffer, total_length_bytes, read_bytes_from_source, buffer_read_size_bytes
494-
495-
496-
def _print_progress(
497-
*,
498-
current_bytes: int,
499-
total_bytes: Optional[int],
500-
start_time_ns: int,
501-
current_packets: int,
502-
end: str = "\r",
503-
log: bool = False,
504-
):
505-
"""Prints a progress bar, including statistics on parsing rate.
506-
507-
Parameters
508-
----------
509-
current_bytes : int
510-
Number of bytes parsed so far.
511-
total_bytes : Optional[int]
512-
Number of total bytes to parse, if known. None otherwise.
513-
current_packets : int
514-
Number of packets parsed so far.
515-
start_time_ns : int
516-
Start time on system clock, in nanoseconds.
517-
end : str
518-
Print function end string. Default is `\\r` to create a dynamically updating loading bar.
519-
log : bool
520-
If True, log the progress bar at INFO level.
521-
"""
522-
progress_char = "="
523-
bar_length = 20
524-
525-
if total_bytes is not None: # If we actually have an endpoint (i.e. not using a socket)
526-
percentage = int((current_bytes / total_bytes) * 100) # Percent Completed Calculation
527-
progress = int((bar_length * current_bytes) / total_bytes) # Progress Done Calculation
528-
else:
529-
percentage = "???"
530-
progress = 0
531-
532-
# Fast calls initially on Windows can result in a zero elapsed time
533-
elapsed_ns = max(time.time_ns() - start_time_ns, 1)
534-
delta = dt.timedelta(microseconds=elapsed_ns / 1e3)
535-
kbps = int(current_bytes * 8e6 / elapsed_ns) # 8 bits per byte, 1E9 s per ns, 1E3 bits per kb
536-
pps = int(current_packets * 1e9 / elapsed_ns)
537-
info_str = (
538-
f"[Elapsed: {delta}, Parsed {current_bytes} bytes ({current_packets} packets) at {kbps}kb/s ({pps}pkts/s)]"
539-
)
540-
loadbar = f"Progress: [{progress * progress_char:{bar_length}}]{percentage}% {info_str}"
541-
print(loadbar, end=end)
542-
if log:
543-
logger.info(loadbar)
544-
545-
546385
def _extract_bits(data: bytes, start_bit: int, nbits: int):
547386
"""Extract nbits from the data starting from the least significant end.
548387
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
"""Generators subpackage, containing packet generators for different packet formats."""
2+
3+
from space_packet_parser.generators.ccsds import ccsds_generator
4+
from space_packet_parser.generators.fixed_length import fixed_length_generator
5+
6+
__all__ = ["ccsds_generator", "fixed_length_generator"]

space_packet_parser/ccsds.py renamed to space_packet_parser/generators/ccsds.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
"""Parsing utilities for CCSDS packets.
1+
"""Packet generator utilities for CCSDS packets.
22
33
The parsing begins with binary data representing CCSDS Packets. A user can then create a generator
44
from the binary data reading from a filelike object or a socket. The ``ccsds_generator`` function yields
@@ -16,7 +16,8 @@ class can be used to inspect the CCSDS header fields of the packet, but it does
1616
from enum import IntEnum
1717
from typing import BinaryIO, Optional, Union
1818

19-
from space_packet_parser.common import SpacePacket, _print_progress, _setup_binary_reader
19+
from space_packet_parser.common import SpacePacket
20+
from space_packet_parser.generators.utils import _print_progress, _setup_binary_reader
2021

2122
logger = logging.getLogger(__name__)
2223

@@ -31,7 +32,7 @@ class SequenceFlags(IntEnum):
3132

3233

3334
class CCSDSPacketBytes(bytes):
34-
"""Binary representation of a CCSDS packet.
35+
"""Binary (bytes) representation of a CCSDS packet.
3536
3637
Methods to extract the header fields are added to the raw bytes object.
3738
"""
@@ -170,6 +171,10 @@ def create_ccsds_packet(
170171
-------
171172
: CCSDSPacketBytes
172173
Resulting binary packet
174+
175+
Notes
176+
-----
177+
This function is extremely useful for generating test packets for debugging or mocking purposes.
173178
"""
174179
if version_number < 0 or version_number > 7: # 3 bits
175180
raise ValueError("version_number must be between 0 and 7")
@@ -223,7 +228,7 @@ class to extract the header fields with specific methods.
223228
def __init__(self, *args, **kwargs):
224229
warnings.warn(
225230
"The CCSDSPacket class is deprecated and will be removed in a future release. "
226-
"Use the Packet class instead (no CCSDS prefix).",
231+
"Use the SpacePacket class instead (no CCSDS prefix).",
227232
DeprecationWarning,
228233
stacklevel=2,
229234
)
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
"""Fixed length packet generator."""
2+
3+
import socket
4+
import time
5+
from collections.abc import Iterator
6+
from typing import BinaryIO, Optional, Union
7+
8+
from space_packet_parser.generators.utils import _print_progress, _setup_binary_reader
9+
10+
11+
def fixed_length_generator(
12+
binary_data: Union[BinaryIO, socket.socket, bytes],
13+
*,
14+
packet_length_bytes: int,
15+
buffer_read_size_bytes: Optional[int] = None,
16+
show_progress: bool = False,
17+
) -> Iterator[bytes]:
18+
"""A generator that yields fixed-length chunks from binary_data.
19+
20+
Parameters
21+
----------
22+
binary_data : Union[BinaryIO, socket.socket, bytes]
23+
Binary data source.
24+
packet_length_bytes : int
25+
Number of bytes per packet to yield.
26+
buffer_read_size_bytes : int, optional
27+
Number of bytes to read from the source per read.
28+
show_progress : bool
29+
If True, prints a status bar.
30+
31+
Yields
32+
------
33+
bytes
34+
Fixed-length packet bytes.
35+
"""
36+
n_bytes_parsed = 0 # Keep track of how many bytes we have parsed
37+
n_packets_parsed = 0 # Keep track of how many packets we have parsed
38+
read_buffer, total_length_bytes, read_bytes_from_source, buffer_read_size_bytes = _setup_binary_reader(
39+
binary_data, buffer_read_size_bytes
40+
)
41+
current_pos = 0 # Keep track of where we are in the buffer
42+
start_time = time.time_ns()
43+
while True:
44+
if total_length_bytes and n_bytes_parsed == total_length_bytes:
45+
break
46+
if show_progress:
47+
_print_progress(
48+
current_bytes=n_bytes_parsed,
49+
total_bytes=total_length_bytes,
50+
start_time_ns=start_time,
51+
current_packets=n_packets_parsed,
52+
)
53+
if current_pos > 20_000_000:
54+
# Only trim the buffer after 20 MB read to prevent modifying
55+
# the bitstream and trimming after every packet
56+
read_buffer = read_buffer[current_pos:]
57+
current_pos = 0
58+
while len(read_buffer) - current_pos < packet_length_bytes:
59+
result = read_bytes_from_source(buffer_read_size_bytes)
60+
if not result:
61+
break
62+
read_buffer += result
63+
packet_bytes = read_buffer[current_pos : current_pos + packet_length_bytes]
64+
current_pos += packet_length_bytes
65+
n_packets_parsed += 1
66+
n_bytes_parsed += packet_length_bytes
67+
yield packet_bytes
68+
if show_progress:
69+
_print_progress(
70+
current_bytes=n_bytes_parsed,
71+
total_bytes=total_length_bytes,
72+
start_time_ns=start_time,
73+
current_packets=n_packets_parsed,
74+
end="\n",
75+
log=True,
76+
)

0 commit comments

Comments
 (0)