Skip to content

Commit ed8f90a

Browse files
committed
python: python examples rewrite
Rewrote iio_readdev.py and iio_writedev.py in a more pythonic fashion. Signed-off-by: Cristi Iacob <[email protected]>
1 parent c7d589f commit ed8f90a

File tree

2 files changed

+355
-322
lines changed

2 files changed

+355
-322
lines changed

bindings/python/examples/iio_readdev.py

Lines changed: 173 additions & 150 deletions
Original file line numberDiff line numberDiff line change
@@ -17,173 +17,196 @@
1717
# along with this program; if not, write to the Free Software
1818
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
1919

20-
import iio
2120
import sys
22-
import signal
2321
import argparse
24-
25-
parser = argparse.ArgumentParser(description='iio_readdev')
26-
parser.add_argument('-n', '--network', type=str, metavar='',
27-
help='Use the network backend with the provided hostname.')
28-
parser.add_argument('-u', '--uri', type=str, metavar='',
29-
help='Use the context with the provided URI.')
30-
parser.add_argument('-b', '--buffer-size', type=int, metavar='',
31-
help='Size of the capture buffer. Default is 256.')
32-
parser.add_argument('-s', '--samples', type=int, metavar='',
33-
help='Number of samples to capture, 0 = infinite. Default is 0.')
34-
parser.add_argument('-T', '--timeout', type=int, metavar='',
35-
help='Buffer timeout in milliseconds. 0 = no timeout')
36-
parser.add_argument('-a', '--auto', action='store_true',
37-
help='Scan for available contexts and if only one is available use it.')
38-
parser.add_argument('device', type=str, nargs=1)
39-
parser.add_argument('channel', type=str, nargs='*')
40-
41-
arg_ip = ""
42-
arg_uri = ""
43-
scan_for_context = False
44-
buffer_size = 256
45-
num_samples = 0
46-
timeout = 0
47-
device_name = None
48-
channels = None
49-
50-
51-
def read_arguments():
52-
"""
53-
Method for reading the command line parameters and setting the corresponding variables.
54-
"""
55-
global arg_ip, arg_uri, scan_for_context, buffer_size, num_samples, timeout, device_name, channels
56-
57-
args = parser.parse_args()
58-
59-
if args.network is not None:
60-
arg_ip = str(args.network)
61-
62-
if args.uri is not None:
63-
arg_uri = str(args.uri)
64-
65-
if args.auto is True:
66-
scan_for_context = True
67-
68-
if args.buffer_size is not None:
69-
buffer_size = int(args.buffer_size)
70-
71-
if args.samples is not None:
72-
num_samples = int(args.samples)
73-
74-
if args.timeout is not None:
75-
timeout = int(args.timeout)
76-
77-
device_name = args.device[0]
78-
channels = args.channel
79-
80-
81-
def create_context(scan_for_context, arg_uri, arg_ip):
82-
"""
83-
Method for creating the corresponding context.
84-
85-
parameters:
86-
scan_for_context: type=bool
87-
Scan for available contexts and if only one is available use it.
88-
arg_uri: type=string
89-
The URI on which the program should look for a Context.
90-
arg_ip: type=string
91-
The IP on which the program should look for a Network Context.
92-
93-
returns: type:iio.Context
94-
The resulted context.
95-
"""
96-
ctx = None
97-
98-
try:
99-
if scan_for_context:
100-
contexts = iio.scan_contexts()
101-
if len(contexts) == 0:
102-
sys.stderr.write("No IIO context found.\n")
103-
exit(1)
104-
elif len(contexts) == 1:
105-
uri, _ = contexts.popitem()
106-
ctx = iio.Context(_context=uri)
107-
else:
108-
print("Multiple contexts found. Please select one using --uri!")
109-
110-
for uri, _ in contexts:
111-
print(uri)
112-
elif arg_uri != "":
113-
ctx = iio.Context(_context=arg_uri)
114-
elif arg_ip != "":
115-
ctx = iio.NetworkContext(arg_ip)
116-
else:
117-
ctx = iio.Context()
118-
except FileNotFoundError:
119-
sys.stderr.write('Unable to create IIO context\n')
120-
exit(1)
121-
122-
return ctx
22+
import iio
12323

