16
16
import numpy as np
17
17
import sanic .response as resp
18
18
from forager_embedding_server .ais import ais_singleiter , get_fscore
19
+ from forager_embedding_server .bgsplit_jobs import (BGSplitInferenceJob ,
20
+ BGSplitTrainingJob , Trainer )
19
21
from forager_embedding_server .config import CONFIG
20
22
from forager_embedding_server .embedding_jobs import EmbeddingInferenceJob
21
- from forager_embedding_server .jobs_data import (
22
- ImageList ,
23
- QueryResult ,
24
- load_embedding_set ,
25
- load_score_set ,
26
- )
23
+ from forager_embedding_server .jobs_data import (ImageList , QueryResult ,
24
+ load_embedding_set ,
25
+ load_score_set )
27
26
from forager_embedding_server .utils import CleanupDict
28
27
from forager_knn import utils
28
+ from forager_knn .clusters import TerraformModule
29
29
from PIL import Image
30
30
from sanic import Sanic
31
31
from sklearn import svm
32
32
from sklearn .metrics import precision_score , recall_score
33
33
34
- BUILD_WITH_KUBE = False
35
-
36
- if BUILD_WITH_KUBE :
37
- from forager_knn .clusters import TerraformModule
38
-
39
- from forager_embedding_server .bgsplit_jobs import (
40
- BGSplitInferenceJob ,
41
- BGSplitTrainingJob ,
42
- Trainer ,
43
- )
44
-
45
34
# Create a logger for the server
46
35
47
36
forager_embedding_server .log .init_logging ()
@@ -179,19 +168,13 @@ async def _stop_cluster(cluster):
179
168
180
169
181
170
# TODO(mihirg): Automatically clean up inactive clusters
182
- if BUILD_WITH_KUBE :
183
- current_clusters : CleanupDict [str , TerraformModule ] = CleanupDict (
184
- _stop_cluster , app .add_task , CONFIG .CLUSTER .CLEANUP_TIME
185
- )
186
- else :
187
- current_clusters = {}
171
+ current_clusters : CleanupDict [str , TerraformModule ] = CleanupDict (
172
+ _stop_cluster , app .add_task , CONFIG .CLUSTER .CLEANUP_TIME
173
+ )
188
174
189
175
190
176
@app .route ("/start_cluster" , methods = ["POST" ])
191
177
async def start_cluster (request ):
192
- if not BUILD_WITH_KUBE :
193
- return resp .json ({"success" : False }, status = 400 )
194
-
195
178
cluster = TerraformModule (
196
179
CONFIG .CLUSTER .TERRAFORM_MODULE_PATH , copy = not CONFIG .CLUSTER .REUSE_EXISTING
197
180
)
@@ -203,14 +186,32 @@ async def start_cluster(request):
203
186
204
187
@app .route ("/cluster_status" , methods = ["GET" ])
205
188
async def cluster_status (request ):
206
- cluster_id = request .args ["cluster_id" ][0 ]
207
- cluster = current_clusters .get (cluster_id )
208
- has_cluster = cluster is not None
189
+ if "cluster_id" in request .args :
190
+ cluster_id = request .args ["cluster_id" ][0 ]
191
+ cluster = current_clusters .get (cluster_id )
192
+ has_cluster = cluster is not None
193
+ status = {
194
+ "has_cluster" : has_cluster ,
195
+ "ready" : has_cluster and cluster .ready .is_set (),
196
+ }
197
+ else :
198
+ status = {"clusters" : []}
199
+ for cluster_id , cluster in current_clusters .items ():
200
+ state = "creating"
201
+ if cluster .ready .is_set ():
202
+ state = "ready"
203
+ if cluster .destroying .is_set ():
204
+ state = "destroying"
205
+ status ["clusters" ].append (
206
+ {
207
+ "id" : cluster_id ,
208
+ "name" : "tbd" ,
209
+ "created_at" : "N/A" ,
210
+ "created_by" : "N/A" ,
211
+ "status" : state ,
212
+ }
213
+ )
209
214
210
- status = {
211
- "has_cluster" : has_cluster ,
212
- "ready" : has_cluster and cluster .ready .is_set (),
213
- }
214
215
return resp .json (status )
215
216
216
217
@@ -226,12 +227,9 @@ async def stop_cluster(request):
226
227
# Note(mihirg): These functions are out of date as of 6/25 and need to be fixed/removed.
227
228
#
228
229
229
- if BUILD_WITH_KUBE :
230
- current_models : CleanupDict [str , BGSplitTrainingJob ] = CleanupDict (
231
- lambda job : job .stop ()
232
- )
233
- else :
234
- current_models = {}
230
+ current_models : CleanupDict [str , BGSplitTrainingJob ] = CleanupDict (
231
+ lambda job : job .stop ()
232
+ )
235
233
236
234
237
235
@app .route ("/start_bgsplit_job" , methods = ["POST" ])
@@ -386,12 +384,9 @@ async def bgsplit_job_status(request):
386
384
return resp .json (status )
387
385
388
386
389
- if BUILD_WITH_KUBE :
390
- current_model_inference_jobs : CleanupDict [str , BGSplitInferenceJob ] = CleanupDict (
391
- lambda job : job .stop ()
392
- )
393
- else :
394
- current_model_inference_jobs = {}
387
+ current_model_inference_jobs : CleanupDict [str , BGSplitInferenceJob ] = CleanupDict (
388
+ lambda job : job .stop ()
389
+ )
395
390
396
391
397
392
@app .route ("/start_bgsplit_inference_job" , methods = ["POST" ])
@@ -748,10 +743,9 @@ async def cleanup(app, loop):
748
743
749
744
@utils .log_exception_from_coro_but_return_none
750
745
async def _cleanup_clusters ():
751
- if BUILD_WITH_KUBE :
752
- n = len (current_clusters )
753
- await current_clusters .clear_async ()
754
- print (f"- killed { n } clusters" )
746
+ n = len (current_clusters )
747
+ await current_clusters .clear_async ()
748
+ print (f"- killed { n } clusters" )
755
749
756
750
757
751
if __name__ == "__main__" :
0 commit comments