Skip to content
This repository has been archived by the owner on May 31, 2024. It is now read-only.

feat: Download and report WES run log files #319

Merged
merged 4 commits into from
Mar 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 49 additions & 1 deletion packages/cli/internal/pkg/cli/logs_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package cli

import (
"bufio"
"bytes"
"errors"
"fmt"
Expand Down Expand Up @@ -104,11 +105,58 @@ func (o *logsWorkflowOpts) Execute() error {
}
} else {
printRunLog(runLog)
// Go and fetch the run-level log text if available
if len(runLog.Stdout) > 0 {
logDataStream, err := o.workflowManager.GetRunLogData(o.runId, runLog.Stdout)
if err != nil {
log.Error().Msgf("Could not retrieve standard output from %s: %v", runLog.Stdout, err)
} else {
printLn("Run Standard Output:")
// We would like to copy from logDataStream to standard output,
// but we can't.
// We aren't actually allowed to use standard output here; we
// must do all our output through printLn, because otherwise
// the test harness cannot capture it and see that we have done
// it, and we fail the tests.
// So we need to go through each line in logDataStream, and printLn it.
scanner := bufio.NewScanner(*logDataStream)
for scanner.Scan() {
printLn(scanner.Text())
}
err = scanner.Err()
// TODO: bufio's Scanner can't handle arbitrarily long lines.
// If it finds a line longer than 64k, it will stop with an
// error.
// We can raise the limit, but we can't get rid of the limit.
// To fully support arbitrarily long lines in the log, we need
// to come up with a better way for the test harness to collect
// streaming output.
if err != nil {
return err
}
}
}
if len(runLog.Stderr) > 0 {
logDataStream, err := o.workflowManager.GetRunLogData(o.runId, runLog.Stderr)
if err != nil {
log.Error().Msgf("Could not retrieve standard error from %s: %v", runLog.Stderr, err)
} else {
printLn("Run Standard Error:")
scanner := bufio.NewScanner(*logDataStream)
for scanner.Scan() {
printLn(scanner.Text())
}
err = scanner.Err()
if err != nil {
return err
}
}
}
return nil
}

