Skip to content

Commit 2e6eff1

Browse files
committed
support JSON format logs in file-metrics-collector
1 parent 2a0b12e commit 2e6eff1

File tree

25 files changed

+757
-110
lines changed

25 files changed

+757
-110
lines changed

cmd/metricscollector/v1beta1/file-metricscollector/main.go

Lines changed: 117 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,13 @@ package main
3939

4040
import (
4141
"context"
42+
"encoding/json"
4243
"flag"
4344
"fmt"
4445
"io/ioutil"
4546
"os"
4647
"path/filepath"
48+
"regexp"
4749
"strconv"
4850
"strings"
4951
"time"
@@ -102,6 +104,7 @@ var (
102104
earlyStopServiceAddr = flag.String("s-earlystop", "", "Katib Early Stopping service endpoint")
103105
trialName = flag.String("t", "", "Trial Name")
104106
metricsFilePath = flag.String("path", "", "Metrics File Path")
107+
metricsFileFormat = flag.String("format", "", "Metrics File Format")
105108
metricNames = flag.String("m", "", "Metric names")
106109
objectiveType = flag.String("o-type", "", "Objective type")
107110
metricFilters = flag.String("f", "", "Metric filters")
@@ -170,7 +173,10 @@ func watchMetricsFile(mFile string, stopRules stopRulesFlag, filters []string) {
170173
}
171174

172175
// Get list of regural expressions from filters.
173-
metricRegList := filemc.GetFilterRegexpList(filters)
176+
var metricRegList []*regexp.Regexp
177+
if *metricsFileFormat == commonv1beta1.TextFormat.String() {
178+
metricRegList = filemc.GetFilterRegexpList(filters)
179+
}
174180

175181
// Start watch log lines.
176182
t, _ := tail.TailFile(mFile, tail.Config{Follow: true})
@@ -179,78 +185,78 @@ func watchMetricsFile(mFile string, stopRules stopRulesFlag, filters []string) {
179185
// Print log line
180186
klog.Info(logText)
181187

182-
// Check if log line contains metric from stop rules.
183-
isRuleLine := false
184-
for _, rule := range stopRules {
185-
if strings.Contains(logText, rule.Name) {
186-
isRuleLine = true
187-
break
188-
}
189-
}
190-
// If log line doesn't contain appropriate metric, continue track file.
191-
if !isRuleLine {
192-
continue
193-
}
194-
195-
// If log line contains appropriate metric, find all submatches from metric filters.
196-
for _, metricReg := range metricRegList {
197-
matchStrings := metricReg.FindAllStringSubmatch(logText, -1)
198-
for _, subMatchList := range matchStrings {
199-
if len(subMatchList) < 3 {
200-
continue
201-
}
202-
// Submatch must have metric name and float value
203-
metricName := strings.TrimSpace(subMatchList[1])
204-
metricValue, err := strconv.ParseFloat(strings.TrimSpace(subMatchList[2]), 64)
205-
if err != nil {
206-
klog.Fatalf("Unable to parse value %v to float for metric %v", metricValue, metricName)
188+
switch *metricsFileFormat {
189+
case commonv1beta1.TextFormat.String():
190+
// Check if log line contains metric from stop rules.
191+
isRuleLine := false
192+
for _, rule := range stopRules {
193+
if strings.Contains(logText, rule.Name) {
194+
isRuleLine = true
195+
break
207196
}
197+
}
198+
// If log line doesn't contain appropriate metric, continue track file.
199+
if !isRuleLine {
200+
continue
201+
}
208202

209-
// stopRules contains array of EarlyStoppingRules that has not been reached yet.
210-
// After rule is reached we delete appropriate element from the array.
211-
for idx, rule := range stopRules {
212-
if metricName != rule.Name {
203+
// If log line contains appropriate metric, find all submatches from metric filters.
204+
for _, metricReg := range metricRegList {
205+
matchStrings := metricReg.FindAllStringSubmatch(logText, -1)
206+
for _, subMatchList := range matchStrings {
207+
if len(subMatchList) < 3 {
213208
continue
214209
}
215-
216-
// Calculate optimalObjValue.
217-
if metricName == objMetric {
218-
if optimalObjValue == nil {
219-
optimalObjValue = &metricValue
220-
} else if objType == commonv1beta1.ObjectiveTypeMaximize && metricValue > *optimalObjValue {
221-
optimalObjValue = &metricValue
222-
} else if objType == commonv1beta1.ObjectiveTypeMinimize && metricValue < *optimalObjValue {
223-
optimalObjValue = &metricValue
224-
}
225-
// Assign best optimal value to metric value.
226-
metricValue = *optimalObjValue
210+
// Submatch must have metric name and float value
211+
metricName := strings.TrimSpace(subMatchList[1])
212+
metricValue, err := strconv.ParseFloat(strings.TrimSpace(subMatchList[2]), 64)
213+
if err != nil {
214+
klog.Fatalf("Unable to parse value %v to float for metric %v", metricValue, metricName)
227215
}
228216

229-
// Reduce steps if appropriate metric is reported.
230-
// Once rest steps are empty we apply early stopping rule.
231-
if _, ok := metricStartStep[metricName]; ok {
232-
metricStartStep[metricName]--
233-
if metricStartStep[metricName] != 0 {
217+
// stopRules contains array of EarlyStoppingRules that has not been reached yet.
218+
// After rule is reached we delete appropriate element from the array.
219+
for idx, rule := range stopRules {
220+
if metricName != rule.Name {
234221
continue
235222
}
223+
stopRules = updateStopRules(objMetric, stopRules, optimalObjValue, metricValue, objType, metricStartStep, rule, idx)
236224
}
225+
}
226+
}
227+
case commonv1beta1.JsonFormat.String():
228+
var logJsonObj map[string]interface{}
229+
if err = json.Unmarshal([]byte(logText), &logJsonObj); err != nil {
230+
klog.Fatalf("Failed to unmarshal logs in JSON format, log: %s, error: %v", logText, err)
231+
}
232+
// Check if log line contains metric from stop rules.
233+
isRuleLine := false
234+
for _, rule := range stopRules {
235+
if _, exist := logJsonObj[rule.Name]; exist {
236+
isRuleLine = true
237+
break
238+
}
239+
}
240+
// If log line doesn't contain appropriate metric, continue track file.
241+
if !isRuleLine {
242+
continue
243+
}
237244

238-
ruleValue, err := strconv.ParseFloat(rule.Value, 64)
239-
if err != nil {
240-
klog.Fatalf("Unable to parse value %v to float for rule metric %v", rule.Value, rule.Name)
241-
}
242-
243-
// Metric value can be equal, less or greater than stop rule.
244-
// Deleting suitable stop rule from the array.
245-
if rule.Comparison == commonv1beta1.ComparisonTypeEqual && metricValue == ruleValue {
246-
stopRules = deleteStopRule(stopRules, idx)
247-
} else if rule.Comparison == commonv1beta1.ComparisonTypeLess && metricValue < ruleValue {
248-
stopRules = deleteStopRule(stopRules, idx)
249-
} else if rule.Comparison == commonv1beta1.ComparisonTypeGreater && metricValue > ruleValue {
250-
stopRules = deleteStopRule(stopRules, idx)
251-
}
245+
// stopRules contains array of EarlyStoppingRules that has not been reached yet.
246+
// After rule is reached we delete appropriate element from the array.
247+
for idx, rule := range stopRules {
248+
value, exist := logJsonObj[rule.Name].(string)
249+
if !exist {
250+
continue
252251
}
252+
metricValue, err := strconv.ParseFloat(strings.TrimSpace(value), 64)
253+
if err != nil {
254+
klog.Fatalf("Unable to parse value %v to float for metric %v", metricValue, rule.Name)
255+
}
256+
stopRules = updateStopRules(objMetric, stopRules, optimalObjValue, metricValue, objType, metricStartStep, rule, idx)
253257
}
258+
default:
259+
klog.Fatalf("format must be set %s or %s", commonv1beta1.TextFormat.String(), commonv1beta1.JsonFormat.String())
254260
}
255261

256262
// If stopRules array is empty, Trial is early stopped.
@@ -326,6 +332,55 @@ func watchMetricsFile(mFile string, stopRules stopRulesFlag, filters []string) {
326332
}
327333
}
328334

335+
func updateStopRules(
336+
objMetric string,
337+
stopRules []commonv1beta1.EarlyStoppingRule,
338+
optimalObjValue *float64,
339+
metricValue float64,
340+
objType commonv1beta1.ObjectiveType,
341+
metricStartStep map[string]int,
342+
rule commonv1beta1.EarlyStoppingRule,
343+
ruleIdx int,
344+
) []commonv1beta1.EarlyStoppingRule {
345+
// Calculate optimalObjValue.
346+
if rule.Name == objMetric {
347+
if optimalObjValue == nil {
348+
optimalObjValue = &metricValue
349+
} else if objType == commonv1beta1.ObjectiveTypeMaximize && metricValue > *optimalObjValue {
350+
optimalObjValue = &metricValue
351+
} else if objType == commonv1beta1.ObjectiveTypeMinimize && metricValue < *optimalObjValue {
352+
optimalObjValue = &metricValue
353+
}
354+
// Assign best optimal value to metric value.
355+
metricValue = *optimalObjValue
356+
}
357+
358+
// Reduce steps if appropriate metric is reported.
359+
// Once rest steps are empty we apply early stopping rule.
360+
if _, ok := metricStartStep[rule.Name]; ok {
361+
metricStartStep[rule.Name]--
362+
if metricStartStep[rule.Name] != 0 {
363+
return stopRules
364+
}
365+
}
366+
367+
ruleValue, err := strconv.ParseFloat(rule.Value, 64)
368+
if err != nil {
369+
klog.Fatalf("Unable to parse value %v to float for rule metric %v", rule.Value, rule.Name)
370+
}
371+
372+
// Metric value can be equal, less or greater than stop rule.
373+
// Deleting suitable stop rule from the array.
374+
if rule.Comparison == commonv1beta1.ComparisonTypeEqual && metricValue == ruleValue {
375+
return deleteStopRule(stopRules, ruleIdx)
376+
} else if rule.Comparison == commonv1beta1.ComparisonTypeLess && metricValue < ruleValue {
377+
return deleteStopRule(stopRules, ruleIdx)
378+
} else if rule.Comparison == commonv1beta1.ComparisonTypeGreater && metricValue > ruleValue {
379+
return deleteStopRule(stopRules, ruleIdx)
380+
}
381+
return stopRules
382+
}
383+
329384
func deleteStopRule(stopRules []commonv1beta1.EarlyStoppingRule, idx int) []commonv1beta1.EarlyStoppingRule {
330385
if idx >= len(stopRules) {
331386
klog.Fatalf("Index %v out of range stopRules: %v", idx, stopRules)
@@ -383,7 +438,7 @@ func reportMetrics(filters []string) {
383438
if len(*metricNames) != 0 {
384439
metricList = strings.Split(*metricNames, ";")
385440
}
386-
olog, err := filemc.CollectObservationLog(*metricsFilePath, metricList, filters)
441+
olog, err := filemc.CollectObservationLog(*metricsFilePath, metricList, filters, *metricsFileFormat)
387442
if err != nil {
388443
klog.Fatalf("Failed to collect logs: %v", err)
389444
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
# This is example with median stopping early stopping rule with logs in JSON format.
2+
# It has bad feasible space for learning rate to show more early stopped Trials.
3+
apiVersion: kubeflow.org/v1beta1
4+
kind: Experiment
5+
metadata:
6+
namespace: kubeflow
7+
name: median-stop-with-json-format
8+
spec:
9+
objective:
10+
type: maximize
11+
goal: 0.99
12+
objectiveMetricName: accuracy
13+
additionalMetricNames:
14+
- loss
15+
metricsCollectorSpec:
16+
source:
17+
fileSystemPath:
18+
path: "/katib/mnist.json"
19+
kind: File
20+
fileFormat: JSON
21+
collector:
22+
kind: File
23+
algorithm:
24+
algorithmName: random
25+
earlyStopping:
26+
algorithmName: medianstop
27+
algorithmSettings:
28+
- name: min_trials_required
29+
value: "1"
30+
- name: start_step
31+
value: "2"
32+
parallelTrialCount: 2
33+
maxTrialCount: 15
34+
maxFailedTrialCount: 3
35+
parameters:
36+
- name: lr
37+
parameterType: double
38+
feasibleSpace:
39+
min: "0.01"
40+
max: "0.5"
41+
- name: num-epochs
42+
parameterType: int
43+
feasibleSpace:
44+
min: "3"
45+
max: "4"
46+
trialTemplate:
47+
retain: true
48+
primaryContainerName: training-container
49+
trialParameters:
50+
- name: learningRate
51+
description: Learning rate for the training model
52+
reference: lr
53+
- name: numberEpochs
54+
description: Number of epochs to train the model
55+
reference: num-epochs
56+
trialSpec:
57+
apiVersion: batch/v1
58+
kind: Job
59+
spec:
60+
template:
61+
spec:
62+
containers:
63+
- name: training-container
64+
image: docker.io/kubeflowkatib/pytorch-mnist:latest
65+
command:
66+
- "python3"
67+
- "/opt/pytorch-mnist/mnist.py"
68+
- "--epochs=${trialParameters.numberEpochs}"
69+
- "--log-path=/katib/mnist.json"
70+
- "--lr=${trialParameters.learningRate}"
71+
- "--logger=hypertune"
72+
restartPolicy: Never
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
apiVersion: kubeflow.org/v1beta1
2+
kind: Experiment
3+
metadata:
4+
namespace: kubeflow
5+
name: file-metrics-collector-with-json-format
6+
spec:
7+
objective:
8+
type: maximize
9+
goal: 0.99
10+
objectiveMetricName: accuracy
11+
additionalMetricNames:
12+
- loss
13+
metricsCollectorSpec:
14+
source:
15+
fileSystemPath:
16+
path: "/katib/mnist.json"
17+
kind: File
18+
fileFormat: JSON
19+
collector:
20+
kind: File
21+
algorithm:
22+
algorithmName: random
23+
parallelTrialCount: 3
24+
maxTrialCount: 12
25+
maxFailedTrialCount: 3
26+
parameters:
27+
- name: lr
28+
parameterType: double
29+
feasibleSpace:
30+
min: "0.01"
31+
max: "0.03"
32+
- name: momentum
33+
parameterType: double
34+
feasibleSpace:
35+
min: "0.3"
36+
max: "0.7"
37+
trialTemplate:
38+
primaryContainerName: training-container
39+
trialParameters:
40+
- name: learningRate
41+
description: Learning rate for the training model
42+
reference: lr
43+
- name: momentum
44+
description: Momentum for the training model
45+
reference: momentum
46+
trialSpec:
47+
apiVersion: batch/v1
48+
kind: Job
49+
spec:
50+
template:
51+
spec:
52+
containers:
53+
- name: training-container
54+
image: docker.io/kubeflowkatib/pytorch-mnist:latest
55+
command:
56+
- "python3"
57+
- "/opt/pytorch-mnist/mnist.py"
58+
- "--epochs=2"
59+
- "--log-path=/katib/mnist.json"
60+
- "--lr=${trialParameters.learningRate}"
61+
- "--momentum=${trialParameters.momentum}"
62+
- "--logger=hypertune"
63+
restartPolicy: Never

examples/v1beta1/trial-images/pytorch-mnist/Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ WORKDIR /opt/pytorch-mnist
55

66
# Add folder for the logs.
77
RUN mkdir /katib
8+
RUN pip install cloudml-hypertune
89

910
RUN chgrp -R 0 /opt/pytorch-mnist \
1011
&& chmod -R g+rwX /opt/pytorch-mnist \

examples/v1beta1/trial-images/pytorch-mnist/README.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,5 +5,7 @@ to the file or printing to the StdOut. It uses convolutional neural network to
55
train the model.
66

77
Katib uses this training container in some Experiments, for instance in the
8-
[file Metrics Collector example](../../metrics-collector/file-metrics-collector.yaml#L55-L64)
9-
or in the [PyTorchJob example](../../kubeflow-training-operator/pytorchjob-mnist.yaml#L47-L54).
8+
[file Metrics Collector example](../../metrics-collector/file-metrics-collector.yaml#L55-L64),
9+
the [file Metrics Collector with logs in JSON format example](../../metrics-collector/file-metrics-collector-with-json-format.yaml#L52-L62),
10+
the [median stopping early stopping rule with logs in JSON format example](../../early-stopping/median-stop-with-json-format.yaml#L62-L71)
11+
and the [PyTorchJob example](../../kubeflow-training-operator/pytorchjob-mnist.yaml#L47-L54).

0 commit comments

Comments
 (0)