-
Notifications
You must be signed in to change notification settings - Fork 486
Backend for getting logs of a trial #2039
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
64c5e64
e9b4daa
77cbbb0
174acb8
ef89c9b
b934379
0c017ff
3d1460a
55ba2ce
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,8 +17,11 @@ limitations under the License. | |
package v1beta1 | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"encoding/json" | ||
"errors" | ||
"io" | ||
"log" | ||
"net/http" | ||
"path/filepath" | ||
|
@@ -29,10 +32,19 @@ import ( | |
|
||
experimentv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/experiments/v1beta1" | ||
suggestionv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/suggestions/v1beta1" | ||
trialsv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/trials/v1beta1" | ||
api_pb_v1beta1 "github.com/kubeflow/katib/pkg/apis/manager/v1beta1" | ||
consts "github.com/kubeflow/katib/pkg/controller.v1beta1/consts" | ||
"github.com/kubeflow/katib/pkg/util/v1beta1/katibclient" | ||
corev1 "k8s.io/api/core/v1" | ||
|
||
common "github.com/kubeflow/katib/pkg/apis/controller/common/v1beta1" | ||
mccommon "github.com/kubeflow/katib/pkg/metricscollector/v1beta1/common" | ||
apiv1 "k8s.io/api/core/v1" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/apimachinery/pkg/types" | ||
"k8s.io/client-go/kubernetes" | ||
"sigs.k8s.io/controller-runtime/pkg/client/config" | ||
) | ||
|
||
func NewKatibUIHandler(dbManagerAddr string) *KatibUIHandler { | ||
|
@@ -574,3 +586,149 @@ func (k *KatibUIHandler) FetchTrial(w http.ResponseWriter, r *http.Request) { | |
return | ||
} | ||
} | ||
|
||
// FetchTrialLogs fetches logs for a trial in specific namespace. | ||
func (k *KatibUIHandler) FetchTrialLogs(w http.ResponseWriter, r *http.Request) { | ||
namespaces, ok := r.URL.Query()["namespace"] | ||
if !ok { | ||
log.Printf("No namespace provided in Query parameters! Provide a 'namespace' param") | ||
err := errors.New("no 'namespace' provided") | ||
http.Error(w, err.Error(), http.StatusBadRequest) | ||
return | ||
} | ||
|
||
trialNames, ok := r.URL.Query()["trialName"] | ||
if !ok { | ||
log.Printf("No trialName provided in Query parameters! Provide a 'trialName' param") | ||
err := errors.New("no 'trialName' provided") | ||
http.Error(w, err.Error(), http.StatusBadRequest) | ||
return | ||
} | ||
|
||
trialName := trialNames[0] | ||
namespace := namespaces[0] | ||
|
||
user, err := IsAuthorized(consts.ActionTypeGet, namespace, consts.PluralTrial, "", trialName, trialsv1beta1.SchemeGroupVersion, k.katibClient.GetClient(), r) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Slightly orthogonal to the PR, but I think the function signature Why not have a distinct function for getting the current user and do this check once? Then Or at least for this PR, we could only check if the returned user is not "" only once, the first time we call this function There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agree, we are checking twice the user. I think we can proceed fixing the auth after we merge this PR? Then we can have a separate PR to improve the authentication in the entire file. Or we can do the other way around? |
||
if user == "" && err != nil { | ||
log.Printf("No user provided in kubeflow-userid header.") | ||
http.Error(w, err.Error(), http.StatusUnauthorized) | ||
return | ||
} else if err != nil { | ||
log.Printf("The user: %s is not authorized to get trial: %s in namespace: %s \n", user, trialName, namespace) | ||
http.Error(w, err.Error(), http.StatusForbidden) | ||
return | ||
} | ||
|
||
trial := &trialsv1beta1.Trial{} | ||
if err := k.katibClient.GetClient().Get(context.Background(), types.NamespacedName{Name: trialName, Namespace: namespace}, trial); err != nil { | ||
log.Printf("GetLogs failed: %v", err) | ||
http.Error(w, err.Error(), http.StatusInternalServerError) | ||
return | ||
} | ||
|
||
// TODO: Use controller-runtime client instead of kubernetes client to get logs, once this is available | ||
clientset, err := createKubernetesClientset() | ||
if err != nil { | ||
log.Printf("GetLogs failed: %v", err) | ||
http.Error(w, err.Error(), http.StatusInternalServerError) | ||
return | ||
} | ||
|
||
podName, err := fetchMasterPodName(clientset, trial) | ||
if err != nil { | ||
log.Printf("GetLogs failed: %v", err) | ||
http.Error(w, err.Error(), http.StatusInternalServerError) | ||
return | ||
} | ||
|
||
user, err = IsAuthorized(consts.ActionTypeGet, namespace, corev1.ResourcePods.String(), "log", podName, corev1.SchemeGroupVersion, k.katibClient.GetClient(), r) | ||
if user == "" && err != nil { | ||
log.Printf("No user provided in kubeflow-userid header.") | ||
http.Error(w, err.Error(), http.StatusUnauthorized) | ||
return | ||
} else if err != nil { | ||
log.Printf("The user: %s is not authorized to get pod logs: %s in namespace: %s \n", user, podName, namespace) | ||
http.Error(w, err.Error(), http.StatusForbidden) | ||
return | ||
} | ||
|
||
podLogOpts := apiv1.PodLogOptions{} | ||
podLogOpts.Container = trial.Spec.PrimaryContainerName | ||
if trial.Spec.MetricsCollector.Collector.Kind == common.StdOutCollector { | ||
podLogOpts.Container = mccommon.MetricLoggerCollectorContainerName | ||
} | ||
|
||
logs, err := fetchPodLogs(clientset, namespace, podName, podLogOpts) | ||
if err != nil { | ||
log.Printf("GetLogs failed: %v", err) | ||
http.Error(w, err.Error(), http.StatusInternalServerError) | ||
return | ||
} | ||
response, err := json.Marshal(logs) | ||
if err != nil { | ||
log.Printf("Marshal logs failed: %v", err) | ||
http.Error(w, err.Error(), http.StatusInternalServerError) | ||
return | ||
} | ||
if _, err = w.Write(response); err != nil { | ||
log.Printf("Write logs failed: %v", err) | ||
http.Error(w, err.Error(), http.StatusInternalServerError) | ||
return | ||
} | ||
d-gol marked this conversation as resolved.
Show resolved
Hide resolved
Comment on lines
+668
to
+677
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Question 2: Would we expect in the future to return logs from other worker pods? If that's the case I'd propose that the backend actually returns a JSON type response like
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a good idea, if we want to change in the future. @johnugeorge @andreyvelich what do you think? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I like @kimwnasptd idea, let's add Primary Pod Label to the JSON response. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hey @andreyvelich did you mean the result to be in the form:
or something else? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. master is bit confusing term. Eg: There can be job with just workers where worker0 acts as the master. If we really need to add pod info, pod name might be better. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @d-gol It's not always "master" label for the pod that we get labels. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am ok with creating separate issue to track improvements for this API response (e.g. add trial name to the response). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree. +1 to merge There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree to merge it, and later we can improve the API response with more information if needed. So again, to clarify, we want to merge this PR with a simple string response (current implementation)? Or in the form of json, like below?
|
||
} | ||
|
||
// createKubernetesClientset returns kubernetes clientset | ||
func createKubernetesClientset() (*kubernetes.Clientset, error) { | ||
cfg, err := config.GetConfig() | ||
if err != nil { | ||
return nil, err | ||
} | ||
clientset, err := kubernetes.NewForConfig(cfg) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return clientset, nil | ||
} | ||
|
||
// fetchMasterPodName returns name of the master pod for a trial | ||
func fetchMasterPodName(clientset *kubernetes.Clientset, trial *trialsv1beta1.Trial) (string, error) { | ||
selectionLabel := consts.LabelTrialName + "=" + trial.ObjectMeta.Name | ||
for primaryKey, primaryValue := range trial.Spec.PrimaryPodLabels { | ||
selectionLabel = selectionLabel + "," + primaryKey + "=" + primaryValue | ||
} | ||
|
||
podList, err := clientset.CoreV1().Pods(trial.ObjectMeta.Namespace).List(context.Background(), metav1.ListOptions{LabelSelector: selectionLabel}) | ||
if err != nil { | ||
return "", err | ||
} | ||
|
||
if len(podList.Items) == 0 { | ||
return "", errors.New(`Logs for the trial could not be found. | ||
Was 'retain: true' specified in the Experiment definition? | ||
An example can be found here: https://github.com/kubeflow/katib/blob/7bf39225f7235ee4ba6cf285ecc2c455c6471234/examples/v1beta1/argo/argo-workflow.yaml#L33`) | ||
} | ||
if len(podList.Items) > 1 { | ||
return "", errors.New("More than one master replica found") | ||
} | ||
|
||
return podList.Items[0].Name, nil | ||
} | ||
|
||
// fetchPodLogs returns logs of a pod for the given job name and namespace | ||
func fetchPodLogs(clientset *kubernetes.Clientset, namespace string, podName string, podLogOpts apiv1.PodLogOptions) (string, error) { | ||
req := clientset.CoreV1().Pods(namespace).GetLogs(podName, &podLogOpts) | ||
podLogs, err := req.Stream(context.Background()) | ||
if err != nil { | ||
return "", err | ||
} | ||
defer podLogs.Close() | ||
|
||
buf := new(bytes.Buffer) | ||
_, err = io.Copy(buf, podLogs) | ||
if err != nil { | ||
return "", err | ||
} | ||
str := buf.String() | ||
|
||
return str, nil | ||
} |
Uh oh!
There was an error while loading. Please reload this page.