if len(jobIds) == 0 {
log.Info().Msgf("No logs available for run '%s'. Please try again later.", o.runId)
log.Info().Msgf("No job logs available for run '%s'. Please try again later.", o.runId)
return nil
}
notCachedJobIds := filterCachedJobIds(jobIds)
Expand Down
35 changes: 35 additions & 0 deletions packages/cli/internal/pkg/cli/logs_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cli
import (
"errors"
"fmt"
"io"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -147,6 +148,40 @@ func TestLogsWorkflowOpts_Execute(t *testing.T) {
},
expectedOutput: "RunId: Test Workflow Run Id\nState: COMPLETE\nTasks: \n\tName\t\tJobId\t\tStartTime\tStopTimeExitCode\n\tTest Task Name\tTest Job Id\t<nil>\t\t<nil>\t\n\t\n",
},
"runId stdout URL": {
setupOps: func(opts *logsWorkflowOpts, cwlLopPaginator *awsmocks.MockCwlLogPaginator) {
opts.workflowName = testWorkflowName
opts.runId = testRunId
opts.workflowManager.(*managermocks.MockWorkflowManager).EXPECT().
GetRunLog(testRunId).Return(workflow.RunLog{
RunId: testRunId,
State: "COMPLETE",
Tasks: []workflow.Task(nil),
Stdout: "log/out",
}, nil)
stream := io.NopCloser(strings.NewReader("This is output"))
opts.workflowManager.(*managermocks.MockWorkflowManager).EXPECT().
GetRunLogData(testRunId, "log/out").Return(&stream, nil)
},
expectedOutput: "RunId: Test Workflow Run Id\nState: COMPLETE\nTasks: No task logs available\nRun Standard Output:\nThis is output\n",
},
"runId stderr URL": {
setupOps: func(opts *logsWorkflowOpts, cwlLopPaginator *awsmocks.MockCwlLogPaginator) {
opts.workflowName = testWorkflowName
opts.runId = testRunId
opts.workflowManager.(*managermocks.MockWorkflowManager).EXPECT().
GetRunLog(testRunId).Return(workflow.RunLog{
RunId: testRunId,
State: "COMPLETE",
Tasks: []workflow.Task(nil),
Stderr: "log/err",
}, nil)
stream := io.NopCloser(strings.NewReader("This is error"))
opts.workflowManager.(*managermocks.MockWorkflowManager).EXPECT().
GetRunLogData(testRunId, "log/err").Return(&stream, nil)
},
expectedOutput: "RunId: Test Workflow Run Id\nState: COMPLETE\nTasks: No task logs available\nRun Standard Error:\nThis is error\n",
},
"runId no jobs": {
setupOps: func(opts *logsWorkflowOpts, cwlLopPaginator *awsmocks.MockCwlLogPaginator) {
opts.workflowName = testWorkflowName
Expand Down
5 changes: 5 additions & 0 deletions packages/cli/internal/pkg/cli/workflow/workflow_status.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package workflow

import (
"io"
)

type StatusManager interface {
StatusWorkflowAll(numInstances int) ([]InstanceSummary, error)
StatusWorkflowByInstanceId(instanceId string) ([]InstanceSummary, error)
Expand All @@ -9,6 +13,7 @@ type StatusManager interface {

type TasksManager interface {
GetRunLog(runId string) (RunLog, error)
GetRunLogData(runId string, dataUrl string) (*io.ReadCloser, error)
GetWorkflowTasks(runId string) ([]Task, error)
StatusWorkflowByName(workflowName string, numInstances int) ([]InstanceSummary, error)
}
Expand Down
30 changes: 24 additions & 6 deletions packages/cli/internal/pkg/cli/workflow/workflow_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package workflow

import (
"fmt"
"io"
"strings"
"time"

Expand All @@ -18,9 +19,11 @@ type Task struct {
}

type RunLog struct {
RunId string
State string
Tasks []Task
RunId string
State string
Stdout string
Stderr string
Tasks []Task
}

func (m *Manager) GetWorkflowTasks(runId string) ([]Task, error) {
Expand Down Expand Up @@ -49,12 +52,26 @@ func (m *Manager) GetRunLog(runId string) (RunLog, error) {
}

return RunLog{
RunId: m.taskProps.runLog.RunId,
State: string(m.taskProps.runLog.State),
Tasks: tasks,
RunId: m.taskProps.runLog.RunId,
State: string(m.taskProps.runLog.State),
Stdout: m.taskProps.runLog.RunLog.Stdout,
Stderr: m.taskProps.runLog.RunLog.Stderr,
Tasks: tasks,
}, nil
}

func (m *Manager) GetRunLogData(runId string, dataUrl string) (*io.ReadCloser, error) {
if m.err != nil {
return nil, m.err
}
var stream *io.ReadCloser
stream, m.err = m.wes.GetRunLogData(context.Background(), runId, dataUrl)
if m.err != nil {
return nil, m.err
}
return stream, nil
}

func (m *Manager) setContextForRun(runId string) {
if m.err != nil {
return
Expand All @@ -72,6 +89,7 @@ func (m *Manager) getRunLog(runId string) {
return
}
m.runLog, m.err = m.wes.GetRunLog(context.Background(), runId)
log.Debug().Msgf("Obtained log: %v", m.runLog)
}

func (m *Manager) getTasks() ([]Task, error) {
Expand Down
16 changes: 16 additions & 0 deletions packages/cli/internal/pkg/mocks/manager/mock_interfaces.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions packages/cli/internal/pkg/mocks/wes/mock_interfaces.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions packages/cli/internal/pkg/wes/Client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package wes

import (
"context"
"io"

"github.com/aws/amazon-genomics-cli/internal/pkg/wes/option"
"github.com/rs/zerolog/log"
Expand Down Expand Up @@ -51,3 +52,8 @@ func (c *Client) GetRunLog(ctx context.Context, runId string) (wes.RunLog, error
runLog, _, err := c.wes.WorkflowExecutionServiceApi.GetRunLog(ctx, runId)
return runLog, err
}

func (c *Client) GetRunLogData(ctx context.Context, runId string, dataUrl string) (*io.ReadCloser, error) {
runLogDataStream, _, err := c.wes.WorkflowExecutionServiceApi.GetRunLogData(ctx, runId, dataUrl)
return runLogDataStream, err
}
2 changes: 2 additions & 0 deletions packages/cli/internal/pkg/wes/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package wes

import (
"context"
"io"

"github.com/aws/amazon-genomics-cli/internal/pkg/wes/option"
wes "github.com/rsc/wes_client"
Expand All @@ -12,4 +13,5 @@ type Interface interface {
GetRunStatus(ctx context.Context, runId string) (string, error)
StopWorkflow(ctx context.Context, runId string) error
GetRunLog(ctx context.Context, runId string) (wes.RunLog, error)
GetRunLogData(ctx context.Context, runId string, dataUrl string) (*io.ReadCloser, error)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Additional methods for actually retrieving data pointed to by URLs in WES responses.
*/

package wes_client

import (
_context "context"
_ioutil "io/ioutil"
_nethttp "net/http"
_neturl "net/url"
"fmt"
"io"
"strings"
)

// Linger please
var (
_ _context.Context
)

/*
GetRunLogData Get data linked to by GetRunLog.
Returns a stream for the content of a URL referenced in a GetRunLog response.
* @param ctx _context.Context - for authentication, logging, cancellation, deadlines, tracing, etc. Passed from http.Request or context.Background().
* @param runId The run ID used for the GetRunLog call
* @param dataUrl The URL string from the GetRunLog call
@return *io.ReadCloser
*/
func (a *WorkflowExecutionServiceApiService) GetRunLogData(ctx _context.Context, runId string, dataUrl string) (*io.ReadCloser, *_nethttp.Response, error) {
var (
localVarHTTPMethod = _nethttp.MethodGet
localVarPostBody interface{}
localVarFormFileName string
localVarReturnValue *io.ReadCloser = nil
)

// create path and map variables
localVarPath := a.client.cfg.BasePath + "/runs/{run_id}"
localVarPath = strings.Replace(localVarPath, "{"+"run_id"+"}", _neturl.PathEscape(parameterToString(runId, "")), -1)

// Evaluate dataUrl relative to localVarPath and replace localVarPath
base, err := _neturl.Parse(localVarPath)
if err != nil {
return localVarReturnValue, nil, err
}
evaluated, err := base.Parse(dataUrl)
if err != nil {
return localVarReturnValue, nil, err
}
if (evaluated.Scheme != base.Scheme && !strings.HasPrefix(evaluated.Scheme, "http")) {
// This doesn't look like something we can fetch
return localVarReturnValue, nil, fmt.Errorf("WES cannot be used to retrieve %s", dataUrl)
}
localVarPath = evaluated.String()

// Request headers will go in here.
localVarHeaderParams := make(map[string]string)
// We don't use any of these, but we need them to invoke prepareRequest.
files := make(map[string][]byte)
localVarQueryParams := _neturl.Values{}
localVarFormParams := _neturl.Values{}

// We don't need any accept type choosing logic; we can only accept plain text.
localVarHeaderParams["Accept"] = "text/plain"

r, err := a.client.prepareRequest(ctx, localVarPath, localVarHTTPMethod, localVarPostBody, localVarHeaderParams, localVarQueryParams, localVarFormParams, localVarFormFileName, files)
if err != nil {
return localVarReturnValue, nil, err
}

// Make the request
localVarHTTPResponse, err := a.client.callAPI(r)
if err != nil || localVarHTTPResponse == nil {
return localVarReturnValue, localVarHTTPResponse, err
}
// Be ready to return the body stream
localVarReturnValue = &localVarHTTPResponse.Body

if localVarHTTPResponse.StatusCode >= 300 {
// Something has gone wrong sever-side (and this isn't a redirect)
// Fetch the entire body
localVarBody, err := _ioutil.ReadAll(localVarHTTPResponse.Body)
localVarHTTPResponse.Body.Close()
newErr := GenericOpenAPIError{
body: localVarBody,
error: localVarHTTPResponse.Status, // Despite the name, this must be a string
}
if err != nil {
// Something went wrong during error download.
// Add that error to our error as a string.
newErr.error = fmt.Sprintf("Failed to download body of HTTP error %d %s response: %v", localVarHTTPResponse.StatusCode, localVarHTTPResponse.Status, err)
return localVarReturnValue, localVarHTTPResponse, newErr
}
// Otherwise, we downloaded something. Maybe we can parse it as a WES-style JSON error.
var v ErrorResponse
err = a.client.decode(&v, localVarBody, localVarHTTPResponse.Header.Get("Content-Type"))
if err != nil {
// Nope, it's not a WES-style error.
// Don't explain why it's not parseable, just pass along what the server said.
newErr.error = fmt.Sprintf("Instead of log data, server sent a %d %s error with content: %s", localVarHTTPResponse.StatusCode, localVarHTTPResponse.Status, localVarBody)
return localVarReturnValue, localVarHTTPResponse, newErr
}
// Otherwise, it is a WES-style error we can understand (even if not a normally acceptable WES error code)
newErr.model = v
return localVarReturnValue, localVarHTTPResponse, newErr
}
// TODO: handle redirects?

return localVarReturnValue, localVarHTTPResponse, nil
}