X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/2033d7f870dd0e4c65b563d689514fe21e478992..6b2f232c3d73a023d64112c609a28c8ff9cc27de:/services/crunch-run/crunchrun.go diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go index 07436f5bc1..a1b246c841 100644 --- a/services/crunch-run/crunchrun.go +++ b/services/crunch-run/crunchrun.go @@ -15,6 +15,7 @@ import ( "os" "os/exec" "os/signal" + "path" "strings" "sync" "syscall" @@ -26,6 +27,7 @@ type IArvadosClient interface { Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) + Call(method, resourceType, uuid, action string, parameters arvadosclient.Dict, output interface{}) (err error) } // ErrCancelled is the error returned when the container is cancelled. @@ -44,6 +46,7 @@ type Mount struct { PortableDataHash string `json:"portable_data_hash"` UUID string `json:"uuid"` DeviceType string `json:"device_type"` + Path string `json:"path"` } // Collection record returned by the API server. @@ -67,10 +70,16 @@ type ContainerRecord struct { Output string `json:"output"` } +// APIClientAuthorization is an arvados#api_client_authorization resource. +type APIClientAuthorization struct { + UUID string `json:"uuid"` + APIToken string `json:"api_token"` +} + // NewLogWriter is a factory function to create a new log writer. type NewLogWriter func(name string) io.WriteCloser -type RunArvMount func([]string) (*exec.Cmd, error) +type RunArvMount func(args []string, tok string) (*exec.Cmd, error) type MkTempDir func(string, string) (string, error) @@ -81,7 +90,7 @@ type ThinDockerClient interface { LoadImage(reader io.Reader) error CreateContainer(config *dockerclient.ContainerConfig, name string, authConfig *dockerclient.AuthConfig) (string, error) StartContainer(id string, config *dockerclient.HostConfig) error - ContainerLogs(id string, options *dockerclient.LogOptions) (io.ReadCloser, error) + AttachContainer(id string, options *dockerclient.AttachOptions) (io.ReadCloser, error) Wait(id string) <-chan dockerclient.WaitResult RemoveImage(name string, force bool) ([]*dockerclient.ImageDelete, error) } @@ -99,27 +108,28 @@ type ContainerRunner struct { NewLogWriter loggingDone chan bool CrunchLog *ThrottledLogger - Stdout *ThrottledLogger + Stdout io.WriteCloser Stderr *ThrottledLogger LogCollection *CollectionWriter LogsPDH *string RunArvMount MkTempDir - ArvMount *exec.Cmd - ArvMountPoint string - HostOutputDir string - Binds []string - OutputPDH *string - CancelLock sync.Mutex - Cancelled bool - SigChan chan os.Signal - ArvMountExit chan error - finalState string + ArvMount *exec.Cmd + ArvMountPoint string + HostOutputDir string + CleanupTempDir []string + Binds []string + OutputPDH *string + CancelLock sync.Mutex + Cancelled bool + SigChan chan os.Signal + ArvMountExit chan error + finalState string } // SetupSignals sets up signal handling to gracefully terminate the underlying // Docker container and update state when receiving a TERM, INT or QUIT signal. -func (runner *ContainerRunner) SetupSignals() error { +func (runner *ContainerRunner) SetupSignals() { runner.SigChan = make(chan os.Signal, 1) signal.Notify(runner.SigChan, syscall.SIGTERM) signal.Notify(runner.SigChan, syscall.SIGINT) @@ -137,8 +147,6 @@ func (runner *ContainerRunner) SetupSignals() error { } } }(runner.SigChan) - - return nil } // LoadImage determines the docker image id from the container record and @@ -188,8 +196,19 @@ func (runner *ContainerRunner) LoadImage() (err error) { return nil } -func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string) (c *exec.Cmd, err error) { +func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (c *exec.Cmd, err error) { c = exec.Command("arv-mount", arvMountCmd...) + + // Copy our environment, but override ARVADOS_API_TOKEN with + // the container auth token. + c.Env = nil + for _, s := range os.Environ() { + if !strings.HasPrefix(s, "ARVADOS_API_TOKEN=") { + c.Env = append(c.Env, s) + } + } + c.Env = append(c.Env, "ARVADOS_API_TOKEN="+token) + nt := NewThrottledLogger(runner.NewLogWriter("arv-mount")) c.Stdout = nt c.Stderr = nt @@ -238,13 +257,31 @@ func (runner *ContainerRunner) SetupMounts() (err error) { return fmt.Errorf("While creating keep mount temp dir: %v", err) } + runner.CleanupTempDir = append(runner.CleanupTempDir, runner.ArvMountPoint) + pdhOnly := true tmpcount := 0 - arvMountCmd := []string{"--foreground"} + arvMountCmd := []string{"--foreground", "--allow-other", "--read-write"} collectionPaths := []string{} runner.Binds = nil for bind, mnt := range runner.ContainerRecord.Mounts { + if bind == "stdout" { + // Is it a "file" mount kind? + if mnt.Kind != "file" { + return fmt.Errorf("Unsupported mount kind '%s' for stdout. Only 'file' is supported.", mnt.Kind) + } + + // Does path start with OutputPath? + prefix := runner.ContainerRecord.OutputPath + if !strings.HasSuffix(prefix, "/") { + prefix += "/" + } + if !strings.HasPrefix(mnt.Path, prefix) { + return fmt.Errorf("Stdout path does not start with OutputPath: %s, %s", mnt.Path, prefix) + } + } + if mnt.Kind == "collection" { var src string if mnt.UUID != "" && mnt.PortableDataHash != "" { @@ -282,13 +319,19 @@ func (runner *ContainerRunner) SetupMounts() (err error) { if err != nil { return fmt.Errorf("While creating mount temp dir: %v", err) } - + st, staterr := os.Stat(runner.HostOutputDir) + if staterr != nil { + return fmt.Errorf("While Stat on temp dir: %v", staterr) + } + err = os.Chmod(runner.HostOutputDir, st.Mode()|os.ModeSetgid|0777) + if staterr != nil { + return fmt.Errorf("While Chmod temp dir: %v", err) + } + runner.CleanupTempDir = append(runner.CleanupTempDir, runner.HostOutputDir) runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", runner.HostOutputDir, bind)) } else { runner.Binds = append(runner.Binds, bind) } - } else { - return fmt.Errorf("Unknown mount kind '%s'", mnt.Kind) } } @@ -303,7 +346,12 @@ func (runner *ContainerRunner) SetupMounts() (err error) { } arvMountCmd = append(arvMountCmd, runner.ArvMountPoint) - runner.ArvMount, err = runner.RunArvMount(arvMountCmd) + token, err := runner.ContainerToken() + if err != nil { + return fmt.Errorf("could not get container token: %s", err) + } + + runner.ArvMount, err = runner.RunArvMount(arvMountCmd, token) if err != nil { return fmt.Errorf("While trying to start arv-mount: %v", err) } @@ -318,6 +366,94 @@ func (runner *ContainerRunner) SetupMounts() (err error) { return nil } +func (runner *ContainerRunner) ProcessDockerAttach(containerReader io.Reader) { + // Handle docker log protocol + // https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container + + header := make([]byte, 8) + for { + _, readerr := io.ReadAtLeast(containerReader, header, 8) + + if readerr == nil { + readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24) + if header[0] == 1 { + // stdout + _, readerr = io.CopyN(runner.Stdout, containerReader, readsize) + } else { + // stderr + _, readerr = io.CopyN(runner.Stderr, containerReader, readsize) + } + } + + if readerr != nil { + if readerr != io.EOF { + runner.CrunchLog.Printf("While reading docker logs: %v", readerr) + } + + closeerr := runner.Stdout.Close() + if closeerr != nil { + runner.CrunchLog.Printf("While closing stdout logs: %v", closeerr) + } + + closeerr = runner.Stderr.Close() + if closeerr != nil { + runner.CrunchLog.Printf("While closing stderr logs: %v", closeerr) + } + + runner.loggingDone <- true + close(runner.loggingDone) + return + } + } +} + +// AttachLogs connects the docker container stdout and stderr logs to the +// Arvados logger which logs to Keep and the API server logs table. +func (runner *ContainerRunner) AttachStreams() (err error) { + + runner.CrunchLog.Print("Attaching container streams") + + var containerReader io.Reader + containerReader, err = runner.Docker.AttachContainer(runner.ContainerID, + &dockerclient.AttachOptions{Stream: true, Stdout: true, Stderr: true}) + if err != nil { + return fmt.Errorf("While attaching container stdout/stderr streams: %v", err) + } + + runner.loggingDone = make(chan bool) + + if stdoutMnt, ok := runner.ContainerRecord.Mounts["stdout"]; ok { + stdoutPath := stdoutMnt.Path[len(runner.ContainerRecord.OutputPath):] + index := strings.LastIndex(stdoutPath, "/") + if index > 0 { + subdirs := stdoutPath[:index] + if subdirs != "" { + st, err := os.Stat(runner.HostOutputDir) + if err != nil { + return fmt.Errorf("While Stat on temp dir: %v", err) + } + stdoutPath := path.Join(runner.HostOutputDir, subdirs) + err = os.MkdirAll(stdoutPath, st.Mode()|os.ModeSetgid|0777) + if err != nil { + return fmt.Errorf("While MkdirAll %q: %v", stdoutPath, err) + } + } + } + stdoutFile, err := os.Create(path.Join(runner.HostOutputDir, stdoutPath)) + if err != nil { + return fmt.Errorf("While creating stdout file: %v", err) + } + runner.Stdout = stdoutFile + } else { + runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout")) + } + runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr")) + + go runner.ProcessDockerAttach(containerReader) + + return nil +} + // StartContainer creates the container and runs it. func (runner *ContainerRunner) StartContainer() (err error) { runner.CrunchLog.Print("Creating Docker container") @@ -341,46 +477,28 @@ func (runner *ContainerRunner) StartContainer() (err error) { if err != nil { return fmt.Errorf("While creating container: %v", err) } - hostConfig := &dockerclient.HostConfig{Binds: runner.Binds} + hostConfig := &dockerclient.HostConfig{Binds: runner.Binds, + LogConfig: dockerclient.LogConfig{Type: "none"}} - runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID) - err = runner.Docker.StartContainer(runner.ContainerID, hostConfig) + err = runner.AttachStreams() if err != nil { - return fmt.Errorf("While starting container: %v", err) + return err } - return nil -} - -// AttachLogs connects the docker container stdout and stderr logs to the -// Arvados logger which logs to Keep and the API server logs table. -func (runner *ContainerRunner) AttachLogs() (err error) { - - runner.CrunchLog.Print("Attaching container logs") - - var stderrReader, stdoutReader io.Reader - stderrReader, err = runner.Docker.ContainerLogs(runner.ContainerID, &dockerclient.LogOptions{Follow: true, Stderr: true}) - if err != nil { - return fmt.Errorf("While getting container standard error: %v", err) - } - stdoutReader, err = runner.Docker.ContainerLogs(runner.ContainerID, &dockerclient.LogOptions{Follow: true, Stdout: true}) + runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID) + err = runner.Docker.StartContainer(runner.ContainerID, hostConfig) if err != nil { - return fmt.Errorf("While getting container standard output: %v", err) + return fmt.Errorf("While starting container: %v", err) } - runner.loggingDone = make(chan bool) - - runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout")) - runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr")) - go ReadWriteLines(stdoutReader, runner.Stdout, runner.loggingDone) - go ReadWriteLines(stderrReader, runner.Stderr, runner.loggingDone) - return nil } // WaitFinish waits for the container to terminate, capture the exit code, and // close the stdout/stderr logging. func (runner *ContainerRunner) WaitFinish() error { + runner.CrunchLog.Print("Waiting for container to finish") + result := runner.Docker.Wait(runner.ContainerID) wr := <-result if wr.Error != nil { @@ -388,33 +506,14 @@ func (runner *ContainerRunner) WaitFinish() error { } runner.ExitCode = &wr.ExitCode - // drain stdout/stderr - <-runner.loggingDone + // wait for stdout/stderr to complete <-runner.loggingDone - runner.Stdout.Close() - runner.Stderr.Close() - return nil } -// HandleOutput sets the output and unmounts the FUSE mount. +// HandleOutput sets the output, unmounts the FUSE mount, and deletes temporary directories func (runner *ContainerRunner) CaptureOutput() error { - if runner.ArvMount != nil { - defer func() { - umount := exec.Command("fusermount", "-z", "-u", runner.ArvMountPoint) - umnterr := umount.Run() - if umnterr != nil { - runner.CrunchLog.Print("While running fusermount: %v", umnterr) - } - - mnterr := <-runner.ArvMountExit - if mnterr != nil { - runner.CrunchLog.Print("Arv-mount exit error: %v", mnterr) - } - }() - } - if runner.finalState != "Complete" { return nil } @@ -471,6 +570,28 @@ func (runner *ContainerRunner) CaptureOutput() error { return nil } +func (runner *ContainerRunner) CleanupDirs() { + if runner.ArvMount != nil { + umount := exec.Command("fusermount", "-z", "-u", runner.ArvMountPoint) + umnterr := umount.Run() + if umnterr != nil { + runner.CrunchLog.Printf("While running fusermount: %v", umnterr) + } + + mnterr := <-runner.ArvMountExit + if mnterr != nil { + runner.CrunchLog.Printf("Arv-mount exit error: %v", mnterr) + } + } + + for _, tmpdir := range runner.CleanupTempDir { + rmerr := os.RemoveAll(tmpdir) + if rmerr != nil { + runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", tmpdir, rmerr) + } + } +} + // CommitLogs posts the collection containing the final container logs. func (runner *ContainerRunner) CommitLogs() error { runner.CrunchLog.Print(runner.finalState) @@ -511,6 +632,14 @@ func (runner *ContainerRunner) UpdateContainerRecordRunning() error { arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running"}}, nil) } +// ContainerToken returns the api_token the container (and any +// arv-mount processes) are allowed to use. +func (runner *ContainerRunner) ContainerToken() (string, error) { + var auth APIClientAuthorization + err := runner.ArvClient.Call("GET", "containers", runner.ContainerRecord.UUID, "auth", nil, &auth) + return auth.APIToken, err +} + // UpdateContainerRecordComplete updates the container record state on API // server to "Complete" or "Cancelled" func (runner *ContainerRunner) UpdateContainerRecordComplete() error { @@ -539,6 +668,13 @@ func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser { func (runner *ContainerRunner) Run() (err error) { runner.CrunchLog.Printf("Executing container '%s'", runner.ContainerRecord.UUID) + hostname, hosterr := os.Hostname() + if hosterr != nil { + runner.CrunchLog.Printf("Error getting hostname '%v'", hosterr) + } else { + runner.CrunchLog.Printf("Executing on host '%s'", hostname) + } + var runerr, waiterr error defer func() { @@ -552,12 +688,15 @@ func (runner *ContainerRunner) Run() (err error) { runner.finalState = "Complete" } - // (7) capture output + // (6) capture output outputerr := runner.CaptureOutput() if outputerr != nil { runner.CrunchLog.Print(outputerr) } + // (7) clean up temporary directories + runner.CleanupDirs() + // (8) write logs logerr := runner.CommitLogs() if logerr != nil { @@ -576,7 +715,7 @@ func (runner *ContainerRunner) Run() (err error) { if runerr != nil { err = runerr } else if waiterr != nil { - err = runerr + err = waiterr } else if logerr != nil { err = logerr } else if updateerr != nil { @@ -591,10 +730,7 @@ func (runner *ContainerRunner) Run() (err error) { } // (1) setup signal handling - err = runner.SetupSignals() - if err != nil { - return fmt.Errorf("While setting up signal handling: %v", err) - } + runner.SetupSignals() // (2) check for and/or load image err = runner.LoadImage() @@ -623,13 +759,7 @@ func (runner *ContainerRunner) Run() (err error) { runner.CrunchLog.Print(err) } - // (5) attach container logs - runerr = runner.AttachLogs() - if runerr != nil { - runner.CrunchLog.Print(runerr) - } - - // (6) wait for container to finish + // (5) wait for container to finish waiterr = runner.WaitFinish() return @@ -648,36 +778,39 @@ func NewContainerRunner(api IArvadosClient, cr.LogCollection = &CollectionWriter{kc, nil, sync.Mutex{}} cr.ContainerRecord.UUID = containerUUID cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run")) + cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0) return cr } func main() { flag.Parse() + containerId := flag.Arg(0) + api, err := arvadosclient.MakeArvadosClient() if err != nil { - log.Fatalf("%s: %v", flag.Arg(0), err) + log.Fatalf("%s: %v", containerId, err) } api.Retries = 8 var kc *keepclient.KeepClient kc, err = keepclient.MakeKeepClient(&api) if err != nil { - log.Fatalf("%s: %v", flag.Arg(0), err) + log.Fatalf("%s: %v", containerId, err) } kc.Retries = 4 var docker *dockerclient.DockerClient docker, err = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil) if err != nil { - log.Fatalf("%s: %v", flag.Arg(0), err) + log.Fatalf("%s: %v", containerId, err) } - cr := NewContainerRunner(api, kc, docker, flag.Arg(0)) + cr := NewContainerRunner(api, kc, docker, containerId) err = cr.Run() if err != nil { - log.Fatalf("%s: %v", flag.Arg(0), err) + log.Fatalf("%s: %v", containerId, err) } }