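"""Demo pipeline: NSVS-TL over NSVQA-style inputs.

Perceives the proposition set with a vLLM-served detector, builds a video
automaton frame by frame, and model-checks a probabilistic LTL formula with
Storm; frames satisfying the specification are collected and saved.
"""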
from __future__ import annotations

import numpy as np
from cvias.image.detection.vllm_detection import VLLMDetection

from ns_vfs.automaton.video_automaton import VideoAutomaton
from ns_vfs.data.frame import FramesofInterest, VideoFrame
from ns_vfs.model_checking.stormpy import StormModelChecker
from ns_vfs.percepter.single_vision_percepter import SingleVisionPercepter
from ns_vfs.validator import FrameValidator


def run_nsvs_nsvqa(
    nsvqa_input_data: list[dict[str, list[np.ndarray] | None]],
    proposition_set: list[str],
    ltl_formula: str,
    output_path: str,
    api_key: str = "EMPTY",
    api_base: str = "http://localhost:8000/v1",
    model: str = "OpenGVLab/InternVL2_5-8B",
    threshold_satisfaction_probability: float = 0.80,
    frame_scale: int | None = None,
    calibration_method: str = "temperature_scaling",
    desired_interval_in_sec: float | None = None,
    desired_fps: int | None = None,
    custom_prompt: str | None = None,
) -> list:
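    """Run the NSVS search loop over NSVQA-style inputs.

    Each element of nsvqa_input_data is a dict with "frames" (a list of
    RGB ndarrays) and "subtitle" (extra text passed to the percepter).
    Valid frames extend the automaton; whenever the model checker reports
    the probabilistic specification satisfied, the buffered frames are
    flushed into the frames-of-interest list, which is returned (and saved
    to output_path when one is given).

    Note: frame_scale, desired_interval_in_sec, desired_fps, and
    custom_prompt are accepted but currently unused in this function body.
    """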
    # VLLM detection model initialization
    vllm_model = VLLMDetection(
        api_key=api_key,
        api_base=api_base,
        model=model,
        calibration_method=calibration_method,
    )
    # Wrap the raw LTL formula in a PCTL-style probability operator so the
    # model checker verifies that the satisfaction probability meets the
    # threshold.
    ltl_formula = f"P>={threshold_satisfaction_probability} [{ltl_formula}]"
    # Video automaton initialization
    automaton = VideoAutomaton()
    automaton.set_up(proposition_set=proposition_set)
    # Model checker initialization
    model_checker = StormModelChecker(
        proposition_set=proposition_set, ltl_formula=ltl_formula
    )
    # Frame of interest initialization
    frame_of_interest = FramesofInterest(ltl_formula=ltl_formula)

    # Vision percepter initialization
    vision_percepter = SingleVisionPercepter(
        cv_models=vllm_model,
    )
    # Frame validator initialization
    frame_validator = FrameValidator(ltl_formula=ltl_formula)
    frame_idx = 0
    model_checker_is_filter: bool = False
    model_checker_type: str = "sparse_ma"
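
    # Each NSVQA input (a short sequence of frames plus its subtitle) is
    # treated as a single automaton step: frame_idx advances once per input.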
    for nsvqa_input in nsvqa_input_data:
        sequence_of_frames = nsvqa_input["frames"]
        detected_objects: dict = vision_percepter.perceive(
            image=sequence_of_frames,
            object_of_interest=proposition_set,
            extra_description_of_object=nsvqa_input["subtitle"],
        )
        activity_of_interest = None

        frame = VideoFrame(
            frame_idx=frame_idx,
            timestamp=frame_idx,
            frame_image=sequence_of_frames,
            object_of_interest=detected_objects,
            activity_of_interest=activity_of_interest,
        )
        frame_idx += 1

        # 1. frame validation
        if frame_validator.validate_frame(frame=frame):
            # 2. dynamic automaton construction
            automaton.add_frame(frame=frame)
            frame_of_interest.frame_buffer.append(frame)
            # 3. model checking
            model_checking_result = model_checker.check_automaton(
                transitions=automaton.transitions,
                states=automaton.states,
                model_type=model_checker_type,
                use_filter=model_checker_is_filter,
            )
            if model_checking_result:
                # specification satisfied: move the buffered frames into the
                # frames-of-interest list and restart the automaton
                frame_of_interest.flush_frame_buffer()
                automaton.reset()

    print("--------------------------------")
    print("Detected frames of interest:")
    print(frame_of_interest.foi_list)
    # save result
    if output_path:
        frame_of_interest.save(path=output_path)
        print(f"\nResults saved in {output_path}")

    return frame_of_interest.foi_list


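# Minimal smoke test with random images. It assumes a vLLM-compatible server
# is reachable at the default api_base (http://localhost:8000/v1) serving
# OpenGVLab/InternVL2_5-8B; without one, the perception step cannot run.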
if __name__ == "__main__":
    sample_data = [
        {
            "frames": [
                # Random 224x224 RGB images standing in for video frames
                np.random.randint(0, 255, (224, 224, 3), dtype=np.uint8),
                np.random.randint(0, 255, (224, 224, 3), dtype=np.uint8),
            ],
            "subtitle": "test",
        },
        {
            "frames": [
                np.random.randint(0, 255, (224, 224, 3), dtype=np.uint8),
            ],
            "subtitle": "test",
        },
    ]
    run_nsvs_nsvqa(
        nsvqa_input_data=sample_data,
        desired_interval_in_sec=None,
        desired_fps=30,
        proposition_set=["car", "truck"],
        ltl_formula='"car" U "truck"',
        output_path="/home/mc76728/repo/Coargus/Neuro-Symbolic-Video-Search-Temporal-Logic/_dev_",
        threshold_satisfaction_probability=0.80,
        frame_scale=None,
        calibration_method="temperature_scaling",
    )