@@ -101,6 +101,11 @@ def __fs_objects(path: str) -> Set[FeatureSetPipeline]:
     False, help="To generate the logs in local file 'logging.json'."
 )
 
+DEBUG_MODE = typer.Option(
+    False,
+    help="Only show the queries resulting from the migration; do not apply it.",
+)
+
 
 class Migrate:
     """Execute migration operations in a Database based on pipeline Writer.
@@ -112,19 +117,27 @@ class Migrate:
     def __init__(self, pipelines: Set[FeatureSetPipeline],) -> None:
         self.pipelines = pipelines
 
-    def _send_logs_to_s3(self, file_local: bool) -> None:
+    def _send_logs_to_s3(self, file_local: bool, debug_mode: bool) -> None:
         """Send all migration logs to S3."""
         file_name = "../logging.json"
 
         if not file_local and os.path.exists(file_name):
             s3_client = boto3.client("s3")
 
             timestamp = datetime.datetime.now()
-            object_name = (
-                f"logs/migrate/"
-                f"{timestamp.strftime('%Y-%m-%d')}"
-                f"/logging-{timestamp.strftime('%H:%M:%S')}.json"
-            )
+
+            if debug_mode:
+                object_name = (
+                    f"logs/migrate-debug-mode/"
+                    f"{timestamp.strftime('%Y-%m-%d')}"
+                    f"/logging-{timestamp.strftime('%H:%M:%S')}.json"
+                )
+            else:
+                object_name = (
+                    f"logs/migrate/"
+                    f"{timestamp.strftime('%Y-%m-%d')}"
+                    f"/logging-{timestamp.strftime('%H:%M:%S')}.json"
+                )
             bucket = environment.get_variable("FEATURE_STORE_S3_BUCKET")
 
             try:
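
The only difference between the two branches above is the key prefix: debug-mode logs land under `logs/migrate-debug-mode/` rather than `logs/migrate/`. A quick sketch of the resulting object keys, with an illustrative fixed timestamp:

```python
import datetime

timestamp = datetime.datetime(2021, 4, 12, 14, 30, 5)  # example value

for debug_mode in (True, False):
    # Equivalent to the if/else above, collapsed into one prefix expression.
    prefix = "logs/migrate-debug-mode/" if debug_mode else "logs/migrate/"
    object_name = (
        f"{prefix}"
        f"{timestamp.strftime('%Y-%m-%d')}"
        f"/logging-{timestamp.strftime('%H:%M:%S')}.json"
    )
    print(object_name)

# logs/migrate-debug-mode/2021-04-12/logging-14:30:05.json
# logs/migrate/2021-04-12/logging-14:30:05.json
```

Folding the branch into a single prefix expression like this would avoid duplicating the `strftime` formatting, with no change in behavior.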
@@ -143,23 +156,23 @@ def _send_logs_to_s3(self, file_local: bool) -> None:
                 json_data = json.load(json_f)
             print(json_data)
 
-    def run(self, generate_logs: bool = False) -> None:
+    def run(self, generate_logs: bool = False, debug_mode: bool = False) -> None:
         """Construct and apply the migrations."""
         for pipeline in self.pipelines:
             for writer in pipeline.sink.writers:
                 db = writer.db_config.database
                 if db == "cassandra":
                     migration = ALLOWED_DATABASE[db]
-                    migration.apply_migration(pipeline.feature_set, writer)
+                    migration.apply_migration(pipeline.feature_set, writer, debug_mode)
                 else:
                     logger.warning(f"Butterfree not supporting {db} Migrations yet.")
 
-        self._send_logs_to_s3(generate_logs)
+        self._send_logs_to_s3(generate_logs, debug_mode)
 
 
 @app.command("apply")
 def migrate(
-    path: str = PATH, generate_logs: bool = GENERATE_LOGS,
+    path: str = PATH, generate_logs: bool = GENERATE_LOGS, debug_mode: bool = DEBUG_MODE
 ) -> Set[FeatureSetPipeline]:
     """Scan and run database migrations for feature set pipelines defined under PATH.
 
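
`apply_migration` now takes the flag as a third positional argument. The Cassandra migration class itself is outside this diff, so its behavior isn't shown; below is a hypothetical sketch of the contract the call site implies (class name, query building, and execution are placeholders, not butterfree's implementation):

```python
from typing import Any, List


class SketchMigration:
    """Hypothetical stand-in for an ALLOWED_DATABASE migration class."""

    def apply_migration(self, feature_set: Any, writer: Any, debug_mode: bool) -> None:
        queries: List[str] = self._build_queries(feature_set, writer)
        if debug_mode:
            # Debug mode: only surface the queries (they also end up in the
            # logs shipped to S3); never touch the database.
            for query in queries:
                print(query)
            return
        for query in queries:
            self._execute(query)  # placeholder for the real execution path

    def _build_queries(self, feature_set: Any, writer: Any) -> List[str]:
        return ["-- placeholder DDL derived from the feature set schema"]

    def _execute(self, query: str) -> None:
        raise NotImplementedError  # a real migration would run this against the DB
```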
@@ -172,5 +185,5 @@ def migrate(
     import and instantiate them.
     """
     pipe_set = __fs_objects(path)
-    Migrate(pipe_set).run(generate_logs)
+    Migrate(pipe_set).run(generate_logs, debug_mode)
     return pipe_set
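
End to end, the flag flows from the `apply` command into `Migrate.run` and down to each writer's migration. A usage sketch of the programmatic equivalent, assuming `pipelines` is a `Set[FeatureSetPipeline]` discovered elsewhere (e.g. by `__fs_objects`):

```python
# Default run: apply the migrations; logs (when not kept local) go to
# s3://<FEATURE_STORE_S3_BUCKET>/logs/migrate/...
Migrate(pipelines).run(generate_logs=False)

# Debug run: print the resulting queries without applying them; logs go to
# s3://<FEATURE_STORE_S3_BUCKET>/logs/migrate-debug-mode/...
Migrate(pipelines).run(generate_logs=False, debug_mode=True)
```

On the CLI this corresponds to passing `--debug-mode` to the `apply` command.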