Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ bin
*.dll
*.so
*.dylib
pkg/metricscollector/v1beta1/file-metricscollector/testdata

## Test binary, build with `go test -c`
*.test
Expand Down
199 changes: 127 additions & 72 deletions cmd/metricscollector/v1beta1/file-metricscollector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@ package main

import (
"context"
"encoding/json"
"flag"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -102,6 +104,7 @@ var (
earlyStopServiceAddr = flag.String("s-earlystop", "", "Katib Early Stopping service endpoint")
trialName = flag.String("t", "", "Trial Name")
metricsFilePath = flag.String("path", "", "Metrics File Path")
metricsFileFormat = flag.String("format", "", "Metrics File Format")
metricNames = flag.String("m", "", "Metric names")
objectiveType = flag.String("o-type", "", "Objective type")
metricFilters = flag.String("f", "", "Metric filters")
Expand Down Expand Up @@ -137,7 +140,7 @@ func printMetricsFile(mFile string) {
}
}

func watchMetricsFile(mFile string, stopRules stopRulesFlag, filters []string) {
func watchMetricsFile(mFile string, stopRules stopRulesFlag, filters []string, fileFormat commonv1beta1.FileFormat) {

// metricStartStep is the dict where key = metric name, value = start step.
// We should apply early stopping rule only if metric is reported at least "start_step" times.
Expand All @@ -148,9 +151,6 @@ func watchMetricsFile(mFile string, stopRules stopRulesFlag, filters []string) {
}
}

// First metric is objective in metricNames array.
objMetric := strings.Split(*metricNames, ";")[0]
objType := commonv1beta1.ObjectiveType(*objectiveType)
// For objective metric we calculate best optimal value from the recorded metrics.
// This is workaround for Median Stop algorithm.
// TODO (andreyvelich): Think about it, maybe define latest, max or min strategy type in stop-rule as well ?
Expand All @@ -169,88 +169,89 @@ func watchMetricsFile(mFile string, stopRules stopRulesFlag, filters []string) {
klog.Fatalf("Failed to create new Process from pid %v, error: %v", mainProcPid, err)
}

// Get list of regural expressions from filters.
metricRegList := filemc.GetFilterRegexpList(filters)

// Start watch log lines.
t, _ := tail.TailFile(mFile, tail.Config{Follow: true})
for line := range t.Lines {
logText := line.Text
// Print log line
klog.Info(logText)

// Check if log line contains metric from stop rules.
isRuleLine := false
for _, rule := range stopRules {
if strings.Contains(logText, rule.Name) {
isRuleLine = true
break
}
}
// If log line doesn't contain appropriate metric, continue track file.
if !isRuleLine {
continue
}

// If log line contains appropriate metric, find all submatches from metric filters.
for _, metricReg := range metricRegList {
matchStrings := metricReg.FindAllStringSubmatch(logText, -1)
for _, subMatchList := range matchStrings {
if len(subMatchList) < 3 {
continue
}
// Submatch must have metric name and float value
metricName := strings.TrimSpace(subMatchList[1])
metricValue, err := strconv.ParseFloat(strings.TrimSpace(subMatchList[2]), 64)
if err != nil {
klog.Fatalf("Unable to parse value %v to float for metric %v", metricValue, metricName)
switch fileFormat {
case commonv1beta1.TextFormat:
// Get list of regural expressions from filters.
var metricRegList []*regexp.Regexp
metricRegList = filemc.GetFilterRegexpList(filters)

// Check if log line contains metric from stop rules.
isRuleLine := false
for _, rule := range stopRules {
if strings.Contains(logText, rule.Name) {
isRuleLine = true
break
}
}
// If log line doesn't contain appropriate metric, continue track file.
if !isRuleLine {
continue
}

// stopRules contains array of EarlyStoppingRules that has not been reached yet.
// After rule is reached we delete appropriate element from the array.
for idx, rule := range stopRules {
if metricName != rule.Name {
// If log line contains appropriate metric, find all submatches from metric filters.
for _, metricReg := range metricRegList {
matchStrings := metricReg.FindAllStringSubmatch(logText, -1)
for _, subMatchList := range matchStrings {
if len(subMatchList) < 3 {
continue
}

// Calculate optimalObjValue.
if metricName == objMetric {
if optimalObjValue == nil {
optimalObjValue = &metricValue
} else if objType == commonv1beta1.ObjectiveTypeMaximize && metricValue > *optimalObjValue {
optimalObjValue = &metricValue
} else if objType == commonv1beta1.ObjectiveTypeMinimize && metricValue < *optimalObjValue {
optimalObjValue = &metricValue
}
// Assign best optimal value to metric value.
metricValue = *optimalObjValue
// Submatch must have metric name and float value
metricName := strings.TrimSpace(subMatchList[1])
metricValue, err := strconv.ParseFloat(strings.TrimSpace(subMatchList[2]), 64)
if err != nil {
klog.Fatalf("Unable to parse value %v to float for metric %v", metricValue, metricName)
}

// Reduce steps if appropriate metric is reported.
// Once rest steps are empty we apply early stopping rule.
if _, ok := metricStartStep[metricName]; ok {
metricStartStep[metricName]--
if metricStartStep[metricName] != 0 {
// stopRules contains array of EarlyStoppingRules that has not been reached yet.
// After rule is reached we delete appropriate element from the array.
for idx, rule := range stopRules {
if metricName != rule.Name {
continue
}
stopRules, optimalObjValue = updateStopRules(stopRules, optimalObjValue, metricValue, metricStartStep, rule, idx)
}
}
}
case commonv1beta1.JsonFormat:
var logJsonObj map[string]interface{}
if err = json.Unmarshal([]byte(logText), &logJsonObj); err != nil {
klog.Fatalf("Failed to unmarshal logs in %v format, log: %s, error: %v", commonv1beta1.JsonFormat, logText, err)
}
// Check if log line contains metric from stop rules.
isRuleLine := false
for _, rule := range stopRules {
if _, exist := logJsonObj[rule.Name]; exist {
isRuleLine = true
break
}
}
// If log line doesn't contain appropriate metric, continue track file.
if !isRuleLine {
continue
}

ruleValue, err := strconv.ParseFloat(rule.Value, 64)
if err != nil {
klog.Fatalf("Unable to parse value %v to float for rule metric %v", rule.Value, rule.Name)
}

// Metric value can be equal, less or greater than stop rule.
// Deleting suitable stop rule from the array.
if rule.Comparison == commonv1beta1.ComparisonTypeEqual && metricValue == ruleValue {
stopRules = deleteStopRule(stopRules, idx)
} else if rule.Comparison == commonv1beta1.ComparisonTypeLess && metricValue < ruleValue {
stopRules = deleteStopRule(stopRules, idx)
} else if rule.Comparison == commonv1beta1.ComparisonTypeGreater && metricValue > ruleValue {
stopRules = deleteStopRule(stopRules, idx)
}
// stopRules contains array of EarlyStoppingRules that has not been reached yet.
// After rule is reached we delete appropriate element from the array.
for idx, rule := range stopRules {
value, exist := logJsonObj[rule.Name].(string)
if !exist {
continue
}
metricValue, err := strconv.ParseFloat(strings.TrimSpace(value), 64)
if err != nil {
klog.Fatalf("Unable to parse value %v to float for metric %v", metricValue, rule.Name)
}
stopRules, optimalObjValue = updateStopRules(stopRules, optimalObjValue, metricValue, metricStartStep, rule, idx)
}
default:
klog.Fatalf("Format must be set to %v or %v", commonv1beta1.TextFormat, commonv1beta1.JsonFormat)
}

// If stopRules array is empty, Trial is early stopped.
Expand Down Expand Up @@ -289,7 +290,7 @@ func watchMetricsFile(mFile string, stopRules stopRulesFlag, filters []string) {
}

// Report metrics to DB.
reportMetrics(filters)
reportMetrics(filters, fileFormat)

// Wait until main process is completed.
timeout := 60 * time.Second
Expand Down Expand Up @@ -326,6 +327,58 @@ func watchMetricsFile(mFile string, stopRules stopRulesFlag, filters []string) {
}
}

func updateStopRules(
stopRules []commonv1beta1.EarlyStoppingRule,
optimalObjValue *float64,
metricValue float64,
metricStartStep map[string]int,
rule commonv1beta1.EarlyStoppingRule,
ruleIdx int,
) ([]commonv1beta1.EarlyStoppingRule, *float64) {

// First metric is objective in metricNames array.
objMetric := strings.Split(*metricNames, ";")[0]
objType := commonv1beta1.ObjectiveType(*objectiveType)

// Calculate optimalObjValue.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we didn't add for idx, rule := range stopRules under this function, how optimalObjValue will be saved across multiple stopRules ?

Copy link
Member Author

@tenzen-y tenzen-y Jan 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops...
You're right, this implementation can't update the optimalObjValue.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I addressed in 124500b.
Could you please take another look? @andreyvelich

if rule.Name == objMetric {
if optimalObjValue == nil {
optimalObjValue = &metricValue
} else if objType == commonv1beta1.ObjectiveTypeMaximize && metricValue > *optimalObjValue {
optimalObjValue = &metricValue
} else if objType == commonv1beta1.ObjectiveTypeMinimize && metricValue < *optimalObjValue {
optimalObjValue = &metricValue
}
// Assign best optimal value to metric value.
metricValue = *optimalObjValue
}

// Reduce steps if appropriate metric is reported.
// Once rest steps are empty we apply early stopping rule.
if _, ok := metricStartStep[rule.Name]; ok {
metricStartStep[rule.Name]--
if metricStartStep[rule.Name] != 0 {
return stopRules, optimalObjValue
}
}

ruleValue, err := strconv.ParseFloat(rule.Value, 64)
if err != nil {
klog.Fatalf("Unable to parse value %v to float for rule metric %v", rule.Value, rule.Name)
}

// Metric value can be equal, less or greater than stop rule.
// Deleting suitable stop rule from the array.
if rule.Comparison == commonv1beta1.ComparisonTypeEqual && metricValue == ruleValue {
return deleteStopRule(stopRules, ruleIdx), optimalObjValue
} else if rule.Comparison == commonv1beta1.ComparisonTypeLess && metricValue < ruleValue {
return deleteStopRule(stopRules, ruleIdx), optimalObjValue
} else if rule.Comparison == commonv1beta1.ComparisonTypeGreater && metricValue > ruleValue {
return deleteStopRule(stopRules, ruleIdx), optimalObjValue
}
return stopRules, optimalObjValue
}

func deleteStopRule(stopRules []commonv1beta1.EarlyStoppingRule, idx int) []commonv1beta1.EarlyStoppingRule {
if idx >= len(stopRules) {
klog.Fatalf("Index %v out of range stopRules: %v", idx, stopRules)
Expand All @@ -345,9 +398,11 @@ func main() {
filters = strings.Split(*metricFilters, ";")
}

fileFormat := commonv1beta1.FileFormat(*metricsFileFormat)

// If stop rule is set we need to parse metrics during run.
if len(stopRules) != 0 {
go watchMetricsFile(*metricsFilePath, stopRules, filters)
go watchMetricsFile(*metricsFilePath, stopRules, filters, fileFormat)
} else {
go printMetricsFile(*metricsFilePath)
}
Expand All @@ -366,11 +421,11 @@ func main() {

// If training was not early stopped, report the metrics.
if !isEarlyStopped {
reportMetrics(filters)
reportMetrics(filters, fileFormat)
}
}

func reportMetrics(filters []string) {
func reportMetrics(filters []string, fileFormat commonv1beta1.FileFormat) {

conn, err := grpc.Dial(*dbManagerServiceAddr, grpc.WithInsecure())
if err != nil {
Expand All @@ -383,7 +438,7 @@ func reportMetrics(filters []string) {
if len(*metricNames) != 0 {
metricList = strings.Split(*metricNames, ";")
}
olog, err := filemc.CollectObservationLog(*metricsFilePath, metricList, filters)
olog, err := filemc.CollectObservationLog(*metricsFilePath, metricList, filters, fileFormat)
if err != nil {
klog.Fatalf("Failed to collect logs: %v", err)
}
Expand Down
72 changes: 72 additions & 0 deletions examples/v1beta1/early-stopping/median-stop-with-json-format.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# This is example with median stopping early stopping rule with logs in JSON format.
# It has bad feasible space for learning rate to show more early stopped Trials.
apiVersion: kubeflow.org/v1beta1
kind: Experiment
metadata:
namespace: kubeflow
name: median-stop-with-json-format
spec:
objective:
type: maximize
goal: 0.99
objectiveMetricName: accuracy
additionalMetricNames:
- loss
metricsCollectorSpec:
source:
fileSystemPath:
path: "/katib/mnist.json"
kind: File
format: JSON
collector:
kind: File
algorithm:
algorithmName: random
earlyStopping:
algorithmName: medianstop
algorithmSettings:
- name: min_trials_required
value: "1"
- name: start_step
value: "2"
parallelTrialCount: 2
maxTrialCount: 15
maxFailedTrialCount: 3
parameters:
- name: lr
parameterType: double
feasibleSpace:
min: "0.01"
max: "0.5"
- name: num-epochs
parameterType: int
feasibleSpace:
min: "3"
max: "4"
trialTemplate:
retain: true
primaryContainerName: training-container
trialParameters:
- name: learningRate
description: Learning rate for the training model
reference: lr
- name: numberEpochs
description: Number of epochs to train the model
reference: num-epochs
trialSpec:
apiVersion: batch/v1
kind: Job
spec:
template:
spec:
containers:
- name: training-container
image: docker.io/kubeflowkatib/pytorch-mnist:latest
command:
- "python3"
- "/opt/pytorch-mnist/mnist.py"
- "--epochs=${trialParameters.numberEpochs}"
- "--log-path=/katib/mnist.json"
- "--lr=${trialParameters.learningRate}"
- "--logger=hypertune"
restartPolicy: Never
Loading