9 changes: 0 additions & 9 deletions pkg/controller.v2/controller.go
@@ -56,15 +56,6 @@ const (
noHit = "no-hit"

defaultPortStr = "2222"

// tfJobCreatedReason is added in a tfjob when it is created.
tfJobCreatedReason = "TFJobCreated"
// tfJobSucceededReason is added in a tfjob when it is succeeded.
tfJobSucceededReason = "TFJobSucceeded"
// tfJobSucceededReason is added in a tfjob when it is running.
tfJobRunningReason = "TFJobRunning"
// tfJobSucceededReason is added in a tfjob when it is failed.
tfJobFailedReason = "TFJobFailed"
)

// controllerKind contains the schema.GroupVersionKind for this controller type.
10 changes: 10 additions & 0 deletions pkg/controller.v2/controller_logger.go
@@ -6,6 +6,16 @@ import (
tfv1alpha2 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1alpha2"
)

func loggerForReplica(tfjob *tfv1alpha2.TFJob, rtype string) *log.Entry {
return log.WithFields(log.Fields{
// We use job to match the key used in controller.go
// In controller.go we log the key used with the workqueue.
"job": tfjob.ObjectMeta.Namespace + "/" + tfjob.ObjectMeta.Name,
"uid": tfjob.ObjectMeta.UID,
"replica-type": rtype,
})
}

func loggerForTFJob(tfjob *tfv1alpha2.TFJob) *log.Entry {
return log.WithFields(log.Fields{
// We use job to match the key used in controller.go
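Note on usage: loggerForReplica is consumed by the pod reconcile path in controller_pod.go below. A minimal sketch of a call site follows; the helper logReconcile and its arguments are hypothetical and only illustrate the fields added to each log entry:

// Hypothetical call site: every log line for a replica group carries the job key,
// the job UID and the replica type, so lines for one worker/PS group can be filtered.
func logReconcile(tfjob *tfv1alpha2.TFJob, rtype tfv1alpha2.TFReplicaType, active int) {
	rt := strings.ToLower(string(rtype)) // assumes "strings" is imported in this package
	loggerForReplica(tfjob, rt).Infof("reconciling %s replicas, %d active", rt, active)
}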
282 changes: 82 additions & 200 deletions pkg/controller.v2/controller_pod.go
@@ -37,227 +37,127 @@ func (tc *TFJobController) reconcilePods(
pods []*v1.Pod,
rtype tfv1alpha2.TFReplicaType,
spec *tfv1alpha2.TFReplicaSpec) error {
tfjobKey, err := KeyFunc(tfjob)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for tfjob object %#v: %v", tfjob, err))
return err
}

// Convert TFReplicaType to lower string.
rt := strings.ToLower(string(rtype))
// Get all pods for the type rt.
pods = filterPodsForTFReplicaType(pods, rt)
activePods := FilterActivePods(pods)
succeeded, failed := getPodStatus(pods)
runningPods := filterRunningPods(pods)

// Expect to have `replicas - succeeded` pods alive.
expected := *spec.Replicas - succeeded

// All workers are succeeded, leave a succeeded condition.
if expected == 0 && rtype == tfv1alpha2.TFReplicaTypeWorker {
msg := fmt.Sprintf("TFJob %s is successfully completed.", tfjob.Name)
now := metav1.Now()
tfjob.Status.CompletionTime = &now
err := tc.updateTFJobConditions(tfjob, tfv1alpha2.TFJobSucceeded, tfJobSucceededReason, msg)
if err != nil {
loggerForTFJob(tfjob).Infof("Append tfjob condition error: %v", err)
return err
}
}

// Some workers are still running, leave a running condition.
if len(runningPods) > 0 && rtype == tfv1alpha2.TFReplicaTypeWorker {
msg := fmt.Sprintf("TFJob %s is running.", tfjob.Name)
err := tc.updateTFJobConditions(tfjob, tfv1alpha2.TFJobRunning, tfJobRunningReason, msg)
if err != nil {
loggerForTFJob(tfjob).Infof("Append tfjob condition error: %v", err)
return err
}
}

// All workers are running, set StartTime
if len(runningPods) == int(*spec.Replicas) && rtype == tfv1alpha2.TFReplicaTypeWorker {
now := metav1.Now()
tfjob.Status.StartTime = &now
}

// Some workers or PSs failed, leave a failed condition.
if failed > 0 {
msg := fmt.Sprintf("TFJob %s is failed.", tfjob.Name)
err := tc.updateTFJobConditions(tfjob, tfv1alpha2.TFJobFailed, tfJobFailedReason, msg)
if err != nil {
loggerForTFJob(tfjob).Infof("Append tfjob condition error: %v", err)
return err
}
}

// TODO(gaocegege): Use syncPods to sync all replicas to ensure that all replicas only has one pod running/succeeded.
diff := len(activePods) - int(expected)

if diff < 0 {
// Need to create new pods.
diffIndexes := getDiffPodIndexes(activePods, expected, loggerForTFJob(tfjob))
if diff+len(diffIndexes) != 0 {
// This should never happen.
return fmt.Errorf("pods diff(%d) is not equal to length(%d) of diffIndexes", diff, len(diffIndexes))
}

expectationPodsKey := genExpectationPodsKey(tfjobKey, rt)
err := tc.expectations.ExpectCreations(expectationPodsKey, diff)
if err != nil {
return err
}

for _, index := range diffIndexes {
loggerForTFJob(tfjob).Infof("need to create new pod: %s-%s", rt, index)

// Create OwnerReference.
controllerRef := genOwnerReference(tfjob)

// Append tfReplicaTypeLabel and tfReplicaIndexLabel labels.
pTemplate := spec.Template.DeepCopy()

labels := genLabels(tfjobKey)
// Set type and index for the worker.
labels[tfReplicaTypeLabel] = rt
labels[tfReplicaIndexLabel] = index

if pTemplate.Labels == nil {
pTemplate.Labels = make(map[string]string)
}

for key, value := range labels {
pTemplate.Labels[key] = value
}

// Generate TF_CONFIG JSON string.
tfConfigStr := genTFConfigJSONStr(tfjob, rt, index)
replicas := int(*spec.Replicas)

if tfConfigStr == "" {
return nil
}
// Add TF_CONFIG environment variable.
for i, _ := range pTemplate.Spec.Containers {
if len(pTemplate.Spec.Containers[i].Env) == 0 {
pTemplate.Spec.Containers[i].Env = make([]v1.EnvVar, 0)
}
pTemplate.Spec.Containers[i].Env = append(pTemplate.Spec.Containers[i].Env, v1.EnvVar{
Name: "TF_CONFIG",
Value: tfConfigStr,
})
}

// Set restart policy
if spec.RestartPolicy != tfv1alpha2.RestartPolicyExitCode {
pTemplate.Spec.RestartPolicy = v1.RestartPolicy(spec.RestartPolicy)
}
initializeTFReplicaStatuses(tfjob, rtype)

err := tc.podControl.CreatePodsWithControllerRef(tfjob.Namespace, pTemplate, tfjob, controllerRef)
if err != nil && errors.IsTimeout(err) {
// Pod is created but its initialization has timed out.
// If the initialization is successful eventually, the
// controller will observe the creation via the informer.
// If the initialization fails, or if the pod keeps
// uninitialized for a long time, the informer will not
// receive any update, and the controller will create a new
// pod when the expectation expires.
return nil
} else if err != nil {
podSlices := getPodSlices(pods, replicas, loggerForReplica(tfjob, rt))
for index, podSlice := range podSlices {
if len(podSlice) > 1 {
loggerForReplica(tfjob, rt).Warning("We have to many pods for the worker %d", index)
// TODO(gaocegege): Kill some pods.
} else if len(podSlice) == 0 {
loggerForReplica(tfjob, rt).Infof("need to create new pod: %s-%d", rt, index)
err := tc.createNewPod(tfjob, rt, strconv.Itoa(index), spec)
if err != nil {
return err
}
increaseTFJobReplicaStatusesActive(tfjob, rtype)
} else {
// We already have one, and check the status.
pod := podSlice[0]
updateTFJobReplicaStatuses(tfjob, rtype, pod)
}
} else if diff > 0 {
// TODO(CPH): Need to delete pods.
loggerForTFJob(tfjob).Infof("need to delete pod but it is not implemented yet")
}

if tfjob.Status.TFReplicaStatuses == nil {
tfjob.Status.TFReplicaStatuses = make(map[tfv1alpha2.TFReplicaType]*tfv1alpha2.TFReplicaStatus)
}

if _, ok := tfjob.Status.TFReplicaStatuses[rtype]; !ok {
tfjob.Status.TFReplicaStatuses[rtype] = &tfv1alpha2.TFReplicaStatus{}
}

// Update the active status since we have created -diff pods during the loop.
tfjob.Status.TFReplicaStatuses[rtype].Active = expected
tfjob.Status.TFReplicaStatuses[rtype].Succeeded = succeeded
tfjob.Status.TFReplicaStatuses[rtype].Failed = failed
return nil
}

func (tc *TFJobController) syncPods(pods []*v1.Pod, replicas int, logger *log.Entry) {
podSlices := getPodSlices(pods, replicas, logger)
for index, podSlice := range podSlices {
if len(podSlice) > 1 {
logger.Warning("We have to many pods for the worker %d", index)
// Kill some
}
if len(podSlice) == 0 {
// Create one
}
// We already have one, and check if it is succeeded or something else.
// pod := podSlice[0]
}
return tc.updateStatus(tfjob, rtype, replicas)
}

// getPodSlices returns a slice of pod slices, one per replica index.
func getPodSlices(pods []*v1.Pod, replicas int, logger *log.Entry) [][]*v1.Pod {
podSlices := make([][]*v1.Pod, 0)
podSlices := make([][]*v1.Pod, replicas)
for _, pod := range pods {
if _, ok := pod.Labels[tfReplicaIndexLabel]; !ok {
logger.Warning("The pod do not have the index label.")
continue
}
index, err := strconv.Atoi(pod.Labels[tfReplicaIndexLabel])
if err != nil {
logger.Warning("Error when strconv.Atoi: %v", err)
continue
}
if index < 0 || index >= replicas {
logger.Warningf("The label index is not expected: %d", index)
} else {
podSlices[index] = append(podSlices[index], pod)
}

podSlices[index] = append(podSlices[index], pod)
}
return podSlices
}
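The bucketing above is easier to see without the Kubernetes types. Below is a self-contained sketch of the same idea, using plain strings in place of *v1.Pod; names and values are illustrative only:

package main

import (
	"fmt"
	"strconv"
)

// Simplified illustration of the bucketing done by getPodSlices: pods carry a
// replica-index label, and the controller groups them by that index so each
// slot can be checked for zero pods (create one), one pod (update status),
// or more than one pod (too many).
func bucketByIndex(indexLabels []string, replicas int) [][]string {
	slices := make([][]string, replicas)
	for _, label := range indexLabels {
		index, err := strconv.Atoi(label)
		if err != nil || index < 0 || index >= replicas {
			continue // skip pods with a missing or out-of-range index label
		}
		slices[index] = append(slices[index], label)
	}
	return slices
}

func main() {
	// Three worker slots, but only indexes 0 and 2 have pods; slot 1 needs a new pod.
	fmt.Println(bucketByIndex([]string{"0", "2", "2"}, 3))
	// Output: [[0] [] [2 2]]
}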

// getDiffPodIndexes checks and gets diff indexes from desired and current.
func getDiffPodIndexes(activePods []*v1.Pod, replicas int32, logger *log.Entry) []string {
desiredIndexes := make(map[string]string)
// createNewPod creates a new pod for the given index and type.
func (tc *TFJobController) createNewPod(tfjob *tfv1alpha2.TFJob, rt, index string, spec *tfv1alpha2.TFReplicaSpec) error {
tfjobKey, err := KeyFunc(tfjob)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for tfjob object %#v: %v", tfjob, err))
return err
}
expectationPodsKey := genExpectationPodsKey(tfjobKey, rt)
err = tc.expectations.ExpectCreations(expectationPodsKey, 1)
if err != nil {
return err
}

// Create OwnerReference.
controllerRef := genOwnerReference(tfjob)

// Set type and index for the worker.
labels := genLabels(tfjobKey)
labels[tfReplicaTypeLabel] = rt
labels[tfReplicaIndexLabel] = index

for i := int32(0); i < replicas; i++ {
desiredIndexes[fmt.Sprintf("%d", i)] = noHit
podTemplate := spec.Template.DeepCopy()

if podTemplate.Labels == nil {
podTemplate.Labels = make(map[string]string)
}

for _, pod := range activePods {
if _, ok := pod.Labels[tfReplicaIndexLabel]; !ok {
continue
}
for key, value := range labels {
podTemplate.Labels[key] = value
}

index := pod.Labels[tfReplicaIndexLabel]
indexNum, err := strconv.Atoi(index)
if err != nil {
logger.Warningf("The label index should be integer: %s", index)
} else {
// The situation should not happen.
if indexNum < 0 || indexNum >= int(replicas) {
logger.Warningf("The label index is not expected: %d", indexNum)
}
}
// Generate TF_CONFIG JSON string.
tfConfigStr := genTFConfigJSONStr(tfjob, rt, index)

if _, ok := desiredIndexes[index]; ok {
desiredIndexes[index] = hit
if tfConfigStr == "" {
return nil
}
// Add TF_CONFIG environment variable.
for i := range podTemplate.Spec.Containers {
if len(podTemplate.Spec.Containers[i].Env) == 0 {
podTemplate.Spec.Containers[i].Env = make([]v1.EnvVar, 0)
}
podTemplate.Spec.Containers[i].Env = append(podTemplate.Spec.Containers[i].Env, v1.EnvVar{
Name: "TF_CONFIG",
Value: tfConfigStr,
})
}

diffIndexes := []string{}
for index, hit := range desiredIndexes {
if hit == noHit {
diffIndexes = append(diffIndexes, index)
}
// TODO(gaocegege): Deal with RestartPolicyExitCode.
// Set restart policy
if spec.RestartPolicy != tfv1alpha2.RestartPolicyExitCode {
podTemplate.Spec.RestartPolicy = v1.RestartPolicy(spec.RestartPolicy)
}

return diffIndexes
err = tc.podControl.CreatePodsWithControllerRef(tfjob.Namespace, podTemplate, tfjob, controllerRef)
if err != nil && errors.IsTimeout(err) {
// Pod is created but its initialization has timed out.
// If the initialization is successful eventually, the
// controller will observe the creation via the informer.
// If the initialization fails, or if the pod keeps
// uninitialized for a long time, the informer will not
// receive any update, and the controller will create a new
// pod when the expectation expires.
return nil
} else if err != nil {
return err
}
return nil
}
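genTFConfigJSONStr itself is not part of this diff, so the exact payload is an assumption; assuming it follows the conventional TensorFlow TF_CONFIG layout, the value injected into each container would look roughly like the sketch below (service addresses and job name are made up, the port matches defaultPortStr):

package main

import (
	"encoding/json"
	"fmt"
)

// Rough sketch of the kind of value genTFConfigJSONStr is expected to produce:
// a cluster spec plus the task type and index of this replica.
func main() {
	tfConfig := map[string]interface{}{
		"cluster": map[string][]string{
			"ps":     {"myjob-ps-0:2222"},
			"worker": {"myjob-worker-0:2222", "myjob-worker-1:2222"},
		},
		"task": map[string]interface{}{
			"type":  "worker", // the rt passed to createNewPod
			"index": 0,        // the index passed to createNewPod
		},
	}
	b, _ := json.Marshal(tfConfig)
	fmt.Println(string(b)) // this string is what gets set as the TF_CONFIG env var
}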

// getPodsForTFJob returns the set of pods that this tfjob should manage.
@@ -339,7 +239,7 @@ func (tc *TFJobController) addPod(obj interface{}) {
if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
tfjob := tc.resolveControllerRef(pod.Namespace, controllerRef)
if tfjob == nil {
loggerForTFJob(tfjob).Info("This pod's tfjob does not exists")
log.Info("This pod's tfjob does not exists")
return
}

@@ -384,21 +284,3 @@ func (tc *TFJobController) updatePod(old, cur interface{}) {
func (tc *TFJobController) deletePod(obj interface{}) {
// TODO(CPH): handle this gracefully.
}

// getPodStatus returns the number of succeeded and failed pods for a job
func getPodStatus(pods []*v1.Pod) (succeeded, failed int32) {
succeeded = int32(filterPods(pods, v1.PodSucceeded))
failed = int32(filterPods(pods, v1.PodFailed))
return
}

// filterPods returns pods based on their phase.
func filterPods(pods []*v1.Pod, phase v1.PodPhase) int {
result := 0
for i := range pods {
if phase == pods[i].Status.Phase {
result++
}
}
return result
}