4
4
from typing import Any , Dict , List , Sequence , Tuple , Union
5
5
6
6
from cvise .passes .abstract import AbstractPass , BinaryState , PassResult , ProcessEventNotifier
7
- from cvise .utils .hint import apply_hints , group_hints_by_type , HintBundle , HintApplicationStats , load_hints , store_hints
7
+ from cvise .utils .hint import (
8
+ apply_hints ,
9
+ group_hints_by_type ,
10
+ is_special_hint_type ,
11
+ HintBundle ,
12
+ HintApplicationStats ,
13
+ load_hints ,
14
+ store_hints ,
15
+ )
8
16
9
17
HINTS_FILE_NAME_TEMPLATE = 'hints{type}.jsonl.zst'
10
18
@@ -46,6 +54,19 @@ def advance_on_success(self, new_hint_count: int) -> Union[PerTypeHintState, Non
46
54
)
47
55
48
56
57
+ @dataclass (frozen = True )
58
+ class SpecialHintState :
59
+ """A sub-item of HintState for "special" hint types - those that start from "@".
60
+
61
+ Such hints aren't attempted as reduction attempts themselves, instead they convey information from one pass to
62
+ another - hence there's no underlying_state here.
63
+ """
64
+
65
+ type : str
66
+ hints_file_name : Path
67
+ hint_count : int
68
+
69
+
49
70
@dataclass (frozen = True )
50
71
class HintState :
51
72
"""Stores the current state of the HintBasedPass.
@@ -57,21 +78,29 @@ class HintState:
57
78
tmp_dir : Path
58
79
# The enumeration state for each hint type. Sorted by type (in order to have deterministic and repeatable
59
80
# enumeration order).
60
- per_type_states : Tuple [PerTypeHintState ]
81
+ per_type_states : Tuple [PerTypeHintState , ... ]
61
82
# Pointer to the current per-type state in the round-robin enumeration.
62
83
ptr : int
84
+ # Information for "special" hint types (those that start with "@"). They're stored separately because we don't
85
+ # attempt applying them during enumeration - they're only intended as inputs for other passes that depend on them.
86
+ special_hints : Tuple [SpecialHintState , ...]
63
87
64
88
@staticmethod
65
- def create (tmp_dir : Path , per_type_states : List [PerTypeHintState ]):
89
+ def create (tmp_dir : Path , per_type_states : List [PerTypeHintState ], special_hints : List [ SpecialHintState ] ):
66
90
sorted_states = sorted (per_type_states , key = lambda s : s .type )
67
- return HintState (tmp_dir = tmp_dir , per_type_states = tuple (sorted_states ), ptr = 0 )
91
+ sorted_special_hints = sorted (special_hints , key = lambda s : s .type )
92
+ return HintState (
93
+ tmp_dir = tmp_dir , per_type_states = tuple (sorted_states ), ptr = 0 , special_hints = tuple (sorted_special_hints )
94
+ )
68
95
69
96
def __repr__ (self ):
70
97
parts = []
71
98
for i , s in enumerate (self .per_type_states ):
72
99
mark = '[*]' if i == self .ptr and len (self .per_type_states ) > 1 else ''
73
100
type_s = s .type + ': ' if s .type else ''
74
101
parts .append (f'{ mark } { type_s } { s .underlying_state .compact_repr ()} ' )
102
+ for s in self .special_hints :
103
+ parts .append (f'{ s .type } : { s .hint_count } ' )
75
104
return f'HintState({ ", " .join (parts )} )'
76
105
77
106
def real_chunk (self ) -> int :
@@ -98,7 +127,12 @@ def advance(self) -> Union[HintState, None]:
98
127
new_ptr += 1
99
128
new_ptr %= len (new_per_type_states )
100
129
101
- return HintState (tmp_dir = self .tmp_dir , per_type_states = tuple (new_per_type_states ), ptr = new_ptr )
130
+ return HintState (
131
+ tmp_dir = self .tmp_dir ,
132
+ per_type_states = tuple (new_per_type_states ),
133
+ ptr = new_ptr ,
134
+ special_hints = self .special_hints ,
135
+ )
102
136
103
137
def advance_on_success (self , type_to_bundle : Dict [str , HintBundle ]):
104
138
sub_states = []
@@ -114,10 +148,15 @@ def advance_on_success(self, type_to_bundle: Dict[str, HintBundle]):
114
148
sub_states .append (new_substate )
115
149
if not sub_states :
116
150
return None
117
- return HintState (tmp_dir = self .tmp_dir , per_type_states = tuple (sub_states ), ptr = 0 )
151
+ return HintState (
152
+ tmp_dir = self .tmp_dir , per_type_states = tuple (sub_states ), ptr = 0 , special_hints = self .special_hints
153
+ )
118
154
119
155
def hint_bundle_paths (self ) -> Dict [str , Path ]:
120
- return {substate .type : self .tmp_dir / substate .hints_file_name for substate in self .per_type_states }
156
+ return {
157
+ substate .type : self .tmp_dir / substate .hints_file_name
158
+ for substate in self .per_type_states + self .special_hints
159
+ }
121
160
122
161
123
162
class HintBasedPass (AbstractPass ):
@@ -186,6 +225,8 @@ def new(
186
225
return self .new_from_hints (hints , tmp_dir )
187
226
188
227
def transform (self , test_case : Path , state : HintState , original_test_case : Path , * args , ** kwargs ):
228
+ if not state .per_type_states : # possible if all hints produced by new() were "special"
229
+ return PassResult .STOP , state
189
230
self .load_and_apply_hints (original_test_case , test_case , [state ])
190
231
return PassResult .OK , state
191
232
@@ -230,18 +271,27 @@ def new_from_hints(self, bundle: HintBundle, tmp_dir: Path) -> Union[HintState,
230
271
type_to_bundle = group_hints_by_type (bundle )
231
272
self .backfill_pass_names (type_to_bundle )
232
273
type_to_file_name = store_hints_per_type (tmp_dir , type_to_bundle )
233
- sub_states = []
234
- # Initialize a separate enumeration for each group of hints sharing a particular type.
274
+ sub_states : List [ PerTypeHintState ] = []
275
+ special_states : List [ SpecialHintState ] = []
235
276
for type , sub_bundle in type_to_bundle .items ():
236
- underlying = self .create_elementary_state (len (sub_bundle .hints ))
237
- if underlying is None :
238
- continue
239
- sub_states .append (
240
- PerTypeHintState (type = type , hints_file_name = type_to_file_name [type ], underlying_state = underlying )
241
- )
242
- if not sub_states :
277
+ if is_special_hint_type (type ):
278
+ # "Special" hints aren't attempted in transform() jobs - only store them to be consumed by other passes.
279
+ special_states .append (
280
+ SpecialHintState (
281
+ type = type , hints_file_name = type_to_file_name [type ], hint_count = len (sub_bundle .hints )
282
+ )
283
+ )
284
+ else :
285
+ # Initialize a separate enumeration for this group of hints sharing a particular type.
286
+ underlying = self .create_elementary_state (len (sub_bundle .hints ))
287
+ if underlying is None :
288
+ continue
289
+ sub_states .append (
290
+ PerTypeHintState (type = type , hints_file_name = type_to_file_name [type ], underlying_state = underlying )
291
+ )
292
+ if not sub_states and not special_states :
243
293
return None
244
- return HintState .create (tmp_dir , sub_states )
294
+ return HintState .create (tmp_dir , sub_states , special_states )
245
295
246
296
def advance_on_success_from_hints (self , bundle : HintBundle , state : HintState ) -> Union [HintState , None ]:
247
297
"""Advances the state after a successful reduction, given pre-generated hints.
0 commit comments