Commit 8b47826

Revert "Feature/cron scheduling rayjob 2426 (#3836)" (#3911)
This reverts commit f6b4f17.
1 parent fbdf317 commit 8b47826

File tree

18 files changed, +0 −1198 lines

docs/reference/api.md

Lines changed: 0 additions & 1 deletion
@@ -248,7 +248,6 @@ _Appears in:_
 | `jobId` _string_ | If jobId is not set, a new jobId will be auto-generated. | | |
 | `submissionMode` _[JobSubmissionMode](#jobsubmissionmode)_ | SubmissionMode specifies how RayJob submits the Ray job to the RayCluster.<br />In "K8sJobMode", the KubeRay operator creates a submitter Kubernetes Job to submit the Ray job.<br />In "HTTPMode", the KubeRay operator sends a request to the RayCluster to create a Ray job.<br />In "InteractiveMode", the KubeRay operator waits for a user to submit a job to the Ray cluster. | K8sJobMode | |
 | `entrypointResources` _string_ | EntrypointResources specifies the custom resources and quantities to reserve for the<br />entrypoint command. | | |
-| `schedule` _string_ | Schedule specifies a cron like string for scheduling Ray jobs.<br />When shutdownAfterJobFinishes is set to true, a new cluster is provisioned<br />per scheduled job, otherwise the job is scheduled on an existing cluster. | | |
 | `entrypointNumCpus` _float_ | EntrypointNumCpus specifies the number of cpus to reserve for the entrypoint command. | | |
 | `entrypointNumGpus` _float_ | EntrypointNumGpus specifies the number of gpus to reserve for the entrypoint command. | | |
 | `ttlSecondsAfterFinished` _integer_ | TTLSecondsAfterFinished is the TTL to clean up RayCluster.<br />It's only working when ShutdownAfterJobFinishes set to true. | 0 | |
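
For context on the removed row: prior to this revert, `schedule` took a standard 5-field cron expression on the RayJob spec. A minimal sketch of a pre-revert spec; it compiles only against the pre-revert API, and the entrypoint and cron values are illustrative, not taken from this commit:

```go
package main

import (
	"fmt"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)

func main() {
	// Pre-revert sketch: Schedule carried the cron-like string; with
	// ShutdownAfterJobFinishes true, every tick provisioned a fresh cluster.
	// Entrypoint and schedule values below are illustrative.
	job := rayv1.RayJob{
		Spec: rayv1.RayJobSpec{
			Entrypoint:               "python my_script.py",
			Schedule:                 "*/5 * * * *", // every 5 minutes
			ShutdownAfterJobFinishes: true,
		},
	}
	fmt.Printf("entrypoint %q runs on schedule %q\n", job.Spec.Entrypoint, job.Spec.Schedule)
}
```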

go.mod

Lines changed: 0 additions & 1 deletion
@@ -87,7 +87,6 @@ require (
 	github.com/prometheus/client_model v0.6.1 // indirect
 	github.com/prometheus/common v0.62.0 // indirect
 	github.com/prometheus/procfs v0.15.1 // indirect
-	github.com/robfig/cron/v3 v3.0.1 // indirect
 	github.com/russross/blackfriday/v2 v2.1.0 // indirect
 	github.com/x448/float16 v0.8.4 // indirect
 	github.com/xlab/treeprint v1.2.0 // indirect

go.sum

Lines changed: 0 additions & 2 deletions
Generated file; diff not rendered by default.

helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml

Lines changed: 0 additions & 5 deletions
Generated file; diff not rendered by default.

ray-operator/apis/ray/v1/rayjob_types.go

Lines changed: 0 additions & 10 deletions
@@ -54,8 +54,6 @@ const (
 	JobDeploymentStatusSuspended JobDeploymentStatus = "Suspended"
 	JobDeploymentStatusRetrying  JobDeploymentStatus = "Retrying"
 	JobDeploymentStatusWaiting   JobDeploymentStatus = "Waiting"
-	JobDeploymentStatusScheduling JobDeploymentStatus = "Scheduling"
-	JobDeploymentStatusScheduled  JobDeploymentStatus = "Scheduled"
 )

 // IsJobDeploymentTerminal returns true if the given JobDeploymentStatus
@@ -183,11 +181,6 @@ type RayJobSpec struct {
 	// entrypoint command.
 	// +optional
 	EntrypointResources string `json:"entrypointResources,omitempty"`
-	// Schedule specifies a cron like string for scheduling Ray jobs.
-	// When shutdownAfterJobFinishes is set to true, a new cluster is provisioned
-	// per scheduled job, otherwise the job is scheduled on an existing cluster.
-	// +optional
-	Schedule string `json:"schedule,omitempty"`
 	// EntrypointNumCpus specifies the number of cpus to reserve for the entrypoint command.
 	// +optional
 	EntrypointNumCpus float32 `json:"entrypointNumCpus,omitempty"`
@@ -240,9 +233,6 @@ type RayJobStatus struct {
 	// or the submitter Job has failed.
 	// +optional
 	EndTime *metav1.Time `json:"endTime,omitempty"`
-	// lastScheduledTime is the last time the job was successfully scheduled.
-	// +optional
-	LastScheduleTime *metav1.Time `json:"lastScheduleTime,omitempty"`
 	// Succeeded is the number of times this job succeeded.
 	// +kubebuilder:default:=0
 	// +optional

ray-operator/apis/ray/v1/zz_generated.deepcopy.go

Lines changed: 0 additions & 4 deletions
Generated file; diff not rendered by default.

ray-operator/config/crd/bases/ray.io_rayjobs.yaml

Lines changed: 0 additions & 5 deletions
Generated file; diff not rendered by default.

ray-operator/config/samples/ray-job.schedule.yaml

Lines changed: 0 additions & 121 deletions
This file was deleted.

ray-operator/controllers/ray/rayjob_controller.go

Lines changed: 0 additions & 74 deletions
@@ -9,7 +9,6 @@ import (
 	"time"

 	"github.com/go-logr/logr"
-	"github.com/robfig/cron/v3"
 	batchv1 "k8s.io/api/batch/v1"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
@@ -35,8 +34,6 @@ const (
 	RayJobDefaultRequeueDuration    = 3 * time.Second
 	RayJobDefaultClusterSelectorKey = "ray.io/cluster"
 	PythonUnbufferedEnvVarName      = "PYTHONUNBUFFERED"
-	// The buffer period in which a scheduled rajob can run since the last cron tick
-	ScheduleBuffer = 100 * time.Millisecond
 )

 // RayJobReconciler reconciles a RayJob object
@@ -171,11 +168,6 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
 				return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
 			}
 		}
-		// We check the LastScheduleTime to know if its the first job
-		if rayJobInstance.Spec.Schedule != "" && rayJobInstance.Status.LastScheduleTime == nil {
-			rayJobInstance.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusScheduled
-			break
-		}
 		// Set `Status.JobDeploymentStatus` to `JobDeploymentStatusInitializing`, and initialize `Status.JobId`
 		// and `Status.RayClusterName` prior to avoid duplicate job submissions and cluster creations.
 		logger.Info("JobDeploymentStatusNew")
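
The removed guard above handled the very first tick: a non-empty schedule with no recorded LastScheduleTime means the job has never fired, so it moved straight to the Scheduled state instead of Initializing. A standalone restatement of that predicate; the helper name is mine, not KubeRay's:

```go
package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// firstScheduledRun restates the removed guard: a schedule is configured but
// nothing has ever been scheduled. Hypothetical helper, not KubeRay API.
func firstScheduledRun(schedule string, lastScheduleTime *metav1.Time) bool {
	return schedule != "" && lastScheduleTime == nil
}

func main() {
	fmt.Println(firstScheduledRun("*/5 * * * *", nil)) // true: first tick pending
}
```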
@@ -457,58 +449,9 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
 				return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
 			}
 		}
-		if rayJobInstance.Spec.Schedule != "" {
-			logger.Info("Rescheduling RayJob")
-			rayJobInstance.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusScheduling
-			break
-		}

 		// If the RayJob is completed, we should not requeue it.
 		return ctrl.Result{}, nil
-	case rayv1.JobDeploymentStatusScheduling:
-		isJobDeleted, err := r.deleteSubmitterJob(ctx, rayJobInstance)
-		if err != nil {
-			return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
-		}
-
-		if !isJobDeleted {
-			logger.Info("The release of the compute resources has not been completed yet. " +
-				"Wait for the resources to be deleted before the status transitions to avoid a resource leak.")
-			return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
-		}
-
-		if rayJobInstance.Spec.ShutdownAfterJobFinishes {
-			rayJobInstance.Status.RayClusterStatus = rayv1.RayClusterStatus{}
-			rayJobInstance.Status.RayClusterName = ""
-
-		}
-		rayJobInstance.Status.DashboardURL = ""
-		rayJobInstance.Status.JobId = ""
-		rayJobInstance.Status.Message = ""
-		rayJobInstance.Status.Reason = ""
-		rayJobInstance.Status.RayJobStatusInfo = rayv1.RayJobStatusInfo{}
-
-		rayJobInstance.Status.JobStatus = rayv1.JobStatusNew
-		rayJobInstance.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusScheduled
-	case rayv1.JobDeploymentStatusScheduled:
-		// We get the time from the current time to the previous and next cron schedule times
-		// We pass in time.Now() as a parameter so easier unit testing and consistency
-		t1, t2, err := r.getNextAndPreviousScheduleDistance(ctx, time.Now(), rayJobInstance)
-		if err != nil {
-			logger.Error(err, "Could not get the previous and next distances for a cron schedule")
-			return ctrl.Result{}, err
-		}
-		// Checking if we are currently within a buffer to the previous cron schedule time
-		if t2 <= ScheduleBuffer {
-			logger.Info("The current time is within the buffer window of a cron tick", "NextScheduleTimeDuration", t1, "LastScheduleTimeDuration", t2, "Previous LastScheduleTime", rayJobInstance.Status.LastScheduleTime)
-			rayJobInstance.Status.LastScheduleTime = &metav1.Time{Time: time.Now()}
-			rayJobInstance.Status.JobStatus = rayv1.JobStatusNew
-			rayJobInstance.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusNew
-		} else {
-			logger.Info("Waiting until the next reconcile to determine schedule", "nextScheduleDuration", t1, "currentTime", time.Now(), "lastScheduleTimeDuration", t2)
-			return ctrl.Result{RequeueAfter: t1}, nil
-		}
-
 	default:
 		logger.Info("Unknown JobDeploymentStatus", "JobDeploymentStatus", rayJobInstance.Status.JobDeploymentStatus)
 		return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
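
The removed Scheduled case boils down to one decision: with t1 the duration until the next cron tick and t2 the duration since the last one, fire immediately when t2 falls within the 100 ms ScheduleBuffer, otherwise requeue for t1. A minimal restatement of that branch; the function name is mine, not the operator's:

```go
package main

import (
	"fmt"
	"time"
)

// decideSchedule restates the removed branch: within the buffer after a cron
// tick (sinceLast is t2), fire the job now; otherwise requeue until the next
// tick (untilNext is t1). Hypothetical helper, not the operator's API.
func decideSchedule(untilNext, sinceLast, buffer time.Duration) (fireNow bool, requeueAfter time.Duration) {
	if sinceLast <= buffer {
		return true, 0
	}
	return false, untilNext
}

func main() {
	fire, requeue := decideSchedule(4*time.Minute, 50*time.Millisecond, 100*time.Millisecond)
	fmt.Println(fire, requeue) // true 0s: a tick just passed, run now
}
```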
@@ -951,23 +894,6 @@ func (r *RayJobReconciler) constructRayClusterForRayJob(rayJobInstance *rayv1.Ra
 	return rayCluster, nil
 }

-func (r *RayJobReconciler) getNextAndPreviousScheduleDistance(ctx context.Context, currentTime time.Time, rayJobInstance *rayv1.RayJob) (time.Duration, time.Duration, error) {
-	logger := ctrl.LoggerFrom(ctx)
-	formatedCron := utils.FormatSchedule(rayJobInstance, r.Recorder)
-	cronSchedule, err := cron.ParseStandard(formatedCron)
-	if err != nil {
-		// this is likely a user error in defining the spec value
-		// we should log the error and not reconcile this cronjob until an update to spec
-		r.Recorder.Eventf(rayJobInstance, corev1.EventTypeWarning, "UnparseableSchedule", "unparseable schedule: %q : %s", rayJobInstance.Spec.Schedule, err)
-		return 0, 0, fmt.Errorf("the cron schedule provided is unparseable: %w", err)
-	}
-
-	t1 := utils.NextScheduleTimeDuration(logger, rayJobInstance, currentTime, cronSchedule)
-	t2 := utils.LastScheduleTimeDuration(logger, rayJobInstance, currentTime, cronSchedule)
-
-	return t1, t2, nil
-}
-
 func updateStatusToSuspendingIfNeeded(ctx context.Context, rayJob *rayv1.RayJob) bool {
 	logger := ctrl.LoggerFrom(ctx)
 	if !rayJob.Spec.Suspend {
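
The deleted helper relied on github.com/robfig/cron/v3 for the cron arithmetic. A self-contained sketch of that library's API (ParseStandard plus Schedule.Next), which is enough to recompute t1; the "time since last tick" half (t2) depended on the removed utils helpers and is not reproduced here:

```go
package main

import (
	"fmt"
	"time"

	"github.com/robfig/cron/v3"
)

func main() {
	// ParseStandard accepts the standard 5-field spec (minute hour dom month dow);
	// it is the same parser the deleted getNextAndPreviousScheduleDistance used.
	sched, err := cron.ParseStandard("*/5 * * * *") // illustrative spec
	if err != nil {
		// An unparseable spec is a user error; the deleted code surfaced it as
		// an "UnparseableSchedule" event and returned the wrapped error.
		fmt.Println("unparseable schedule:", err)
		return
	}
	now := time.Now()
	// Duration until the next tick: this corresponds to the t1 the deleted code
	// used as RequeueAfter while parked in the Scheduled state.
	fmt.Println("time until next tick:", sched.Next(now).Sub(now))
}
```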
