@@ -17,71 +17,8 @@ def check_version(version: str):
17
17
raise AssertionError (f"Version does not match regex pattern '{ pattern } '" )
18
18
19
19
20
- @keyword ("Run Suggestion And Verify Output" )
21
- def test_command_output (command ) -> bool :
22
- """
23
- Executes a command that produces output and then waits for user input
24
- (i.e. it hangs until input is provided). The function waits a few seconds
25
- for the command to produce its initial output, checks that the expected
26
- output is present, and asserts accordingly.
27
-
28
- Note:
29
- This function does NOT send any input to finish the process.
30
- After the assertion, the process will continue to wait for input.
31
-
32
- Parameters:
33
- command (str): The shell command to run.
34
- expected_output (str): A substring expected to be present in the output.
35
-
36
- Returns:
37
- proc (subprocess.Popen): The process object, still running and waiting for input.
38
- """
39
- # Start the process.
40
- proc = subprocess .Popen (
41
- command ,
42
- shell = True ,
43
- stdout = subprocess .PIPE ,
44
- stderr = subprocess .PIPE ,
45
- stdin = subprocess .PIPE ,
46
- text = True , # Work with strings rather than bytes.
47
- bufsize = 1 , # Use line-buffering.
48
- )
49
-
50
- # A thread-safe queue to hold the output lines.
51
- output_queue = queue .Queue ()
52
-
53
- def enqueue_output (out , queue_obj ):
54
- """
55
- Reads lines from the process's stdout and places them into a queue.
56
- """
57
- for line in iter (out .readline , "" ):
58
- queue_obj .put (line )
59
- out .close ()
60
-
61
- # Start a daemon thread to continuously read from proc.stdout.
62
- t = threading .Thread (target = enqueue_output , args = (proc .stdout , output_queue ))
63
- t .daemon = True # Ensures the thread won't block program exit.
64
- t .start ()
65
-
66
- # Wait 2-3 seconds for the process to generate some output.
67
- time .sleep (3 )
68
-
69
- # Drain the queue to collect the output produced so far.
70
- output_lines = []
71
- while True :
72
- try :
73
- line = output_queue .get_nowait ()
74
- except queue .Empty :
75
- break
76
- else :
77
- output_lines .append (line )
78
-
79
- partial_output = "" .join (output_lines )
80
- return partial_output
81
-
82
-
83
20
@keyword ("Run Hanging Command And Verify Output" )
84
- def test_hanging_command_output (command ) -> bool :
21
+ def test_hanging_command_output (command ) -> str :
85
22
"""
86
23
Executes a command that produces output and then waits for user input
87
24
(i.e. it hangs until input is provided). The function waits a few seconds
0 commit comments