@@ -39,11 +39,13 @@ package main
39
39
40
40
import (
41
41
"context"
42
+ "encoding/json"
42
43
"flag"
43
44
"fmt"
44
45
"io/ioutil"
45
46
"os"
46
47
"path/filepath"
48
+ "regexp"
47
49
"strconv"
48
50
"strings"
49
51
"time"
@@ -102,6 +104,7 @@ var (
102
104
earlyStopServiceAddr = flag .String ("s-earlystop" , "" , "Katib Early Stopping service endpoint" )
103
105
trialName = flag .String ("t" , "" , "Trial Name" )
104
106
metricsFilePath = flag .String ("path" , "" , "Metrics File Path" )
107
+ metricsFileFormat = flag .String ("format" , "" , "Metrics File Format" )
105
108
metricNames = flag .String ("m" , "" , "Metric names" )
106
109
objectiveType = flag .String ("o-type" , "" , "Objective type" )
107
110
metricFilters = flag .String ("f" , "" , "Metric filters" )
@@ -170,7 +173,10 @@ func watchMetricsFile(mFile string, stopRules stopRulesFlag, filters []string) {
170
173
}
171
174
172
175
// Get list of regural expressions from filters.
173
- metricRegList := filemc .GetFilterRegexpList (filters )
176
+ var metricRegList []* regexp.Regexp
177
+ if * metricsFileFormat == commonv1beta1 .TextFormat .String () {
178
+ metricRegList = filemc .GetFilterRegexpList (filters )
179
+ }
174
180
175
181
// Start watch log lines.
176
182
t , _ := tail .TailFile (mFile , tail.Config {Follow : true })
@@ -179,78 +185,78 @@ func watchMetricsFile(mFile string, stopRules stopRulesFlag, filters []string) {
179
185
// Print log line
180
186
klog .Info (logText )
181
187
182
- // Check if log line contains metric from stop rules.
183
- isRuleLine := false
184
- for _ , rule := range stopRules {
185
- if strings .Contains (logText , rule .Name ) {
186
- isRuleLine = true
187
- break
188
- }
189
- }
190
- // If log line doesn't contain appropriate metric, continue track file.
191
- if ! isRuleLine {
192
- continue
193
- }
194
-
195
- // If log line contains appropriate metric, find all submatches from metric filters.
196
- for _ , metricReg := range metricRegList {
197
- matchStrings := metricReg .FindAllStringSubmatch (logText , - 1 )
198
- for _ , subMatchList := range matchStrings {
199
- if len (subMatchList ) < 3 {
200
- continue
201
- }
202
- // Submatch must have metric name and float value
203
- metricName := strings .TrimSpace (subMatchList [1 ])
204
- metricValue , err := strconv .ParseFloat (strings .TrimSpace (subMatchList [2 ]), 64 )
205
- if err != nil {
206
- klog .Fatalf ("Unable to parse value %v to float for metric %v" , metricValue , metricName )
188
+ switch * metricsFileFormat {
189
+ case commonv1beta1 .TextFormat .String ():
190
+ // Check if log line contains metric from stop rules.
191
+ isRuleLine := false
192
+ for _ , rule := range stopRules {
193
+ if strings .Contains (logText , rule .Name ) {
194
+ isRuleLine = true
195
+ break
207
196
}
197
+ }
198
+ // If log line doesn't contain appropriate metric, continue track file.
199
+ if ! isRuleLine {
200
+ continue
201
+ }
208
202
209
- // stopRules contains array of EarlyStoppingRules that has not been reached yet.
210
- // After rule is reached we delete appropriate element from the array.
211
- for idx , rule := range stopRules {
212
- if metricName != rule .Name {
203
+ // If log line contains appropriate metric, find all submatches from metric filters.
204
+ for _ , metricReg := range metricRegList {
205
+ matchStrings := metricReg .FindAllStringSubmatch (logText , - 1 )
206
+ for _ , subMatchList := range matchStrings {
207
+ if len (subMatchList ) < 3 {
213
208
continue
214
209
}
215
-
216
- // Calculate optimalObjValue.
217
- if metricName == objMetric {
218
- if optimalObjValue == nil {
219
- optimalObjValue = & metricValue
220
- } else if objType == commonv1beta1 .ObjectiveTypeMaximize && metricValue > * optimalObjValue {
221
- optimalObjValue = & metricValue
222
- } else if objType == commonv1beta1 .ObjectiveTypeMinimize && metricValue < * optimalObjValue {
223
- optimalObjValue = & metricValue
224
- }
225
- // Assign best optimal value to metric value.
226
- metricValue = * optimalObjValue
210
+ // Submatch must have metric name and float value
211
+ metricName := strings .TrimSpace (subMatchList [1 ])
212
+ metricValue , err := strconv .ParseFloat (strings .TrimSpace (subMatchList [2 ]), 64 )
213
+ if err != nil {
214
+ klog .Fatalf ("Unable to parse value %v to float for metric %v" , metricValue , metricName )
227
215
}
228
216
229
- // Reduce steps if appropriate metric is reported.
230
- // Once rest steps are empty we apply early stopping rule.
231
- if _ , ok := metricStartStep [metricName ]; ok {
232
- metricStartStep [metricName ]--
233
- if metricStartStep [metricName ] != 0 {
217
+ // stopRules contains array of EarlyStoppingRules that has not been reached yet.
218
+ // After rule is reached we delete appropriate element from the array.
219
+ for idx , rule := range stopRules {
220
+ if metricName != rule .Name {
234
221
continue
235
222
}
223
+ stopRules = updateStopRules (objMetric , stopRules , optimalObjValue , metricValue , objType , metricStartStep , rule , idx )
236
224
}
225
+ }
226
+ }
227
+ case commonv1beta1 .JsonFormat .String ():
228
+ var logJsonObj map [string ]interface {}
229
+ if err = json .Unmarshal ([]byte (logText ), & logJsonObj ); err != nil {
230
+ klog .Fatalf ("Failed to unmarshal logs in JSON format, log: %s, error: %v" , logText , err )
231
+ }
232
+ // Check if log line contains metric from stop rules.
233
+ isRuleLine := false
234
+ for _ , rule := range stopRules {
235
+ if _ , exist := logJsonObj [rule .Name ]; exist {
236
+ isRuleLine = true
237
+ break
238
+ }
239
+ }
240
+ // If log line doesn't contain appropriate metric, continue track file.
241
+ if ! isRuleLine {
242
+ continue
243
+ }
237
244
238
- ruleValue , err := strconv .ParseFloat (rule .Value , 64 )
239
- if err != nil {
240
- klog .Fatalf ("Unable to parse value %v to float for rule metric %v" , rule .Value , rule .Name )
241
- }
242
-
243
- // Metric value can be equal, less or greater than stop rule.
244
- // Deleting suitable stop rule from the array.
245
- if rule .Comparison == commonv1beta1 .ComparisonTypeEqual && metricValue == ruleValue {
246
- stopRules = deleteStopRule (stopRules , idx )
247
- } else if rule .Comparison == commonv1beta1 .ComparisonTypeLess && metricValue < ruleValue {
248
- stopRules = deleteStopRule (stopRules , idx )
249
- } else if rule .Comparison == commonv1beta1 .ComparisonTypeGreater && metricValue > ruleValue {
250
- stopRules = deleteStopRule (stopRules , idx )
251
- }
245
+ // stopRules contains array of EarlyStoppingRules that has not been reached yet.
246
+ // After rule is reached we delete appropriate element from the array.
247
+ for idx , rule := range stopRules {
248
+ value , exist := logJsonObj [rule .Name ].(string )
249
+ if ! exist {
250
+ continue
252
251
}
252
+ metricValue , err := strconv .ParseFloat (strings .TrimSpace (value ), 64 )
253
+ if err != nil {
254
+ klog .Fatalf ("Unable to parse value %v to float for metric %v" , metricValue , rule .Name )
255
+ }
256
+ stopRules = updateStopRules (objMetric , stopRules , optimalObjValue , metricValue , objType , metricStartStep , rule , idx )
253
257
}
258
+ default :
259
+ klog .Fatalf ("format must be set %s or %s" , commonv1beta1 .TextFormat .String (), commonv1beta1 .JsonFormat .String ())
254
260
}
255
261
256
262
// If stopRules array is empty, Trial is early stopped.
@@ -326,6 +332,55 @@ func watchMetricsFile(mFile string, stopRules stopRulesFlag, filters []string) {
326
332
}
327
333
}
328
334
335
+ func updateStopRules (
336
+ objMetric string ,
337
+ stopRules []commonv1beta1.EarlyStoppingRule ,
338
+ optimalObjValue * float64 ,
339
+ metricValue float64 ,
340
+ objType commonv1beta1.ObjectiveType ,
341
+ metricStartStep map [string ]int ,
342
+ rule commonv1beta1.EarlyStoppingRule ,
343
+ ruleIdx int ,
344
+ ) []commonv1beta1.EarlyStoppingRule {
345
+ // Calculate optimalObjValue.
346
+ if rule .Name == objMetric {
347
+ if optimalObjValue == nil {
348
+ optimalObjValue = & metricValue
349
+ } else if objType == commonv1beta1 .ObjectiveTypeMaximize && metricValue > * optimalObjValue {
350
+ optimalObjValue = & metricValue
351
+ } else if objType == commonv1beta1 .ObjectiveTypeMinimize && metricValue < * optimalObjValue {
352
+ optimalObjValue = & metricValue
353
+ }
354
+ // Assign best optimal value to metric value.
355
+ metricValue = * optimalObjValue
356
+ }
357
+
358
+ // Reduce steps if appropriate metric is reported.
359
+ // Once rest steps are empty we apply early stopping rule.
360
+ if _ , ok := metricStartStep [rule .Name ]; ok {
361
+ metricStartStep [rule .Name ]--
362
+ if metricStartStep [rule .Name ] != 0 {
363
+ return stopRules
364
+ }
365
+ }
366
+
367
+ ruleValue , err := strconv .ParseFloat (rule .Value , 64 )
368
+ if err != nil {
369
+ klog .Fatalf ("Unable to parse value %v to float for rule metric %v" , rule .Value , rule .Name )
370
+ }
371
+
372
+ // Metric value can be equal, less or greater than stop rule.
373
+ // Deleting suitable stop rule from the array.
374
+ if rule .Comparison == commonv1beta1 .ComparisonTypeEqual && metricValue == ruleValue {
375
+ return deleteStopRule (stopRules , ruleIdx )
376
+ } else if rule .Comparison == commonv1beta1 .ComparisonTypeLess && metricValue < ruleValue {
377
+ return deleteStopRule (stopRules , ruleIdx )
378
+ } else if rule .Comparison == commonv1beta1 .ComparisonTypeGreater && metricValue > ruleValue {
379
+ return deleteStopRule (stopRules , ruleIdx )
380
+ }
381
+ return stopRules
382
+ }
383
+
329
384
func deleteStopRule (stopRules []commonv1beta1.EarlyStoppingRule , idx int ) []commonv1beta1.EarlyStoppingRule {
330
385
if idx >= len (stopRules ) {
331
386
klog .Fatalf ("Index %v out of range stopRules: %v" , idx , stopRules )
@@ -383,7 +438,7 @@ func reportMetrics(filters []string) {
383
438
if len (* metricNames ) != 0 {
384
439
metricList = strings .Split (* metricNames , ";" )
385
440
}
386
- olog , err := filemc .CollectObservationLog (* metricsFilePath , metricList , filters )
441
+ olog , err := filemc .CollectObservationLog (* metricsFilePath , metricList , filters , * metricsFileFormat )
387
442
if err != nil {
388
443
klog .Fatalf ("Failed to collect logs: %v" , err )
389
444
}
0 commit comments