Skip to content

Commit

Permalink
[Prism] Connect Web UI cancel requests with backend (#31028)
Browse files Browse the repository at this point in the history
* Connect Web UI requests with backend

* Remove artificial setting of cancelled state

* Only lock when acquiring job
  • Loading branch information
damondouglas committed May 2, 2024
1 parent 1418030 commit ac83276
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 26 deletions.
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/runners/prism/internal/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func RunPipeline(j *jobservices.Job) {
j.SendMsg("running " + j.String())
j.Running()

if err := executePipeline(j.RootCtx, wks, j); err != nil {
if err := executePipeline(j.RootCtx, wks, j); err != nil && !errors.Is(err, jobservices.ErrCancel) {
j.Failed(err)
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,13 @@ func TestServer(t *testing.T) {

cmpOpts := []cmp.Option{protocmp.Transform(), cmpopts.EquateEmpty()}
tests := []struct {
name string
name string
postRunState jobpb.JobState_Enum
// noJobsCheck tests in the setting that the Job doesn't exist
// postPrepCheck tests after Server Prepare invoked
noJobsCheck, postPrepCheck func(context.Context, *testing.T, *Server)
postRunCheck func(context.Context, *testing.T, *Server, string)
// postRunCheck tests after Server Run invoked
postRunCheck func(context.Context, *testing.T, *Server, string)
}{
{
name: "GetJobs",
Expand Down Expand Up @@ -170,36 +174,38 @@ func TestServer(t *testing.T) {
},
},
{
name: "Canceling",
name: "Canceling",
postRunState: jobpb.JobState_RUNNING,
noJobsCheck: func(ctx context.Context, t *testing.T, undertest *Server) {
resp, err := undertest.Cancel(ctx, &jobpb.CancelJobRequest{JobId: "job-001"})
if resp != nil {
t.Errorf("Canceling(\"job-001\") = %s, want nil", resp)
}
id := "job-001"
_, err := undertest.Cancel(ctx, &jobpb.CancelJobRequest{JobId: id})
// Cancel currently returns nil, nil when Job not found
if err != nil {
t.Errorf("Canceling(\"job-001\") = %v, want nil", err)
t.Errorf("Cancel(%q) = %v, want not found error", id, err)
}
},
postPrepCheck: func(ctx context.Context, t *testing.T, undertest *Server) {
resp, err := undertest.Cancel(ctx, &jobpb.CancelJobRequest{JobId: "job-001"})
id := "job-001"
resp, err := undertest.Cancel(ctx, &jobpb.CancelJobRequest{JobId: id})
if err != nil {
t.Errorf("Canceling(\"job-001\") = %v, want nil", err)
t.Errorf("Cancel(%q) = %v, want not found error", id, err)
}
if diff := cmp.Diff(&jobpb.CancelJobResponse{
State: jobpb.JobState_CANCELLING,
}, resp, cmpOpts...); diff != "" {
t.Errorf("Canceling(\"job-001\") (-want, +got):\n%v", diff)
t.Errorf("Cancel(%q) (-want, +got):\n%s", id, diff)
}
},
postRunCheck: func(ctx context.Context, t *testing.T, undertest *Server, jobID string) {
resp, err := undertest.Cancel(ctx, &jobpb.CancelJobRequest{JobId: jobID})
id := "job-001"
resp, err := undertest.Cancel(ctx, &jobpb.CancelJobRequest{JobId: id})
if err != nil {
t.Errorf("Canceling(\"%s\") = %v, want nil", jobID, err)
t.Errorf("Cancel(%q) = %v, want not found error", id, err)
}
if diff := cmp.Diff(&jobpb.CancelJobResponse{
State: jobpb.JobState_DONE,
State: jobpb.JobState_CANCELLING,
}, resp, cmpOpts...); diff != "" {
t.Errorf("Canceling(\"%s\") (-want, +got):\n%v", jobID, diff)
t.Errorf("Cancel(%q) (-want, +got):\n%s", id, diff)
}
},
},
Expand Down Expand Up @@ -230,7 +236,11 @@ func TestServer(t *testing.T) {
shortIDSize: sizeData,
},
})
j.state.Store(jobpb.JobState_DONE)
state := jobpb.JobState_DONE
if test.postRunState != jobpb.JobState_UNSPECIFIED {
state = test.postRunState
}
j.state.Store(state)
called.Done()
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,12 @@ func TestServer_RunThenCancel(t *testing.T) {
var called sync.WaitGroup
called.Add(1)
undertest := NewServer(0, func(j *Job) {
defer called.Done()
j.state.Store(jobpb.JobState_RUNNING)
if errors.Is(context.Cause(j.RootCtx), ErrCancel) {
j.state.Store(jobpb.JobState_CANCELLED)
called.Done()
j.SendMsg("pipeline canceled " + j.String())
j.Canceled()
return
}
})
ctx := context.Background()
Expand Down Expand Up @@ -121,6 +124,7 @@ func TestServer_RunThenCancel(t *testing.T) {
cancelResp, err := undertest.Cancel(ctx, &jobpb.CancelJobRequest{
JobId: runResp.GetJobId(),
})

if err != nil {
t.Fatalf("server.Canceling() = %v, want nil", err)
}
Expand Down
120 changes: 117 additions & 3 deletions sdks/go/pkg/beam/runners/prism/internal/web/assets/job-action.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
limitations under the License.
*/

/**
* job-action.js provides UI functionality for taking actions on Prism Jobs via:
* - jobManager: Client for Job Management.
* - uiStateProvider: Encapsulates UI state of user interactive elements on the page.
*/

/** Element class for job action container. */
const JOB_ACTION = '.job-action'

Expand All @@ -21,6 +27,24 @@ const CANCEL = '.cancel'
/** Element class assigned to RUNNING Job state. */
const RUNNING = 'RUNNING'

/** Element class for elements reporting Job state details. */
const JOB_STATE = '.job-state'

/** PATH holds consts that map to backend endpoints. */
const PATH = {

/** ROOT_ is the Job management path prefix for mapped backend endpoints. */
ROOT_: '/job',

/** CANCEL maps to the backend endpoint to cancel a Job. Terminates with '/' to prevent ServeMux 301 redirect. */
get CANCEL() {
return `${this.ROOT_}/cancel/`
}
}

/** HTTP related consts. */
const HTTP_POST = 'POST'

/**
* Client for Job Management.
*
Expand All @@ -34,9 +58,28 @@ const jobManager = {
* @param jobId
* TODO(https://github.com/apache/beam/issues/29669) Send request to backend service.
*/
cancel: function(jobId) {
cancel: function (jobId) {
console.debug(`cancel button for Job: ${jobId} clicked`)
}
const path = PATH.CANCEL
const request = {
method: HTTP_POST,
body: JSON.stringify(new CancelJobRequest(jobId))
}
fetch(path, request)
.then(response => {
const requestJson = JSON.stringify(request)
const responseJson = JSON.stringify(response)
if (response.ok) {
console.debug(`Job cancellation request to ${path} of ${requestJson} for Job: ${jobId} sent successfully, response: ${responseJson}`)
uiStateProvider.onJobCancel(response)
} else {
console.error(`Failed to send job cancellation request to ${path} of ${requestJson} for Job: ${jobId}, response: ${responseJson}`)
}
})
.catch(error => {
console.error(`Error occurred while sending job cancellation request for Job: ${jobId}`, error)
})
},
}

/**
Expand Down Expand Up @@ -84,14 +127,85 @@ const uiStateProvider = {
*/
get isStateRunning() {
return this.jobAction.classList.contains(RUNNING)
},

/**
* Queries the element containing the {@link JOB_STATE} class.
* @return {Element}
*/
get jobStateElement() {
let element = document.querySelector(JOB_STATE)
if (element === null) {
console.error(`no element found at ${JOB_STATE}`)
}
return element
},

/**
* Callback for successful Job Cancel requests.
* @param response {Response}
*/
onJobCancel(response) {
response.json().then(json => {
console.debug(`job cancel response json: ${JSON.stringify(json)}`)
uiStateProvider.jobStateElement.textContent = JobState_Enum[json.state]
})
.catch(error => {
console.error(`error Response.json() ${error}`)
})
}
}

/**
* Attaches an event listener to the window for 'load' events.
*/
window.addEventListener("load", function(){
window.addEventListener("load", function () {
console.debug(JOB_ACTION, uiStateProvider.jobAction)
console.debug(CANCEL, uiStateProvider.cancelButton)
uiStateProvider.init()
})

/**
* CancelJobRequest models a request to cancel a Job.
*
* Models after its proto namesake in:
* https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/org/apache/beam/model/job_management/v1/beam_job_api.proto
*/
class CancelJobRequest {
job_id_;

constructor(jobId) {
this.job_id_ = jobId
}

/**
* The ID of the Job to cancel.
* @return {string}
*/
get job_id() {
return this.job_id_
}

/** toJSON overrides JSON.stringify serialization behavior. */
toJSON() {
return {job_id: this.job_id}
}
}

/** Maps JobState_Enum from Job Management server response to the Job State name. See proto for more details:
* https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/org/apache/beam/model/job_management/v1/beam_job_api.proto
*/
const JobState_Enum = {
0: "UNSPECIFIED",
1: "STOPPED",
2: "RUNNING",
3: "DONE",
4: "FAILED",
5: "CANCELLED",
6: "UPDATED",
7: "DRAINING",
8: "DRAINED",
9: "STARTING",
10: "CANCELLING",
11: "UPDATING",
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
onclick="if (jobManager !== null) { jobManager.cancel('{{.JobID}}') }"
>Cancel</button>
</div>
<div>{{.State}}</div>
<div class="job-state">{{.State}}</div>
</header>
<section class="container">
{{ if .Error}}<div class="child">{{.Error}}</div>{{end}}
Expand Down
21 changes: 18 additions & 3 deletions sdks/go/pkg/beam/runners/prism/internal/web/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"html/template"
"io"
"net/http"
"sort"
"strings"
Expand Down Expand Up @@ -374,8 +375,22 @@ type jobCancelHandler struct {

func (h *jobCancelHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var cancelRequest *jobpb.CancelJobRequest
if err := json.NewDecoder(r.Body).Decode(&cancelRequest); err != nil {
err = fmt.Errorf("error parsing JSON of request: %w", err)
if r.Method != http.MethodPost {
http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed)
return
}
body, err := io.ReadAll(r.Body)
if err != nil {
err = fmt.Errorf("could not read request body: %w", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if len(body) == 0 {
http.Error(w, "empty request body", http.StatusBadRequest)
return
}
if err := json.Unmarshal(body, &cancelRequest); err != nil {
err = fmt.Errorf("error parsing JSON: %s of request: %w", body, err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
Expand Down Expand Up @@ -405,10 +420,10 @@ func Initialize(ctx context.Context, port int, jobcli jobpb.JobServiceClient) er
mux := http.NewServeMux()

mux.Handle("/assets/", assetsFs)
mux.Handle("/job/cancel/", &jobCancelHandler{Jobcli: jobcli})
mux.Handle("/job/", &jobDetailsHandler{Jobcli: jobcli})
mux.Handle("/debugz", &debugzHandler{})
mux.Handle("/", &jobsConsoleHandler{Jobcli: jobcli})
mux.Handle("/job/cancel", &jobCancelHandler{Jobcli: jobcli})

endpoint := fmt.Sprintf("localhost:%d", port)

Expand Down

0 comments on commit ac83276

Please sign in to comment.