Skip to content

Commit d443ed3

Browse files
Support JSON format logs in file-metrics-collector (#1765)
* support JSON format logs in file-metrics-collector * review: convert fileFormat to type FileSystemFileFormat * Update cmd/metricscollector/v1beta1/file-metricscollector/main.go Co-authored-by: Andrey Velichkevich <[email protected]> * review: remove func (f FileSystemFileFormat) String() * review: get metricRegList only when the format is TEXT * review: change var name in a script for e2e * review: explicitly specify cloudml-hypertune in the Dockerfile * review: use reflect.DeepEqual instead of go-cmp.Diff * review: stop using 'JSON' directly in error statements * review: install a specific version of cloudml-hypertune * review: get objType in the updateStopRules function * review: save optimalObjValue across multiple stopRules * review: add warning messages to parseTimestamp func * review: generate test files with go test command * review: change api for new feature Co-authored-by: Andrey Velichkevich <[email protected]>
1 parent 36d0a57 commit d443ed3

File tree

22 files changed

+824
-123
lines changed

22 files changed

+824
-123
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ bin
2222
*.dll
2323
*.so
2424
*.dylib
25+
pkg/metricscollector/v1beta1/file-metricscollector/testdata
2526

2627
## Test binary, build with `go test -c`
2728
*.test

cmd/metricscollector/v1beta1/file-metricscollector/main.go

Lines changed: 127 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,13 @@ package main
3939

4040
import (
4141
"context"
42+
"encoding/json"
4243
"flag"
4344
"fmt"
4445
"io/ioutil"
4546
"os"
4647
"path/filepath"
48+
"regexp"
4749
"strconv"
4850
"strings"
4951
"time"
@@ -102,6 +104,7 @@ var (
102104
earlyStopServiceAddr = flag.String("s-earlystop", "", "Katib Early Stopping service endpoint")
103105
trialName = flag.String("t", "", "Trial Name")
104106
metricsFilePath = flag.String("path", "", "Metrics File Path")
107+
metricsFileFormat = flag.String("format", "", "Metrics File Format")
105108
metricNames = flag.String("m", "", "Metric names")
106109
objectiveType = flag.String("o-type", "", "Objective type")
107110
metricFilters = flag.String("f", "", "Metric filters")
@@ -137,7 +140,7 @@ func printMetricsFile(mFile string) {
137140
}
138141
}
139142

140-
func watchMetricsFile(mFile string, stopRules stopRulesFlag, filters []string) {
143+
func watchMetricsFile(mFile string, stopRules stopRulesFlag, filters []string, fileFormat commonv1beta1.FileFormat) {
141144

142145
// metricStartStep is the dict where key = metric name, value = start step.
143146
// We should apply early stopping rule only if metric is reported at least "start_step" times.
@@ -148,9 +151,6 @@ func watchMetricsFile(mFile string, stopRules stopRulesFlag, filters []string) {
148151
}
149152
}
150153

151-
// First metric is objective in metricNames array.
152-
objMetric := strings.Split(*metricNames, ";")[0]
153-
objType := commonv1beta1.ObjectiveType(*objectiveType)
154154
// For objective metric we calculate best optimal value from the recorded metrics.
155155
// This is workaround for Median Stop algorithm.
156156
// TODO (andreyvelich): Think about it, maybe define latest, max or min strategy type in stop-rule as well ?
@@ -169,88 +169,89 @@ func watchMetricsFile(mFile string, stopRules stopRulesFlag, filters []string) {
169169
klog.Fatalf("Failed to create new Process from pid %v, error: %v", mainProcPid, err)
170170
}
171171

172-
// Get list of regural expressions from filters.
173-
metricRegList := filemc.GetFilterRegexpList(filters)
174-
175172
// Start watch log lines.
176173
t, _ := tail.TailFile(mFile, tail.Config{Follow: true})
177174
for line := range t.Lines {
178175
logText := line.Text
179176
// Print log line
180177
klog.Info(logText)
181178

182-
// Check if log line contains metric from stop rules.
183-
isRuleLine := false
184-
for _, rule := range stopRules {
185-
if strings.Contains(logText, rule.Name) {
186-
isRuleLine = true
187-
break
188-
}
189-
}
190-
// If log line doesn't contain appropriate metric, continue track file.
191-
if !isRuleLine {
192-
continue
193-
}
194-
195-
// If log line contains appropriate metric, find all submatches from metric filters.
196-
for _, metricReg := range metricRegList {
197-
matchStrings := metricReg.FindAllStringSubmatch(logText, -1)
198-
for _, subMatchList := range matchStrings {
199-
if len(subMatchList) < 3 {
200-
continue
201-
}
202-
// Submatch must have metric name and float value
203-
metricName := strings.TrimSpace(subMatchList[1])
204-
metricValue, err := strconv.ParseFloat(strings.TrimSpace(subMatchList[2]), 64)
205-
if err != nil {
206-
klog.Fatalf("Unable to parse value %v to float for metric %v", metricValue, metricName)
179+
switch fileFormat {
180+
case commonv1beta1.TextFormat:
181+
// Get list of regural expressions from filters.
182+
var metricRegList []*regexp.Regexp
183+
metricRegList = filemc.GetFilterRegexpList(filters)
184+
185+
// Check if log line contains metric from stop rules.
186+
isRuleLine := false
187+
for _, rule := range stopRules {
188+
if strings.Contains(logText, rule.Name) {
189+
isRuleLine = true
190+
break
207191
}
192+
}
193+
// If log line doesn't contain appropriate metric, continue track file.
194+
if !isRuleLine {
195+
continue
196+
}
208197

209-
// stopRules contains array of EarlyStoppingRules that has not been reached yet.
210-
// After rule is reached we delete appropriate element from the array.
211-
for idx, rule := range stopRules {
212-
if metricName != rule.Name {
198+
// If log line contains appropriate metric, find all submatches from metric filters.
199+
for _, metricReg := range metricRegList {
200+
matchStrings := metricReg.FindAllStringSubmatch(logText, -1)
201+
for _, subMatchList := range matchStrings {
202+
if len(subMatchList) < 3 {
213203
continue
214204
}
215-
216-
// Calculate optimalObjValue.
217-
if metricName == objMetric {
218-
if optimalObjValue == nil {
219-
optimalObjValue = &metricValue
220-
} else if objType == commonv1beta1.ObjectiveTypeMaximize && metricValue > *optimalObjValue {
221-
optimalObjValue = &metricValue
222-
} else if objType == commonv1beta1.ObjectiveTypeMinimize && metricValue < *optimalObjValue {
223-
optimalObjValue = &metricValue
224-
}
225-
// Assign best optimal value to metric value.
226-
metricValue = *optimalObjValue
205+
// Submatch must have metric name and float value
206+
metricName := strings.TrimSpace(subMatchList[1])
207+
metricValue, err := strconv.ParseFloat(strings.TrimSpace(subMatchList[2]), 64)
208+
if err != nil {
209+
klog.Fatalf("Unable to parse value %v to float for metric %v", metricValue, metricName)
227210
}
228211

229-
// Reduce steps if appropriate metric is reported.
230-
// Once rest steps are empty we apply early stopping rule.
231-
if _, ok := metricStartStep[metricName]; ok {
232-
metricStartStep[metricName]--
233-
if metricStartStep[metricName] != 0 {
212+
// stopRules contains array of EarlyStoppingRules that has not been reached yet.
213+
// After rule is reached we delete appropriate element from the array.
214+
for idx, rule := range stopRules {
215+
if metricName != rule.Name {
234216
continue
235217
}
218+
stopRules, optimalObjValue = updateStopRules(stopRules, optimalObjValue, metricValue, metricStartStep, rule, idx)
236219
}
220+
}
221+
}
222+
case commonv1beta1.JsonFormat:
223+
var logJsonObj map[string]interface{}
224+
if err = json.Unmarshal([]byte(logText), &logJsonObj); err != nil {
225+
klog.Fatalf("Failed to unmarshal logs in %v format, log: %s, error: %v", commonv1beta1.JsonFormat, logText, err)
226+
}
227+
// Check if log line contains metric from stop rules.
228+
isRuleLine := false
229+
for _, rule := range stopRules {
230+
if _, exist := logJsonObj[rule.Name]; exist {
231+
isRuleLine = true
232+
break
233+
}
234+
}
235+
// If log line doesn't contain appropriate metric, continue track file.
236+
if !isRuleLine {
237+
continue
238+
}
237239

238-
ruleValue, err := strconv.ParseFloat(rule.Value, 64)
239-
if err != nil {
240-
klog.Fatalf("Unable to parse value %v to float for rule metric %v", rule.Value, rule.Name)
241-
}
242-
243-
// Metric value can be equal, less or greater than stop rule.
244-
// Deleting suitable stop rule from the array.
245-
if rule.Comparison == commonv1beta1.ComparisonTypeEqual && metricValue == ruleValue {
246-
stopRules = deleteStopRule(stopRules, idx)
247-
} else if rule.Comparison == commonv1beta1.ComparisonTypeLess && metricValue < ruleValue {
248-
stopRules = deleteStopRule(stopRules, idx)
249-
} else if rule.Comparison == commonv1beta1.ComparisonTypeGreater && metricValue > ruleValue {
250-
stopRules = deleteStopRule(stopRules, idx)
251-
}
240+
// stopRules contains array of EarlyStoppingRules that has not been reached yet.
241+
// After rule is reached we delete appropriate element from the array.
242+
for idx, rule := range stopRules {
243+
value, exist := logJsonObj[rule.Name].(string)
244+
if !exist {
245+
continue
246+
}
247+
metricValue, err := strconv.ParseFloat(strings.TrimSpace(value), 64)
248+
if err != nil {
249+
klog.Fatalf("Unable to parse value %v to float for metric %v", metricValue, rule.Name)
252250
}
251+
stopRules, optimalObjValue = updateStopRules(stopRules, optimalObjValue, metricValue, metricStartStep, rule, idx)
253252
}
253+
default:
254+
klog.Fatalf("Format must be set to %v or %v", commonv1beta1.TextFormat, commonv1beta1.JsonFormat)
254255
}
255256

256257
// If stopRules array is empty, Trial is early stopped.
@@ -289,7 +290,7 @@ func watchMetricsFile(mFile string, stopRules stopRulesFlag, filters []string) {
289290
}
290291

291292
// Report metrics to DB.
292-
reportMetrics(filters)
293+
reportMetrics(filters, fileFormat)
293294

294295
// Wait until main process is completed.
295296
timeout := 60 * time.Second
@@ -326,6 +327,58 @@ func watchMetricsFile(mFile string, stopRules stopRulesFlag, filters []string) {
326327
}
327328
}
328329

330+
// updateStopRules evaluates one reported metric value against a single early
// stopping rule and returns the resulting rule list together with the best
// objective value tracked so far.
//
// Parameters:
//   - stopRules: the rules that have not been reached yet.
//   - optimalObjValue: best objective value seen so far, or nil if none yet.
//   - metricValue: the value just reported for the metric named rule.Name.
//   - metricStartStep: per-metric countdown; it is decremented in place here.
//   - rule, ruleIdx: the rule being evaluated and its index in stopRules.
//
// Returns stopRules unchanged when the rule is not reached (or the countdown
// for rule.Name is not exactly zero after decrementing), otherwise the result
// of deleteStopRule(stopRules, ruleIdx). The second result is the possibly
// updated optimalObjValue.
//
// The objective metric name and type are read from the package-level
// metricNames and objectiveType flags. An unparsable rule.Value is reported
// through klog.Fatalf.
func updateStopRules(
331+
stopRules []commonv1beta1.EarlyStoppingRule,
332+
optimalObjValue *float64,
333+
metricValue float64,
334+
metricStartStep map[string]int,
335+
rule commonv1beta1.EarlyStoppingRule,
336+
ruleIdx int,
337+
) ([]commonv1beta1.EarlyStoppingRule, *float64) {
338+
339+
// First metric is objective in metricNames array.
340+
objMetric := strings.Split(*metricNames, ";")[0]
341+
objType := commonv1beta1.ObjectiveType(*objectiveType)
342+
343+
// Calculate optimalObjValue.
// Only the objective metric is tracked: the stored value is replaced when
// the new value is larger (maximize) or smaller (minimize) than it.
344+
if rule.Name == objMetric {
345+
if optimalObjValue == nil {
346+
// metricValue is a by-value parameter, so this pointer refers to this call's own copy.
optimalObjValue = &metricValue
347+
} else if objType == commonv1beta1.ObjectiveTypeMaximize && metricValue > *optimalObjValue {
348+
optimalObjValue = &metricValue
349+
} else if objType == commonv1beta1.ObjectiveTypeMinimize && metricValue < *optimalObjValue {
350+
optimalObjValue = &metricValue
351+
}
352+
// Assign best optimal value to metric value.
// From here on the rule is compared against the best value, not the latest one.
353+
metricValue = *optimalObjValue
354+
}
355+
356+
// Reduce steps if appropriate metric is reported.
357+
// Once rest steps are empty we apply early stopping rule.
// NOTE(review): the counter keeps decrementing below zero on later calls, so
// unless the entry is removed elsewhere the rule is only compared on the call
// where the counter reaches exactly zero — confirm this is intended.
358+
if _, ok := metricStartStep[rule.Name]; ok {
359+
metricStartStep[rule.Name]--
360+
if metricStartStep[rule.Name] != 0 {
361+
return stopRules, optimalObjValue
362+
}
363+
}
364+
365+
// The rule threshold is carried as a string and parsed on every call.
ruleValue, err := strconv.ParseFloat(rule.Value, 64)
366+
if err != nil {
367+
klog.Fatalf("Unable to parse value %v to float for rule metric %v", rule.Value, rule.Name)
368+
}
369+
370+
// Metric value can be equal, less or greater than stop rule.
371+
// Deleting suitable stop rule from the array.
372+
if rule.Comparison == commonv1beta1.ComparisonTypeEqual && metricValue == ruleValue {
373+
return deleteStopRule(stopRules, ruleIdx), optimalObjValue
374+
} else if rule.Comparison == commonv1beta1.ComparisonTypeLess && metricValue < ruleValue {
375+
return deleteStopRule(stopRules, ruleIdx), optimalObjValue
376+
} else if rule.Comparison == commonv1beta1.ComparisonTypeGreater && metricValue > ruleValue {
377+
return deleteStopRule(stopRules, ruleIdx), optimalObjValue
378+
}
379+
// Rule not reached: keep the rule list as is.
return stopRules, optimalObjValue
380+
}
381+
329382
func deleteStopRule(stopRules []commonv1beta1.EarlyStoppingRule, idx int) []commonv1beta1.EarlyStoppingRule {
330383
if idx >= len(stopRules) {
331384
klog.Fatalf("Index %v out of range stopRules: %v", idx, stopRules)
@@ -345,9 +398,11 @@ func main() {
345398
filters = strings.Split(*metricFilters, ";")
346399
}
347400

401+
fileFormat := commonv1beta1.FileFormat(*metricsFileFormat)
402+
348403
// If stop rule is set we need to parse metrics during run.
349404
if len(stopRules) != 0 {
350-
go watchMetricsFile(*metricsFilePath, stopRules, filters)
405+
go watchMetricsFile(*metricsFilePath, stopRules, filters, fileFormat)
351406
} else {
352407
go printMetricsFile(*metricsFilePath)
353408
}
@@ -366,11 +421,11 @@ func main() {
366421

367422
// If training was not early stopped, report the metrics.
368423
if !isEarlyStopped {
369-
reportMetrics(filters)
424+
reportMetrics(filters, fileFormat)
370425
}
371426
}
372427

373-
func reportMetrics(filters []string) {
428+
func reportMetrics(filters []string, fileFormat commonv1beta1.FileFormat) {
374429

375430
conn, err := grpc.Dial(*dbManagerServiceAddr, grpc.WithInsecure())
376431
if err != nil {
@@ -383,7 +438,7 @@ func reportMetrics(filters []string) {
383438
if len(*metricNames) != 0 {
384439
metricList = strings.Split(*metricNames, ";")
385440
}
386-
olog, err := filemc.CollectObservationLog(*metricsFilePath, metricList, filters)
441+
olog, err := filemc.CollectObservationLog(*metricsFilePath, metricList, filters, fileFormat)
387442
if err != nil {
388443
klog.Fatalf("Failed to collect logs: %v", err)
389444
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
# This is an example of the median stopping early stopping rule with logs in JSON format.
2+
# It uses a deliberately bad feasible space for the learning rate so that more Trials are early stopped.
3+
apiVersion: kubeflow.org/v1beta1
4+
kind: Experiment
5+
metadata:
6+
namespace: kubeflow
7+
name: median-stop-with-json-format
8+
spec:
9+
objective:
10+
type: maximize
11+
goal: 0.99
12+
objectiveMetricName: accuracy
13+
additionalMetricNames:
14+
- loss
15+
metricsCollectorSpec:
16+
source:
17+
fileSystemPath:
18+
path: "/katib/mnist.json"
19+
kind: File
20+
format: JSON
21+
collector:
22+
kind: File
23+
algorithm:
24+
algorithmName: random
25+
earlyStopping:
26+
algorithmName: medianstop
27+
algorithmSettings:
28+
- name: min_trials_required
29+
value: "1"
30+
- name: start_step
31+
value: "2"
32+
parallelTrialCount: 2
33+
maxTrialCount: 15
34+
maxFailedTrialCount: 3
35+
parameters:
36+
- name: lr
37+
parameterType: double
38+
feasibleSpace:
39+
min: "0.01"
40+
max: "0.5"
41+
- name: num-epochs
42+
parameterType: int
43+
feasibleSpace:
44+
min: "3"
45+
max: "4"
46+
trialTemplate:
47+
retain: true
48+
primaryContainerName: training-container
49+
trialParameters:
50+
- name: learningRate
51+
description: Learning rate for the training model
52+
reference: lr
53+
- name: numberEpochs
54+
description: Number of epochs to train the model
55+
reference: num-epochs
56+
trialSpec:
57+
apiVersion: batch/v1
58+
kind: Job
59+
spec:
60+
template:
61+
spec:
62+
containers:
63+
- name: training-container
64+
image: docker.io/kubeflowkatib/pytorch-mnist:latest
65+
command:
66+
- "python3"
67+
- "/opt/pytorch-mnist/mnist.py"
68+
- "--epochs=${trialParameters.numberEpochs}"
69+
- "--log-path=/katib/mnist.json"
70+
- "--lr=${trialParameters.learningRate}"
71+
- "--logger=hypertune"
72+
restartPolicy: Never

0 commit comments

Comments
 (0)