12424

125-
def keyboard_interrupt_handler(signal, frame):
126-
sys.exit(0)
25+
class Arguments:
26+
"""Class for parsing the input arguments."""
27+
28+
def __init__(self):
29+
"""Arguments class constructor."""
30+
self.parser = argparse.ArgumentParser(description='iio_readdev')
31+
self._add_parser_arguments()
32+
args = self.parser.parse_args()
33+
34+
self.network = str(args.network) if args.network else None
35+
self.arg_uri = str(args.uri) if args.uri else None
36+
self.scan_for_context = args.auto
37+
self.buffer_size = int(args.buffer_size) if args.buffer_size else 256
38+
self.num_samples = int(args.samples) if args.samples else 0
39+
self.timeout = int(args.timeout) if args.timeout else 0
40+
self.device_name = args.device[0]
41+
self.channels = args.channel
42+
43+
def _add_parser_arguments(self):
44+
self.parser.add_argument(
45+
'-n', '--network', type=str, metavar='', help='Use the network backend with the provided hostname.')
46+
self.parser.add_argument(
47+
'-u', '--uri', type=str, metavar='', help='Use the context with the provided URI.')
48+
self.parser.add_argument(
49+
'-b', '--buffer-size', type=int, metavar='', help='Size of the capture buffer. Default is 256.')
50+
self.parser.add_argument(
51+
'-s', '--samples', type=int, metavar='', help='Number of samples to capture, 0 = infinite. Default is 0.')
52+
self.parser.add_argument(
53+
'-T', '--timeout', type=int, metavar='', help='Buffer timeout in milliseconds. 0 = no timeout')
54+
self.parser.add_argument(
55+
'-a', '--auto', action='store_true',
56+
help='Scan for available contexts and if only one is available use it.'
57+
)
58+
self.parser.add_argument('device', type=str, nargs=1)
59+
self.parser.add_argument('channel', type=str, nargs='*')
60+
61+
62+
class ContextBuilder:
63+
"""Class for creating the requested context."""
64+
65+
def __init__(self, arguments):
66+
"""
67+
ContextBuilder class constructor.
68+
Args:
69+
arguments: type=Arguments
70+
Contains the input arguments.
71+
"""
72+
self.ctx = None
73+
self.arguments = arguments
74+
75+
def _timeout(self):
76+
if self.arguments.timeout >= 0:
77+
self.ctx.timeout = self.arguments.timeout
78+
return self
79+
80+
def _auto(self):
81+
contexts = iio.scan_contexts()
82+
if len(contexts) == 0:
83+
raise Exception('No IIO context found.\n')
84+
if len(contexts) == 1:
85+
uri, _ = contexts.popitem()
86+
self.ctx = iio.Context(_context=uri)
87+
else:
88+
print('Multiple contexts found. Please select one using --uri!')
89+
for uri, _ in contexts.items():
90+
print(uri)
91+
sys.exit(0)
92+
93+
return self
94+
95+
def _uri(self):
96+
self.ctx = iio.Context(_context=self.arguments.arg_uri)
97+
return self
98+
99+
def _network(self):
100+
self.ctx = iio.NetworkContext(self.arguments.network)
101+
return self
102+
103+
def _default(self):
104+
self.ctx = iio.Context()
105+
return self
106+
107+
def create(self):
108+
"""Create the requested context."""
109+
try:
110+
if self.arguments.scan_for_context:
111+
self._auto()
112+
elif self.arguments.arg_uri:
113+
self._uri()
114+
elif self.arguments.arg_ip:
115+
self._network()
116+
else:
117+
self._default()
118+
except FileNotFoundError:
119+
raise Exception('Unable to create IIO context!\n')
127120

121+
self._timeout()
128122

129-
signal.signal(signal.SIGINT, keyboard_interrupt_handler)
123+
return self.ctx
130124

131125

