Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backend for getting logs of a trial #2039

Merged
merged 9 commits into from
Dec 24, 2022
1 change: 1 addition & 0 deletions cmd/new-ui/v1beta1/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func main() {
http.HandleFunc("/katib/edit_template/", kuh.EditTemplate)
http.HandleFunc("/katib/delete_template/", kuh.DeleteTemplate)
http.HandleFunc("/katib/fetch_namespaces", kuh.FetchNamespaces)
http.HandleFunc("/katib/fetch_trial_logs/", kuh.FetchTrialLogs)

log.Printf("Serving at %s:%s", *host, *port)
if err := http.ListenAndServe(fmt.Sprintf("%s:%s", *host, *port), nil); err != nil {
Expand Down
12 changes: 12 additions & 0 deletions manifests/v1beta1/components/ui/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,18 @@ rules:
- suggestions
verbs:
- "*"
- apiGroups:
- ""
resources:
- pods
verbs:
- list
- apiGroups:
- ""
resources:
- pods/log
verbs:
- get
d-gol marked this conversation as resolved.
Show resolved Hide resolved
---
apiVersion: v1
kind: ServiceAccount
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,18 @@ rules:
- deletecollection
- patch
- update
- apiGroups:
- ""
resources:
- pods
verbs:
- list
- apiGroups:
- ""
resources:
- pods/log
verbs:
- get
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
Expand Down
158 changes: 158 additions & 0 deletions pkg/new-ui/v1beta1/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ limitations under the License.
package v1beta1

import (
"bytes"
"context"
"encoding/json"
"errors"
"io"
"log"
"net/http"
"path/filepath"
Expand All @@ -29,10 +32,19 @@ import (

experimentv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/experiments/v1beta1"
suggestionv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/suggestions/v1beta1"
trialsv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/trials/v1beta1"
api_pb_v1beta1 "github.com/kubeflow/katib/pkg/apis/manager/v1beta1"
consts "github.com/kubeflow/katib/pkg/controller.v1beta1/consts"
"github.com/kubeflow/katib/pkg/util/v1beta1/katibclient"
corev1 "k8s.io/api/core/v1"

common "github.com/kubeflow/katib/pkg/apis/controller/common/v1beta1"
mccommon "github.com/kubeflow/katib/pkg/metricscollector/v1beta1/common"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client/config"
)

func NewKatibUIHandler(dbManagerAddr string) *KatibUIHandler {
Expand Down Expand Up @@ -574,3 +586,149 @@ func (k *KatibUIHandler) FetchTrial(w http.ResponseWriter, r *http.Request) {
return
}
}

// FetchTrialLogs fetches logs for a trial in specific namespace.
func (k *KatibUIHandler) FetchTrialLogs(w http.ResponseWriter, r *http.Request) {
namespaces, ok := r.URL.Query()["namespace"]
if !ok {
log.Printf("No namespace provided in Query parameters! Provide a 'namespace' param")
err := errors.New("no 'namespace' provided")
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

trialNames, ok := r.URL.Query()["trialName"]
if !ok {
log.Printf("No trialName provided in Query parameters! Provide a 'trialName' param")
err := errors.New("no 'trialName' provided")
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

trialName := trialNames[0]
namespace := namespaces[0]

user, err := IsAuthorized(consts.ActionTypeGet, namespace, consts.PluralTrial, "", trialName, trialsv1beta1.SchemeGroupVersion, k.katibClient.GetClient(), r)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Slightly orthogonal to the PR, but I think the function signature user, err = IsAuthorized(...) is not ideal. We end up getting the same information and do duplicate checks on errors depending on the user value.

Why not have a distinct function for getting the current user and do this check once? Then IsAuthorized(user, ...) will only be responsible for the SubjectAccessReviews check.

Or, at least for this PR, we could check that the returned user is not "" only once, the first time we call this function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, we are checking the user twice. I think we can proceed with fixing the auth after we merge this PR? Then we can have a separate PR to improve the authentication in the entire file. Or we can do it the other way around?

if user == "" && err != nil {
log.Printf("No user provided in kubeflow-userid header.")
http.Error(w, err.Error(), http.StatusUnauthorized)
return
} else if err != nil {
log.Printf("The user: %s is not authorized to get trial: %s in namespace: %s \n", user, trialName, namespace)
http.Error(w, err.Error(), http.StatusForbidden)
return
}

trial := &trialsv1beta1.Trial{}
if err := k.katibClient.GetClient().Get(context.Background(), types.NamespacedName{Name: trialName, Namespace: namespace}, trial); err != nil {
log.Printf("GetLogs failed: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

// TODO: Use controller-runtime client instead of kubernetes client to get logs, once this is available
clientset, err := createKubernetesClientset()
if err != nil {
log.Printf("GetLogs failed: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

podName, err := fetchMasterPodName(clientset, trial)
if err != nil {
log.Printf("GetLogs failed: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

user, err = IsAuthorized(consts.ActionTypeGet, namespace, corev1.ResourcePods.String(), "log", podName, corev1.SchemeGroupVersion, k.katibClient.GetClient(), r)
if user == "" && err != nil {
log.Printf("No user provided in kubeflow-userid header.")
http.Error(w, err.Error(), http.StatusUnauthorized)
return
} else if err != nil {
log.Printf("The user: %s is not authorized to get pod logs: %s in namespace: %s \n", user, podName, namespace)
http.Error(w, err.Error(), http.StatusForbidden)
return
}

podLogOpts := apiv1.PodLogOptions{}
podLogOpts.Container = trial.Spec.PrimaryContainerName
if trial.Spec.MetricsCollector.Collector.Kind == common.StdOutCollector {
podLogOpts.Container = mccommon.MetricLoggerCollectorContainerName
}

logs, err := fetchPodLogs(clientset, namespace, podName, podLogOpts)
if err != nil {
log.Printf("GetLogs failed: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
response, err := json.Marshal(logs)
if err != nil {
log.Printf("Marshal logs failed: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if _, err = w.Write(response); err != nil {
log.Printf("Write logs failed: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
d-gol marked this conversation as resolved.
Show resolved Hide resolved
Comment on lines +668 to +677
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question 2: Would we expect in the future to return logs from other worker pods?

If that's the case I'd propose that the backend actually returns a JSON type response like

logs: {
    master: "..."
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good idea, if we want to change in the future. @johnugeorge @andreyvelich what do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like @kimwnasptd idea, let's add Primary Pod Label to the JSON response.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @andreyvelich did you mean the result to be in the form:

logs: {
    master: "..."
}

or something else?
We can have multiple primary pod labels, did you mean to also add them as key value pairs?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"master" is a bit of a confusing term. E.g., there can be a job with just workers where worker0 acts as the master. If we really need to add pod info, the pod name might be better.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@d-gol The label on the pod we select is not always "master".
For example, for Argo we get pod with katib.kubeflow.org/model-training: true label.
Maybe it makes sense to include the pod name in the response, as @johnugeorge mentioned.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am ok with creating separate issue to track improvements for this API response (e.g. add trial name to the response).
So we can merge this PR and unblock UI team to start working on the UI changes to have this feature in the next release.
What do you think @d-gol @kimwnasptd @johnugeorge ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. +1 to merge

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree to merge it, and later we can improve the API response with more information if needed. So again, to clarify, we want to merge this PR with a simple string response (current implementation)? Or in the form of json, like below?

{
    "pod_name": logs
}

}

// createKubernetesClientset builds a Kubernetes clientset from the REST
// configuration resolved by controller-runtime's config.GetConfig.
// An error from either the configuration lookup or the clientset
// construction is returned to the caller unchanged.
func createKubernetesClientset() (*kubernetes.Clientset, error) {
	restConfig, cfgErr := config.GetConfig()
	if cfgErr != nil {
		return nil, cfgErr
	}
	return kubernetes.NewForConfig(restConfig)
}

// fetchMasterPodName resolves the name of the single pod that belongs to the
// given trial. Pods are listed in the trial's namespace with a label selector
// made of the trial-name label plus every entry of the trial's
// Spec.PrimaryPodLabels. An error is returned when the list call fails, when
// no pod matches, or when more than one pod matches.
func fetchMasterPodName(clientset *kubernetes.Clientset, trial *trialsv1beta1.Trial) (string, error) {
	// Start from the trial-name label and narrow by each primary pod label.
	selector := consts.LabelTrialName + "=" + trial.ObjectMeta.Name
	for key, value := range trial.Spec.PrimaryPodLabels {
		selector += "," + key + "=" + value
	}

	listOpts := metav1.ListOptions{LabelSelector: selector}
	pods, err := clientset.CoreV1().Pods(trial.ObjectMeta.Namespace).List(context.Background(), listOpts)
	if err != nil {
		return "", err
	}

	switch count := len(pods.Items); {
	case count == 0:
		return "", errors.New(`Logs for the trial could not be found.
Was 'retain: true' specified in the Experiment definition?
An example can be found here: https://github.com/kubeflow/katib/blob/7bf39225f7235ee4ba6cf285ecc2c455c6471234/examples/v1beta1/argo/argo-workflow.yaml#L33`)
	case count > 1:
		return "", errors.New("More than one master replica found")
	default:
		return pods.Items[0].Name, nil
	}
}

// fetchPodLogs reads the logs of pod podName in the given namespace, using
// the supplied log options, and returns them as a single string. The whole
// log stream is copied into an in-memory buffer before it is returned.
// An error from opening or reading the stream is returned with an empty string.
func fetchPodLogs(clientset *kubernetes.Clientset, namespace string, podName string, podLogOpts apiv1.PodLogOptions) (string, error) {
	stream, err := clientset.CoreV1().Pods(namespace).GetLogs(podName, &podLogOpts).Stream(context.Background())
	if err != nil {
		return "", err
	}
	defer stream.Close()

	var collected bytes.Buffer
	if _, err := io.Copy(&collected, stream); err != nil {
		return "", err
	}
	return collected.String(), nil
}