Skip to content

Commit 4e37058

Browse files
committed
Add a timeout to the os.read function
1 parent ef2e3d1 commit 4e37058

File tree

2 files changed

+55
-2
lines changed

2 files changed

+55
-2
lines changed

acquire/volatilestream.py

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
import os
2+
from concurrent import futures
23
from io import SEEK_SET, UnsupportedOperation
34
from pathlib import Path
45
from stat import S_IRGRP, S_IROTH, S_IRUSR
6+
from typing import Any, Callable
57

68
from dissect.util.stream import AlignedStream
79

@@ -14,6 +16,35 @@
1416
HAS_FCNTL = False
1517

1618

19+
def timeout(func: Callable, *, timelimit: int) -> Callable:
20+
"""Timeout a function if it takes too long to complete.
21+
22+
Args:
23+
func: a function to wrap.
24+
timelimit: The time in seconds that an operation is allowed to run.
25+
26+
Raises:
27+
TimeoutError: If its time exceeds the timelimit
28+
"""
29+
30+
def wrapper(*args: Any, **kwargs: Any) -> Any:
31+
with futures.ThreadPoolExecutor(max_workers=1) as executor:
32+
future = executor.submit(func, *args, **kwargs)
33+
34+
try:
35+
result = future.result(timelimit)
36+
except futures.TimeoutError:
37+
raise TimeoutError
38+
finally:
39+
# Make sure the thread stops right away.
40+
executor._threads.clear()
41+
futures.thread._threads_queues.clear()
42+
43+
return result
44+
45+
return wrapper
46+
47+
1748
class VolatileStream(AlignedStream):
1849
"""Streaming class to handle various procfs and sysfs edge-cases. Backed by `AlignedStream`.
1950
@@ -41,6 +72,8 @@ def __init__(
4172
st_mode = os.fstat(self.fd).st_mode
4273
write_only = (st_mode & (S_IRUSR | S_IRGRP | S_IROTH)) == 0 # novermin
4374

75+
self._os_read = timeout(os.read, timelimit=5)
76+
4477
super().__init__(0 if write_only else size)
4578

4679
def seek(self, pos: int, whence: int = SEEK_SET) -> int:
@@ -53,8 +86,8 @@ def _read(self, offset: int, length: int) -> bytes:
5386
result = []
5487
while length:
5588
try:
56-
buf = os.read(self.fd, min(length, self.size - offset))
57-
except BlockingIOError:
89+
buf = self._os_read(self.fd, min(length, self.size - offset))
90+
except (BlockingIOError, TimeoutError):
5891
break
5992

6093
if not buf:

tests/test_volatile.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
from time import sleep, time
2+
3+
import pytest
4+
5+
from acquire.volatilestream import timeout
6+
7+
8+
def test_timeout():
9+
def snooze():
10+
sleep(10)
11+
12+
function = timeout(snooze, timelimit=5)
13+
start = time()
14+
15+
with pytest.raises(TimeoutError):
16+
function()
17+
18+
end = time()
19+
20+
assert end - start < 6

0 commit comments

Comments
 (0)