X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/cac5db66cba0d5dd97c8434853bcbf2ab19fbda5..6b2f232c3d73a023d64112c609a28c8ff9cc27de:/services/crunch-run/crunchrun.go diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go index 01edb0a516..a1b246c841 100644 --- a/services/crunch-run/crunchrun.go +++ b/services/crunch-run/crunchrun.go @@ -15,6 +15,7 @@ import ( "os" "os/exec" "os/signal" + "path" "strings" "sync" "syscall" @@ -26,6 +27,7 @@ type IArvadosClient interface { Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) + Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) (err error) } // ErrCancelled is the error returned when the container is cancelled. @@ -44,6 +46,7 @@ type Mount struct { PortableDataHash string `json:"portable_data_hash"` UUID string `json:"uuid"` DeviceType string `json:"device_type"` + Path string `json:"path"` } // Collection record returned by the API server. @@ -67,10 +70,16 @@ type ContainerRecord struct { Output string `json:"output"` } +// APIClientAuthorization is an arvados#api_client_authorization resource. +type APIClientAuthorization struct { + UUID string `json:"uuid"` + APIToken string `json:"api_token"` +} + // NewLogWriter is a factory function to create a new log writer. type NewLogWriter func(name string) io.WriteCloser -type RunArvMount func([]string) (*exec.Cmd, error) +type RunArvMount func(args []string, tok string) (*exec.Cmd, error) type MkTempDir func(string, string) (string, error) @@ -99,7 +108,7 @@ type ContainerRunner struct { NewLogWriter loggingDone chan bool CrunchLog *ThrottledLogger - Stdout *ThrottledLogger + Stdout io.WriteCloser Stderr *ThrottledLogger LogCollection *CollectionWriter LogsPDH *string @@ -187,8 +196,19 @@ func (runner *ContainerRunner) LoadImage() (err error) { return nil } -func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string) (c *exec.Cmd, err error) { +func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (c *exec.Cmd, err error) { c = exec.Command("arv-mount", arvMountCmd...) + + // Copy our environment, but override ARVADOS_API_TOKEN with + // the container auth token. + c.Env = nil + for _, s := range os.Environ() { + if !strings.HasPrefix(s, "ARVADOS_API_TOKEN=") { + c.Env = append(c.Env, s) + } + } + c.Env = append(c.Env, "ARVADOS_API_TOKEN="+token) + nt := NewThrottledLogger(runner.NewLogWriter("arv-mount")) c.Stdout = nt c.Stderr = nt @@ -246,6 +266,22 @@ func (runner *ContainerRunner) SetupMounts() (err error) { runner.Binds = nil for bind, mnt := range runner.ContainerRecord.Mounts { + if bind == "stdout" { + // Is it a "file" mount kind? + if mnt.Kind != "file" { + return fmt.Errorf("Unsupported mount kind '%s' for stdout. Only 'file' is supported.", mnt.Kind) + } + + // Does path start with OutputPath? + prefix := runner.ContainerRecord.OutputPath + if !strings.HasSuffix(prefix, "/") { + prefix += "/" + } + if !strings.HasPrefix(mnt.Path, prefix) { + return fmt.Errorf("Stdout path does not start with OutputPath: %s, %s", mnt.Path, prefix) + } + } + if mnt.Kind == "collection" { var src string if mnt.UUID != "" && mnt.PortableDataHash != "" { @@ -296,8 +332,6 @@ func (runner *ContainerRunner) SetupMounts() (err error) { } else { runner.Binds = append(runner.Binds, bind) } - } else { - return fmt.Errorf("Unknown mount kind '%s'", mnt.Kind) } } @@ -312,7 +346,12 @@ func (runner *ContainerRunner) SetupMounts() (err error) { } arvMountCmd = append(arvMountCmd, runner.ArvMountPoint) - runner.ArvMount, err = runner.RunArvMount(arvMountCmd) + token, err := runner.ContainerToken() + if err != nil { + return fmt.Errorf("could not get container token: %s", err) + } + + runner.ArvMount, err = runner.RunArvMount(arvMountCmd, token) if err != nil { return fmt.Errorf("While trying to start arv-mount: %v", err) } @@ -383,7 +422,31 @@ func (runner *ContainerRunner) AttachStreams() (err error) { runner.loggingDone = make(chan bool) - runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout")) + if stdoutMnt, ok := runner.ContainerRecord.Mounts["stdout"]; ok { + stdoutPath := stdoutMnt.Path[len(runner.ContainerRecord.OutputPath):] + index := strings.LastIndex(stdoutPath, "/") + if index > 0 { + subdirs := stdoutPath[:index] + if subdirs != "" { + st, err := os.Stat(runner.HostOutputDir) + if err != nil { + return fmt.Errorf("While Stat on temp dir: %v", err) + } + stdoutPath := path.Join(runner.HostOutputDir, subdirs) + err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777) + if err != nil { + return fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err) + } + } + } + stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath)) + if err != nil { + return fmt.Errorf("While creating stdout file: %v", err) + } + runner.Stdout = stdoutFile + } else { + runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout")) + } runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr")) go runner.ProcessDockerAttach(containerReader) @@ -569,6 +632,14 @@ func (runner *ContainerRunner) UpdateContainerRecordRunning() error { arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running"}}, nil) } +// ContainerToken returns the api_token the container (and any +// arv-mount processes) are allowed to use. +func (runner *ContainerRunner) ContainerToken() (string, error) { + var auth APIClientAuthorization + err := runner.ArvClient.Call("GET", "containers", runner.ContainerRecord.UUID, "auth", nil, &auth) + return auth.APIToken, err +} + // UpdateContainerRecordComplete updates the container record state on API // server to "Complete" or "Cancelled" func (runner *ContainerRunner) UpdateContainerRecordComplete() error {