132-
def read_data(buffer, num_samples):
133-
"""
134-
Method for reading data from the buffer.
126+
class BufferBuilder:
127+
"""Class for creating the buffer."""
135128

136-
parameters:
137-
buffer: type=iio.Buffer
138-
Current buffer.
139-
num_samples: type=int
140-
Number of samples to capture, 0 = infinite. Default is 0.
129+
def __init__(self, ctx, arguments):
130+
"""
131+
BufferBuilder class constructor.
132+
Args:
133+
ctx: type=iio.Context
134+
This buffer's context.
135+
arguments: type=Arguments
136+
Contains the input arguments.
137+
"""
138+
self.ctx = ctx
139+
self.arguments = arguments
140+
self.dev = None
141141

142-
returns: type=None
143-
Reads data from buffer.
144-
"""
145-
if buffer is None:
146-
sys.stderr.write('Unable to create buffer!\n')
147-
exit(1)
142+
def _device(self):
143+
self.dev = self.ctx.find_device(self.arguments.device_name)
148144

149-
while True:
150-
buffer.refill()
151-
samples = buffer.read()
145+
if self.dev is None:
146+
raise Exception('Device %s not found!' % self.arguments.device_name)
152147

153-
if num_samples > 0:
154-
sys.stdout.buffer.write(samples[:min(num_samples, len(samples))])
155-
num_samples -= min(num_samples, len(samples))
148+
return self
156149

157-
if num_samples == 0:
158-
break
150+
def _channels(self):
151+
if len(self.arguments.channels) == 0:
152+
for channel in self.dev.channels:
153+
channel.enabled = True
159154
else:
160-
sys.stdout.buffer.write(bytes(samples))
155+
for channel_idx in self.arguments.channels:
156+
self.dev.channels[int(channel_idx)].enabled = True
157+
158+
return self
159+
160+
def create(self):
161+
"""Create the IIO buffer."""
162+
self._device()
163+
self._channels()
164+
buffer = iio.Buffer(self.dev, self.arguments.buffer_size)
165+
166+
if buffer is None:
167+
raise Exception('Unable to create buffer!\n')
168+
169+
return buffer
170+
171+
172+
class DataReader:
173+
"""Class for reading samples from the device."""
174+
175+
def __init__(self, ctx, arguments):
176+
"""
177+
DataWriter class constructor.
178+
Args:
179+
ctx: type=iio.Context
180+
Current context.
181+
arguments: type=Arguments
182+
Contains the input arguments.
183+
"""
184+
buffer_builder = BufferBuilder(ctx, arguments)
185+
self.buffer = buffer_builder.create()
186+
self.arguments = arguments
187+
188+
def read(self):
189+
"""Read data from the buffer."""
190+
while True:
191+
self.buffer.refill()
192+
samples = self.buffer.read()
193+
194+
if self.arguments.num_samples > 0:
195+
sys.stdout.buffer.write(samples[:min(self.arguments.num_samples, len(samples))])
196+
self.arguments.num_samples -= min(self.arguments.num_samples, len(samples))
197+
198+
if self.arguments.num_samples == 0:
199+
break
200+
else:
201+
sys.stdout.buffer.write(bytes(samples))
161202

162203

163204
def main():
164-
read_arguments()
165-
166-
ctx = create_context(scan_for_context, arg_uri, arg_ip)
167-
168-
if timeout >= 0:
169-
ctx.set_timeout(timeout)
170-
171-
dev = ctx.find_device(device_name)
172-
173-
if dev is None:
174-
sys.stderr.write('Device %s not found!\n' % device_name)
175-
exit(1)
176-
177-
if len(channels) == 0:
178-
for channel in dev.channels:
179-
channel.enabled = True
180-
else:
181-
for channel_idx in channels:
182-
dev.channels[int(channel_idx)].enabled = True
183-
184-
buffer = iio.Buffer(dev, buffer_size)
185-
186-
read_data(buffer, num_samples)
205+
"""Module's main method."""
206+
arguments = Arguments()
207+
context_builder = ContextBuilder(arguments)
208+
reader = DataReader(context_builder.create(), arguments)
209+
reader.read()
187210

188211

189212
if __name__ == '__main__':

0 commit comments

Comments
 (0)