Merge branch '8015-crunch2-mount' into 6518-crunch2-dispatch-slurm
author Peter Amstutz <peter.amstutz@curoverse.com>
Fri, 19 Feb 2016 02:42:31 +0000 (21:42 -0500)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Fri, 19 Feb 2016 02:42:31 +0000 (21:42 -0500)
services/crunch-dispatch-local/crunch-dispatch-local.go
services/crunch-run/crunchrun.go
services/crunch-run/crunchrun_test.go
services/crunch-run/logging.go
services/crunch-run/logging_test.go
services/crunch-run/upload.go
services/crunch-run/upload_test.go [new file with mode: 0644]

index be1fef86e1928165e161b57c0788e95c922ba541..e05c0c5da4439e44931837ea5a259885624b80d8 100644 (file)
@@ -215,4 +215,6 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
                        log.Printf("Error updating container state to Complete for %v: %q", uuid, err)
                }
        }
+
+       log.Printf("Finished container run for %v", uuid)
 }
index 640ac88ca9a0aa70c1cc305012ab3bb18962ae6e..64f0d77b7eb948d64f7ba8344153827408307a55 100644 (file)
@@ -1,19 +1,24 @@
 package main
 
 import (
+       "encoding/json"
        "errors"
        "flag"
+       "fmt"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "git.curoverse.com/arvados.git/sdk/go/manifest"
        "github.com/curoverse/dockerclient"
        "io"
+       "io/ioutil"
        "log"
        "os"
+       "os/exec"
        "os/signal"
        "strings"
        "sync"
        "syscall"
+       "time"
 )
 
 // IArvadosClient is the minimal Arvados API methods used by crunch-run.
@@ -33,11 +38,18 @@ type IKeepClient interface {
 }
 
 // Mount describes the mount points to create inside the container.
-type Mount struct{}
+type Mount struct {
+       Kind             string `json:"kind"`
+       Writable         bool   `json:"writable"`
+       PortableDataHash string `json:"portable_data_hash"`
+       UUID             string `json:"uuid"`
+       DeviceType       string `json:"device_type"`
+}
 
 // Collection record returned by the API server.
-type Collection struct {
-       ManifestText string `json:"manifest_text"`
+type CollectionRecord struct {
+       ManifestText     string `json:"manifest_text"`
+       PortableDataHash string `json:"portable_data_hash"`
 }
 
 // ContainerRecord is the container record returned by the API server.
@@ -52,11 +64,16 @@ type ContainerRecord struct {
        Priority           int                    `json:"priority"`
        RuntimeConstraints map[string]interface{} `json:"runtime_constraints"`
        State              string                 `json:"state"`
+       Output             string                 `json:"output"`
 }
 
 // NewLogWriter is a factory function to create a new log writer.
 type NewLogWriter func(name string) io.WriteCloser
 
+type RunArvMount func([]string) (*exec.Cmd, error)
+
+type MkTempDir func(string, string) (string, error)
+
 // ThinDockerClient is the minimal Docker client interface used by crunch-run.
 type ThinDockerClient interface {
        StopContainer(id string, timeout int) error
@@ -64,7 +81,7 @@ type ThinDockerClient interface {
        LoadImage(reader io.Reader) error
        CreateContainer(config *dockerclient.ContainerConfig, name string, authConfig *dockerclient.AuthConfig) (string, error)
        StartContainer(id string, config *dockerclient.HostConfig) error
-       ContainerLogs(id string, options *dockerclient.LogOptions) (io.ReadCloser, error)
+       AttachContainer(id string, options *dockerclient.AttachOptions) (io.ReadCloser, error)
        Wait(id string) <-chan dockerclient.WaitResult
        RemoveImage(name string, force bool) ([]*dockerclient.ImageDelete, error)
 }
@@ -86,10 +103,19 @@ type ContainerRunner struct {
        Stderr        *ThrottledLogger
        LogCollection *CollectionWriter
        LogsPDH       *string
-       CancelLock    sync.Mutex
-       Cancelled     bool
-       SigChan       chan os.Signal
-       finalState    string
+       RunArvMount
+       MkTempDir
+       ArvMount       *exec.Cmd
+       ArvMountPoint  string
+       HostOutputDir  string
+       CleanupTempDir []string
+       Binds          []string
+       OutputPDH      *string
+       CancelLock     sync.Mutex
+       Cancelled      bool
+       SigChan        chan os.Signal
+       ArvMountExit   chan error
+       finalState     string
 }
 
 // SetupSignals sets up signal handling to gracefully terminate the underlying
@@ -123,17 +149,17 @@ func (runner *ContainerRunner) LoadImage() (err error) {
 
        runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.ContainerRecord.ContainerImage)
 
-       var collection Collection
+       var collection CollectionRecord
        err = runner.ArvClient.Get("collections", runner.ContainerRecord.ContainerImage, nil, &collection)
        if err != nil {
-               return err
+               return fmt.Errorf("While getting container image collection: %v", err)
        }
        manifest := manifest.Manifest{Text: collection.ManifestText}
        var img, imageID string
        for ms := range manifest.StreamIter() {
                img = ms.FileStreamSegments[0].Name
                if !strings.HasSuffix(img, ".tar") {
-                       return errors.New("First file in the collection does not end in .tar")
+                       return fmt.Errorf("First file in the container image collection does not end in .tar")
                }
                imageID = img[:len(img)-4]
        }
@@ -147,12 +173,12 @@ func (runner *ContainerRunner) LoadImage() (err error) {
                var readCloser io.ReadCloser
                readCloser, err = runner.Kc.ManifestFileReader(manifest, img)
                if err != nil {
-                       return err
+                       return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
                }
 
                err = runner.Docker.LoadImage(readCloser)
                if err != nil {
-                       return err
+                       return fmt.Errorf("While loading container image into Docker: %v", err)
                }
        } else {
                runner.CrunchLog.Print("Docker image is available")
@@ -163,6 +189,210 @@ func (runner *ContainerRunner) LoadImage() (err error) {
        return nil
 }
 
+func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string) (c *exec.Cmd, err error) {
+       c = exec.Command("arv-mount", arvMountCmd...)
+       nt := NewThrottledLogger(runner.NewLogWriter("arv-mount"))
+       c.Stdout = nt
+       c.Stderr = nt
+
+       err = c.Start()
+       if err != nil {
+               return nil, err
+       }
+
+       statReadme := make(chan bool)
+       runner.ArvMountExit = make(chan error)
+
+       keepStatting := true
+       go func() {
+               for keepStatting {
+                       time.Sleep(100 * time.Millisecond)
+                       _, statErr := os.Stat(fmt.Sprintf("%s/by_id/README", runner.ArvMountPoint))
+                       if statErr == nil {
+                               keepStatting = false
+                               statReadme <- true
+                       }
+               }
+               close(statReadme)
+       }()
+
+       go func() {
+               runner.ArvMountExit <- c.Wait()
+               close(runner.ArvMountExit)
+       }()
+
+       select {
+       case <-statReadme:
+               break
+       case err := <-runner.ArvMountExit:
+               runner.ArvMount = nil
+               keepStatting = false
+               return nil, err
+       }
+
+       return c, nil
+}
+
+func (runner *ContainerRunner) SetupMounts() (err error) {
+       runner.ArvMountPoint, err = runner.MkTempDir("", "keep")
+       if err != nil {
+               return fmt.Errorf("While creating keep mount temp dir: %v", err)
+       }
+
+       runner.CleanupTempDir = append(runner.CleanupTempDir, runner.ArvMountPoint)
+
+       pdhOnly := true
+       tmpcount := 0
+       arvMountCmd := []string{"--foreground", "--allow-other", "--read-write"}
+       collectionPaths := []string{}
+       runner.Binds = nil
+
+       for bind, mnt := range runner.ContainerRecord.Mounts {
+               if mnt.Kind == "collection" {
+                       var src string
+                       if mnt.UUID != "" && mnt.PortableDataHash != "" {
+                               return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
+                       }
+                       if mnt.UUID != "" {
+                               if mnt.Writable {
+                                       return fmt.Errorf("Writing to existing collections currently not permitted.")
+                               }
+                               pdhOnly = false
+                               src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID)
+                       } else if mnt.PortableDataHash != "" {
+                               if mnt.Writable {
+                                       return fmt.Errorf("Can never write to a collection specified by portable data hash")
+                               }
+                               src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.PortableDataHash)
+                       } else {
+                               src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount)
+                               arvMountCmd = append(arvMountCmd, "--mount-tmp")
+                               arvMountCmd = append(arvMountCmd, fmt.Sprintf("tmp%d", tmpcount))
+                               tmpcount += 1
+                       }
+                       if mnt.Writable {
+                               if bind == runner.ContainerRecord.OutputPath {
+                                       runner.HostOutputDir = src
+                               }
+                               runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
+                       } else {
+                               runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", src, bind))
+                       }
+                       collectionPaths = append(collectionPaths, src)
+               } else if mnt.Kind == "tmp" {
+                       if bind == runner.ContainerRecord.OutputPath {
+                               runner.HostOutputDir, err = runner.MkTempDir("", "")
+                               if err != nil {
+                                       return fmt.Errorf("While creating mount temp dir: %v", err)
+                               }
+                               st, staterr := os.Stat(runner.HostOutputDir)
+                               if staterr != nil {
+                                       return fmt.Errorf("While Stat on temp dir: %v", staterr)
+                               }
+                               err = os.Chmod(runner.HostOutputDir, st.Mode()|os.ModeSetgid|0777)
+                               if err != nil {
+                                       return fmt.Errorf("While Chmod temp dir: %v", err)
+                               }
+                               runner.CleanupTempDir = append(runner.CleanupTempDir, runner.HostOutputDir)
+                               runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", runner.HostOutputDir, bind))
+                       } else {
+                               runner.Binds = append(runner.Binds, bind)
+                       }
+               } else {
+                       return fmt.Errorf("Unknown mount kind '%s'", mnt.Kind)
+               }
+       }
+
+       if runner.HostOutputDir == "" {
+               return fmt.Errorf("Output path does not correspond to a writable mount point")
+       }
+
+       if pdhOnly {
+               arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id")
+       } else {
+               arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id")
+       }
+       arvMountCmd = append(arvMountCmd, runner.ArvMountPoint)
+
+       runner.ArvMount, err = runner.RunArvMount(arvMountCmd)
+       if err != nil {
+               return fmt.Errorf("While trying to start arv-mount: %v", err)
+       }
+
+       for _, p := range collectionPaths {
+               _, err = os.Stat(p)
+               if err != nil {
+                       return fmt.Errorf("While checking that input files exist: %v", err)
+               }
+       }
+
+       return nil
+}
+
+func (runner *ContainerRunner) ProcessDockerAttach(containerReader io.Reader) {
+       // Handle docker log protocol
+       // https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
+
+       header := make([]byte, 8)
+       for {
+               _, readerr := io.ReadAtLeast(containerReader, header, 8)
+
+               if readerr == nil {
+                       readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
+                       if header[0] == 1 {
+                               // stdout
+                               _, readerr = io.CopyN(runner.Stdout, containerReader, readsize)
+                       } else {
+                               // stderr
+                               _, readerr = io.CopyN(runner.Stderr, containerReader, readsize)
+                       }
+               }
+
+               if readerr != nil {
+                       if readerr != io.EOF {
+                               runner.CrunchLog.Printf("While reading docker logs: %v", readerr)
+                       }
+
+                       closeerr := runner.Stdout.Close()
+                       if closeerr != nil {
+                               runner.CrunchLog.Printf("While closing stdout logs: %v", closeerr)
+                       }
+
+                       closeerr = runner.Stderr.Close()
+                       if closeerr != nil {
+                               runner.CrunchLog.Printf("While closing stderr logs: %v", closeerr)
+                       }
+
+                       runner.loggingDone <- true
+                       close(runner.loggingDone)
+                       return
+               }
+       }
+}
+
+// AttachStreams connects the docker container stdout and stderr logs to the
+// Arvados logger which logs to Keep and the API server logs table.
+func (runner *ContainerRunner) AttachStreams() (err error) {
+
+       runner.CrunchLog.Print("Attaching container streams")
+
+       var containerReader io.Reader
+       containerReader, err = runner.Docker.AttachContainer(runner.ContainerID,
+               &dockerclient.AttachOptions{Stream: true, Stdout: true, Stderr: true})
+       if err != nil {
+               return fmt.Errorf("While attaching container logs: %v", err)
+       }
+
+       runner.loggingDone = make(chan bool)
+
+       runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
+       runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
+
+       go runner.ProcessDockerAttach(containerReader)
+
+       return nil
+}
+
 // StartContainer creates the container and runs it.
 func (runner *ContainerRunner) StartContainer() (err error) {
        runner.CrunchLog.Print("Creating Docker container")
@@ -181,67 +411,127 @@ func (runner *ContainerRunner) StartContainer() (err error) {
        for k, v := range runner.ContainerRecord.Environment {
                runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
        }
+       runner.ContainerConfig.NetworkDisabled = true
        runner.ContainerID, err = runner.Docker.CreateContainer(&runner.ContainerConfig, "", nil)
        if err != nil {
-               return
+               return fmt.Errorf("While creating container: %v", err)
        }
-       hostConfig := &dockerclient.HostConfig{}
+       hostConfig := &dockerclient.HostConfig{Binds: runner.Binds,
+               LogConfig: dockerclient.LogConfig{Type: "none"}}
 
-       runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID)
-       err = runner.Docker.StartContainer(runner.ContainerID, hostConfig)
+       err = runner.AttachStreams()
        if err != nil {
-               return
+               return fmt.Errorf("While attaching streams: %v", err)
        }
 
-       return nil
-}
-
-// AttachLogs connects the docker container stdout and stderr logs to the
-// Arvados logger which logs to Keep and the API server logs table.
-func (runner *ContainerRunner) AttachLogs() (err error) {
-
-       runner.CrunchLog.Print("Attaching container logs")
-
-       var stderrReader, stdoutReader io.Reader
-       stderrReader, err = runner.Docker.ContainerLogs(runner.ContainerID, &dockerclient.LogOptions{Follow: true, Stderr: true})
-       if err != nil {
-               return
-       }
-       stdoutReader, err = runner.Docker.ContainerLogs(runner.ContainerID, &dockerclient.LogOptions{Follow: true, Stdout: true})
+       runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID)
+       err = runner.Docker.StartContainer(runner.ContainerID, hostConfig)
        if err != nil {
-               return
+               return fmt.Errorf("While starting container: %v", err)
        }
 
-       runner.loggingDone = make(chan bool)
-
-       runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
-       runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
-       go ReadWriteLines(stdoutReader, runner.Stdout, runner.loggingDone)
-       go ReadWriteLines(stderrReader, runner.Stderr, runner.loggingDone)
-
        return nil
 }
 
 // WaitFinish waits for the container to terminate, capture the exit code, and
 // close the stdout/stderr logging.
 func (runner *ContainerRunner) WaitFinish() error {
+       runner.CrunchLog.Print("Waiting for container to finish")
+
        result := runner.Docker.Wait(runner.ContainerID)
        wr := <-result
        if wr.Error != nil {
-               return wr.Error
+               return fmt.Errorf("While waiting for container to finish: %v", wr.Error)
        }
        runner.ExitCode = &wr.ExitCode
 
-       // drain stdout/stderr
-       <-runner.loggingDone
+       // wait for stdout/stderr to complete
        <-runner.loggingDone
 
-       runner.Stdout.Close()
-       runner.Stderr.Close()
+       return nil
+}
+
+// HandleOutput sets the output, unmounts the FUSE mount, and deletes temporary directories
+func (runner *ContainerRunner) CaptureOutput() error {
+       if runner.finalState != "Complete" {
+               return nil
+       }
+
+       if runner.HostOutputDir == "" {
+               return nil
+       }
+
+       _, err := os.Stat(runner.HostOutputDir)
+       if err != nil {
+               return fmt.Errorf("While checking host output path: %v", err)
+       }
+
+       var manifestText string
+
+       collectionMetafile := fmt.Sprintf("%s/.arvados#collection", runner.HostOutputDir)
+       _, err = os.Stat(collectionMetafile)
+       if err != nil {
+               // Regular directory
+               cw := CollectionWriter{runner.Kc, nil, sync.Mutex{}}
+               manifestText, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger)
+               if err != nil {
+                       return fmt.Errorf("While uploading output files: %v", err)
+               }
+       } else {
+               // FUSE mount directory
+               file, openerr := os.Open(collectionMetafile)
+               if openerr != nil {
+                       return fmt.Errorf("While opening FUSE metafile: %v", openerr)
+               }
+               defer file.Close()
+
+               rec := CollectionRecord{}
+               err = json.NewDecoder(file).Decode(&rec)
+               if err != nil {
+                       return fmt.Errorf("While reading FUSE metafile: %v", err)
+               }
+               manifestText = rec.ManifestText
+       }
+
+       var response CollectionRecord
+       err = runner.ArvClient.Create("collections",
+               arvadosclient.Dict{
+                       "collection": arvadosclient.Dict{
+                               "manifest_text": manifestText}},
+               &response)
+       if err != nil {
+               return fmt.Errorf("While creating output collection: %v", err)
+       }
+
+       runner.OutputPDH = new(string)
+       *runner.OutputPDH = response.PortableDataHash
 
        return nil
 }
 
+func (runner *ContainerRunner) CleanupDirs() {
+       if runner.ArvMount != nil {
+               umount := exec.Command("fusermount", "-z", "-u", runner.ArvMountPoint)
+               umnterr := umount.Run()
+               if umnterr != nil {
+                       runner.CrunchLog.Printf("While running fusermount: %v", umnterr)
+               }
+
+               mnterr := <-runner.ArvMountExit
+               if mnterr != nil {
+                       runner.CrunchLog.Printf("Arv-mount exit error: %v", mnterr)
+               }
+       }
+
+       for _, tmpdir := range runner.CleanupTempDir {
+               rmerr := os.RemoveAll(tmpdir)
+               if rmerr != nil {
+                       runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", tmpdir, rmerr)
+               }
+       }
+}
+
 // CommitLogs posts the collection containing the final container logs.
 func (runner *ContainerRunner) CommitLogs() error {
        runner.CrunchLog.Print(runner.finalState)
@@ -256,28 +546,30 @@ func (runner *ContainerRunner) CommitLogs() error {
 
        mt, err := runner.LogCollection.ManifestText()
        if err != nil {
-               return err
+               return fmt.Errorf("While creating log manifest: %v", err)
        }
 
-       response := make(map[string]string)
+       var response CollectionRecord
        err = runner.ArvClient.Create("collections",
-               arvadosclient.Dict{"name": "logs for " + runner.ContainerRecord.UUID,
-                       "manifest_text": mt},
-               response)
+               arvadosclient.Dict{
+                       "collection": arvadosclient.Dict{
+                               "name":          "logs for " + runner.ContainerRecord.UUID,
+                               "manifest_text": mt}},
+               &response)
        if err != nil {
-               return err
+               return fmt.Errorf("While creating log collection: %v", err)
        }
 
        runner.LogsPDH = new(string)
-       *runner.LogsPDH = response["portable_data_hash"]
+       *runner.LogsPDH = response.PortableDataHash
 
        return nil
 }
 
 // UpdateContainerRecordRunning updates the container state to "Running"
 func (runner *ContainerRunner) UpdateContainerRecordRunning() error {
-       update := arvadosclient.Dict{"state": "Running"}
-       return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID, update, nil)
+       return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID,
+               arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running"}}, nil)
 }
 
 // UpdateContainerRecordComplete updates the container record state on API
@@ -290,10 +582,13 @@ func (runner *ContainerRunner) UpdateContainerRecordComplete() error {
        if runner.ExitCode != nil {
                update["exit_code"] = *runner.ExitCode
        }
+       if runner.OutputPDH != nil {
+               update["output"] = *runner.OutputPDH
+       }
 
        update["state"] = runner.finalState
 
-       return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID, update, nil)
+       return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID, arvadosclient.Dict{"container": update}, nil)
 }
 
 // NewArvLogWriter creates an ArvLogWriter
@@ -302,8 +597,15 @@ func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
 }
 
 // Run the full container lifecycle.
-func (runner *ContainerRunner) Run(containerUUID string) (err error) {
-       runner.CrunchLog.Printf("Executing container '%s'", containerUUID)
+func (runner *ContainerRunner) Run() (err error) {
+       runner.CrunchLog.Printf("Executing container '%s'", runner.ContainerRecord.UUID)
+
+       hostname, hosterr := os.Hostname()
+       if hosterr != nil {
+               runner.CrunchLog.Printf("Error getting hostname '%v'", hosterr)
+       } else {
+               runner.CrunchLog.Printf("Executing on host '%s'", hostname)
+       }
 
        var runerr, waiterr error
 
@@ -318,13 +620,22 @@ func (runner *ContainerRunner) Run(containerUUID string) (err error) {
                        runner.finalState = "Complete"
                }
 
-               // (6) write logs
+               // (6) capture output
+               outputerr := runner.CaptureOutput()
+               if outputerr != nil {
+                       runner.CrunchLog.Print(outputerr)
+               }
+
+               // (7) clean up temporary directories
+               runner.CleanupDirs()
+
+               // (8) write logs
                logerr := runner.CommitLogs()
                if logerr != nil {
                        runner.CrunchLog.Print(logerr)
                }
 
-               // (7) update container record with results
+               // (9) update container record with results
                updateerr := runner.UpdateContainerRecordComplete()
                if updateerr != nil {
                        runner.CrunchLog.Print(updateerr)
@@ -345,24 +656,30 @@ func (runner *ContainerRunner) Run(containerUUID string) (err error) {
                }
        }()
 
-       err = runner.ArvClient.Get("containers", containerUUID, nil, &runner.ContainerRecord)
+       err = runner.ArvClient.Get("containers", runner.ContainerRecord.UUID, nil, &runner.ContainerRecord)
        if err != nil {
-               return
+               return fmt.Errorf("While getting container record: %v", err)
        }
 
-       // (0) setup signal handling
+       // (1) setup signal handling
        err = runner.SetupSignals()
        if err != nil {
-               return
+               return fmt.Errorf("While setting up signal handling: %v", err)
        }
 
-       // (1) check for and/or load image
+       // (2) check for and/or load image
        err = runner.LoadImage()
        if err != nil {
-               return
+               return fmt.Errorf("While loading container image: %v", err)
        }
 
-       // (2) start container
+       // (3) set up FUSE mount and binds
+       err = runner.SetupMounts()
+       if err != nil {
+               return fmt.Errorf("While setting up mounts: %v", err)
+       }
+
+       // (3) create and start container
        err = runner.StartContainer()
        if err != nil {
                if err == ErrCancelled {
@@ -371,18 +688,12 @@ func (runner *ContainerRunner) Run(containerUUID string) (err error) {
                return
        }
 
-       // (3) update container record state
+       // (4) update container record state
        err = runner.UpdateContainerRecordRunning()
        if err != nil {
                runner.CrunchLog.Print(err)
        }
 
-       // (4) attach container logs
-       runerr = runner.AttachLogs()
-       if runerr != nil {
-               runner.CrunchLog.Print(runerr)
-       }
-
        // (5) wait for container to finish
        waiterr = runner.WaitFinish()
 
@@ -392,42 +703,49 @@ func (runner *ContainerRunner) Run(containerUUID string) (err error) {
 // NewContainerRunner creates a new container runner.
 func NewContainerRunner(api IArvadosClient,
        kc IKeepClient,
-       docker ThinDockerClient) *ContainerRunner {
+       docker ThinDockerClient,
+       containerUUID string) *ContainerRunner {
 
        cr := &ContainerRunner{ArvClient: api, Kc: kc, Docker: docker}
        cr.NewLogWriter = cr.NewArvLogWriter
-       cr.LogCollection = &CollectionWriter{kc, nil}
+       cr.RunArvMount = cr.ArvMountCmd
+       cr.MkTempDir = ioutil.TempDir
+       cr.LogCollection = &CollectionWriter{kc, nil, sync.Mutex{}}
+       cr.ContainerRecord.UUID = containerUUID
        cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
+       cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
        return cr
 }
 
 func main() {
        flag.Parse()
 
+       containerId := flag.Arg(0)
+
        api, err := arvadosclient.MakeArvadosClient()
        if err != nil {
-               log.Fatal(err)
+               log.Fatalf("%s: %v", containerId, err)
        }
        api.Retries = 8
 
        var kc *keepclient.KeepClient
        kc, err = keepclient.MakeKeepClient(&api)
        if err != nil {
-               log.Fatal(err)
+               log.Fatalf("%s: %v", containerId, err)
        }
        kc.Retries = 4
 
        var docker *dockerclient.DockerClient
        docker, err = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
        if err != nil {
-               log.Fatal(err)
+               log.Fatalf("%s: %v", containerId, err)
        }
 
-       cr := NewContainerRunner(api, kc, docker)
+       cr := NewContainerRunner(api, kc, docker, containerId)
 
-       err = cr.Run(flag.Arg(0))
+       err = cr.Run()
        if err != nil {
-               log.Fatal(err)
+               log.Fatalf("%s: %v", containerId, err)
        }
 
 }
index 1946e5c403e02756f18589bc9a37a8333e2ca7a4..bf75e7f0eab2e539f97744f2315b9130ff4fa48f 100644 (file)
@@ -13,6 +13,10 @@ import (
        . "gopkg.in/check.v1"
        "io"
        "io/ioutil"
+       "log"
+       "os"
+       "os/exec"
+       "sort"
        "strings"
        "syscall"
        "testing"
@@ -51,22 +55,19 @@ var otherManifest = ". 68a84f561b1d1708c6baff5e019a9ab3+46+Ae5d0af96944a3690becb
 var otherPDH = "a3e8f74c6f101eae01fa08bfb4e49b3a+54"
 
 type TestDockerClient struct {
-       imageLoaded  string
-       stdoutReader io.ReadCloser
-       stderrReader io.ReadCloser
-       stdoutWriter io.WriteCloser
-       stderrWriter io.WriteCloser
-       fn           func(t *TestDockerClient)
-       finish       chan dockerclient.WaitResult
-       stop         chan bool
-       cwd          string
-       env          []string
+       imageLoaded string
+       logReader   io.ReadCloser
+       logWriter   io.WriteCloser
+       fn          func(t *TestDockerClient)
+       finish      chan dockerclient.WaitResult
+       stop        chan bool
+       cwd         string
+       env         []string
 }
 
 func NewTestDockerClient() *TestDockerClient {
        t := &TestDockerClient{}
-       t.stdoutReader, t.stdoutWriter = io.Pipe()
-       t.stderrReader, t.stderrWriter = io.Pipe()
+       t.logReader, t.logWriter = io.Pipe()
        t.finish = make(chan dockerclient.WaitResult)
        t.stop = make(chan bool)
        t.cwd = "/"
@@ -113,14 +114,8 @@ func (t *TestDockerClient) StartContainer(id string, config *dockerclient.HostCo
        }
 }
 
-func (t *TestDockerClient) ContainerLogs(id string, options *dockerclient.LogOptions) (io.ReadCloser, error) {
-       if options.Stdout {
-               return t.stdoutReader, nil
-       }
-       if options.Stderr {
-               return t.stderrReader, nil
-       }
-       return nil, nil
+func (t *TestDockerClient) AttachContainer(id string, options *dockerclient.AttachOptions) (io.ReadCloser, error) {
+       return t.logReader, nil
 }
 
 func (t *TestDockerClient) Wait(id string) <-chan dockerclient.WaitResult {
@@ -139,20 +134,20 @@ func (this *ArvTestClient) Create(resourceType string,
        this.Content = parameters
 
        if resourceType == "logs" {
-               et := parameters["event_type"].(string)
+               et := parameters["log"].(arvadosclient.Dict)["event_type"].(string)
                if this.Logs == nil {
                        this.Logs = make(map[string]*bytes.Buffer)
                }
                if this.Logs[et] == nil {
                        this.Logs[et] = &bytes.Buffer{}
                }
-               this.Logs[et].Write([]byte(parameters["properties"].(map[string]string)["text"]))
+               this.Logs[et].Write([]byte(parameters["log"].(arvadosclient.Dict)["properties"].(map[string]string)["text"]))
        }
 
        if resourceType == "collections" && output != nil {
-               mt := parameters["manifest_text"].(string)
-               outmap := output.(map[string]string)
-               outmap["portable_data_hash"] = fmt.Sprintf("%x+%d", md5.Sum([]byte(mt)), len(mt))
+               mt := parameters["collection"].(arvadosclient.Dict)["manifest_text"].(string)
+               outmap := output.(*CollectionRecord)
+               outmap.PortableDataHash = fmt.Sprintf("%x+%d", md5.Sum([]byte(mt)), len(mt))
        }
 
        return nil
@@ -161,9 +156,9 @@ func (this *ArvTestClient) Create(resourceType string,
 func (this *ArvTestClient) Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error {
        if resourceType == "collections" {
                if uuid == hwPDH {
-                       output.(*Collection).ManifestText = hwManifest
+                       output.(*CollectionRecord).ManifestText = hwManifest
                } else if uuid == otherPDH {
-                       output.(*Collection).ManifestText = otherManifest
+                       output.(*CollectionRecord).ManifestText = otherManifest
                }
        }
        if resourceType == "containers" {
@@ -176,7 +171,7 @@ func (this *ArvTestClient) Update(resourceType string, uuid string, parameters a
 
        this.Content = parameters
        if resourceType == "containers" {
-               if parameters["state"] == "Running" {
+               if parameters["container"].(arvadosclient.Dict)["state"] == "Running" {
                        this.WasSetRunning = true
                }
 
@@ -210,7 +205,7 @@ func (this *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename str
 func (s *TestSuite) TestLoadImage(c *C) {
        kc := &KeepTestClient{}
        docker := NewTestDockerClient()
-       cr := NewContainerRunner(&ArvTestClient{}, kc, docker)
+       cr := NewContainerRunner(&ArvTestClient{}, kc, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
 
        _, err := cr.Docker.RemoveImage(hwImageId, true)
 
@@ -266,7 +261,7 @@ func (this ArvErrorTestClient) Update(resourceType string, uuid string, paramete
 }
 
 func (this KeepErrorTestClient) PutHB(hash string, buf []byte) (string, int, error) {
-       return "", 0, nil
+       return "", 0, errors.New("KeepError")
 }
 
 func (this KeepErrorTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error) {
@@ -297,36 +292,36 @@ func (this KeepReadErrorTestClient) ManifestFileReader(m manifest.Manifest, file
 
 func (s *TestSuite) TestLoadImageArvError(c *C) {
        // (1) Arvados error
-       cr := NewContainerRunner(ArvErrorTestClient{}, &KeepTestClient{}, nil)
+       cr := NewContainerRunner(ArvErrorTestClient{}, &KeepTestClient{}, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        cr.ContainerRecord.ContainerImage = hwPDH
 
        err := cr.LoadImage()
-       c.Check(err.Error(), Equals, "ArvError")
+       c.Check(err.Error(), Equals, "While getting container image collection: ArvError")
 }
 
 func (s *TestSuite) TestLoadImageKeepError(c *C) {
        // (2) Keep error
        docker := NewTestDockerClient()
-       cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, docker)
+       cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        cr.ContainerRecord.ContainerImage = hwPDH
 
        err := cr.LoadImage()
-       c.Check(err.Error(), Equals, "KeepError")
+       c.Check(err.Error(), Equals, "While creating ManifestFileReader for container image: KeepError")
 }
 
 func (s *TestSuite) TestLoadImageCollectionError(c *C) {
        // (3) Collection doesn't contain image
-       cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, nil)
+       cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        cr.ContainerRecord.ContainerImage = otherPDH
 
        err := cr.LoadImage()
-       c.Check(err.Error(), Equals, "First file in the collection does not end in .tar")
+       c.Check(err.Error(), Equals, "First file in the container image collection does not end in .tar")
 }
 
 func (s *TestSuite) TestLoadImageKeepReadError(c *C) {
        // (4) Collection doesn't contain image
        docker := NewTestDockerClient()
-       cr := NewContainerRunner(&ArvTestClient{}, KeepReadErrorTestClient{}, docker)
+       cr := NewContainerRunner(&ArvTestClient{}, KeepReadErrorTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        cr.ContainerRecord.ContainerImage = hwPDH
 
        err := cr.LoadImage()
@@ -356,15 +351,23 @@ func (this *TestLogs) NewTestLoggingWriter(logstr string) io.WriteCloser {
        return nil
 }
 
+func dockerLog(fd byte, msg string) []byte {
+       by := []byte(msg)
+       header := make([]byte, 8+len(by))
+       header[0] = fd
+       header[7] = byte(len(by))
+       copy(header[8:], by)
+       return header
+}
+
 func (s *TestSuite) TestRunContainer(c *C) {
        docker := NewTestDockerClient()
        docker.fn = func(t *TestDockerClient) {
-               t.stdoutWriter.Write([]byte("Hello world\n"))
-               t.stdoutWriter.Close()
-               t.stderrWriter.Close()
+               t.logWriter.Write(dockerLog(1, "Hello world\n"))
+               t.logWriter.Close()
                t.finish <- dockerclient.WaitResult{}
        }
-       cr := NewContainerRunner(&ArvTestClient{}, &KeepTestClient{}, docker)
+       cr := NewContainerRunner(&ArvTestClient{}, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
 
        var logs TestLogs
        cr.NewLogWriter = logs.NewTestLoggingWriter
@@ -376,9 +379,6 @@ func (s *TestSuite) TestRunContainer(c *C) {
        err = cr.StartContainer()
        c.Check(err, IsNil)
 
-       err = cr.AttachLogs()
-       c.Check(err, IsNil)
-
        err = cr.WaitFinish()
        c.Check(err, IsNil)
 
@@ -389,8 +389,7 @@ func (s *TestSuite) TestRunContainer(c *C) {
 func (s *TestSuite) TestCommitLogs(c *C) {
        api := &ArvTestClient{}
        kc := &KeepTestClient{}
-       cr := NewContainerRunner(api, kc, nil)
-       cr.ContainerRecord.UUID = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+       cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
 
        cr.CrunchLog.Print("Hello world!")
@@ -400,28 +399,26 @@ func (s *TestSuite) TestCommitLogs(c *C) {
        err := cr.CommitLogs()
        c.Check(err, IsNil)
 
-       c.Check(api.Content["name"], Equals, "logs for zzzzz-zzzzz-zzzzzzzzzzzzzzz")
-       c.Check(api.Content["manifest_text"], Equals, ". 744b2e4553123b02fa7b452ec5c18993+123 0:123:crunch-run.txt\n")
+       c.Check(api.Content["collection"].(arvadosclient.Dict)["name"], Equals, "logs for zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       c.Check(api.Content["collection"].(arvadosclient.Dict)["manifest_text"], Equals, ". 744b2e4553123b02fa7b452ec5c18993+123 0:123:crunch-run.txt\n")
        c.Check(*cr.LogsPDH, Equals, "63da7bdacf08c40f604daad80c261e9a+60")
 }
 
 func (s *TestSuite) TestUpdateContainerRecordRunning(c *C) {
        api := &ArvTestClient{}
        kc := &KeepTestClient{}
-       cr := NewContainerRunner(api, kc, nil)
-       cr.ContainerRecord.UUID = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+       cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
 
        err := cr.UpdateContainerRecordRunning()
        c.Check(err, IsNil)
 
-       c.Check(api.Content["state"], Equals, "Running")
+       c.Check(api.Content["container"].(arvadosclient.Dict)["state"], Equals, "Running")
 }
 
 func (s *TestSuite) TestUpdateContainerRecordComplete(c *C) {
        api := &ArvTestClient{}
        kc := &KeepTestClient{}
-       cr := NewContainerRunner(api, kc, nil)
-       cr.ContainerRecord.UUID = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+       cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
 
        cr.LogsPDH = new(string)
        *cr.LogsPDH = "d3a229d2fe3690c2c3e75a71a153c6a3+60"
@@ -433,25 +430,24 @@ func (s *TestSuite) TestUpdateContainerRecordComplete(c *C) {
        err := cr.UpdateContainerRecordComplete()
        c.Check(err, IsNil)
 
-       c.Check(api.Content["log"], Equals, *cr.LogsPDH)
-       c.Check(api.Content["exit_code"], Equals, *cr.ExitCode)
-       c.Check(api.Content["state"], Equals, "Complete")
+       c.Check(api.Content["container"].(arvadosclient.Dict)["log"], Equals, *cr.LogsPDH)
+       c.Check(api.Content["container"].(arvadosclient.Dict)["exit_code"], Equals, *cr.ExitCode)
+       c.Check(api.Content["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
 }
 
 func (s *TestSuite) TestUpdateContainerRecordCancelled(c *C) {
        api := &ArvTestClient{}
        kc := &KeepTestClient{}
-       cr := NewContainerRunner(api, kc, nil)
-       cr.ContainerRecord.UUID = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+       cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        cr.Cancelled = true
        cr.finalState = "Cancelled"
 
        err := cr.UpdateContainerRecordComplete()
        c.Check(err, IsNil)
 
-       c.Check(api.Content["log"], IsNil)
-       c.Check(api.Content["exit_code"], IsNil)
-       c.Check(api.Content["state"], Equals, "Cancelled")
+       c.Check(api.Content["container"].(arvadosclient.Dict)["log"], IsNil)
+       c.Check(api.Content["container"].(arvadosclient.Dict)["exit_code"], IsNil)
+       c.Check(api.Content["container"].(arvadosclient.Dict)["state"], Equals, "Cancelled")
 }
 
 // Used by the TestFullRun*() test below to DRY up boilerplate setup to do full
@@ -466,13 +462,13 @@ func FullRunHelper(c *C, record string, fn func(t *TestDockerClient)) (api *ArvT
        docker.RemoveImage(hwImageId, true)
 
        api = &ArvTestClient{ContainerRecord: rec}
-       cr = NewContainerRunner(api, &KeepTestClient{}, docker)
+       cr = NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
 
-       err = cr.Run("zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       err = cr.Run()
        c.Check(err, IsNil)
        c.Check(api.WasSetRunning, Equals, true)
 
-       c.Check(api.Content["log"], NotNil)
+       c.Check(api.Content["container"].(arvadosclient.Dict)["log"], NotNil)
 
        if err != nil {
                for k, v := range api.Logs {
@@ -490,19 +486,18 @@ func (s *TestSuite) TestFullRunHello(c *C) {
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": ".",
     "environment": {},
-    "mounts": {},
+    "mounts": {"/tmp": {"kind": "tmp"} },
     "output_path": "/tmp",
     "priority": 1,
     "runtime_constraints": {}
 }`, func(t *TestDockerClient) {
-               t.stdoutWriter.Write([]byte("hello world\n"))
-               t.stdoutWriter.Close()
-               t.stderrWriter.Close()
+               t.logWriter.Write(dockerLog(1, "hello world\n"))
+               t.logWriter.Close()
                t.finish <- dockerclient.WaitResult{}
        })
 
-       c.Check(api.Content["exit_code"], Equals, 0)
-       c.Check(api.Content["state"], Equals, "Complete")
+       c.Check(api.Content["container"].(arvadosclient.Dict)["exit_code"], Equals, 0)
+       c.Check(api.Content["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
 
        c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "hello world\n"), Equals, true)
 
@@ -514,21 +509,20 @@ func (s *TestSuite) TestFullRunStderr(c *C) {
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": ".",
     "environment": {},
-    "mounts": {},
+    "mounts": {"/tmp": {"kind": "tmp"} },
     "output_path": "/tmp",
     "priority": 1,
     "runtime_constraints": {}
 }`, func(t *TestDockerClient) {
-               t.stdoutWriter.Write([]byte("hello\n"))
-               t.stderrWriter.Write([]byte("world\n"))
-               t.stdoutWriter.Close()
-               t.stderrWriter.Close()
+               t.logWriter.Write(dockerLog(1, "hello\n"))
+               t.logWriter.Write(dockerLog(2, "world\n"))
+               t.logWriter.Close()
                t.finish <- dockerclient.WaitResult{ExitCode: 1}
        })
 
-       c.Check(api.Content["log"], NotNil)
-       c.Check(api.Content["exit_code"], Equals, 1)
-       c.Check(api.Content["state"], Equals, "Complete")
+       c.Check(api.Content["container"].(arvadosclient.Dict)["log"], NotNil)
+       c.Check(api.Content["container"].(arvadosclient.Dict)["exit_code"], Equals, 1)
+       c.Check(api.Content["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
 
        c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "hello\n"), Equals, true)
        c.Check(strings.HasSuffix(api.Logs["stderr"].String(), "world\n"), Equals, true)
@@ -540,19 +534,20 @@ func (s *TestSuite) TestFullRunDefaultCwd(c *C) {
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": ".",
     "environment": {},
-    "mounts": {},
+    "mounts": {"/tmp": {"kind": "tmp"} },
     "output_path": "/tmp",
     "priority": 1,
     "runtime_constraints": {}
 }`, func(t *TestDockerClient) {
-               t.stdoutWriter.Write([]byte(t.cwd + "\n"))
-               t.stdoutWriter.Close()
-               t.stderrWriter.Close()
+               t.logWriter.Write(dockerLog(1, t.cwd+"\n"))
+               t.logWriter.Close()
                t.finish <- dockerclient.WaitResult{ExitCode: 0}
        })
 
-       c.Check(api.Content["exit_code"], Equals, 0)
-       c.Check(api.Content["state"], Equals, "Complete")
+       c.Check(api.Content["container"].(arvadosclient.Dict)["exit_code"], Equals, 0)
+       c.Check(api.Content["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
+
+       log.Print(api.Logs["stdout"].String())
 
        c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "/\n"), Equals, true)
 }
@@ -563,19 +558,18 @@ func (s *TestSuite) TestFullRunSetCwd(c *C) {
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": "/bin",
     "environment": {},
-    "mounts": {},
+    "mounts": {"/tmp": {"kind": "tmp"} },
     "output_path": "/tmp",
     "priority": 1,
     "runtime_constraints": {}
 }`, func(t *TestDockerClient) {
-               t.stdoutWriter.Write([]byte(t.cwd + "\n"))
-               t.stdoutWriter.Close()
-               t.stderrWriter.Close()
+               t.logWriter.Write(dockerLog(1, t.cwd+"\n"))
+               t.logWriter.Close()
                t.finish <- dockerclient.WaitResult{ExitCode: 0}
        })
 
-       c.Check(api.Content["exit_code"], Equals, 0)
-       c.Check(api.Content["state"], Equals, "Complete")
+       c.Check(api.Content["container"].(arvadosclient.Dict)["exit_code"], Equals, 0)
+       c.Check(api.Content["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
 
        c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "/bin\n"), Equals, true)
 }
@@ -586,7 +580,7 @@ func (s *TestSuite) TestCancel(c *C) {
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": ".",
     "environment": {},
-    "mounts": {},
+    "mounts": {"/tmp": {"kind": "tmp"} },
     "output_path": "/tmp",
     "priority": 1,
     "runtime_constraints": {}
@@ -599,15 +593,14 @@ func (s *TestSuite) TestCancel(c *C) {
        docker := NewTestDockerClient()
        docker.fn = func(t *TestDockerClient) {
                <-t.stop
-               t.stdoutWriter.Write([]byte("foo\n"))
-               t.stdoutWriter.Close()
-               t.stderrWriter.Close()
+               t.logWriter.Write(dockerLog(1, "foo\n"))
+               t.logWriter.Close()
                t.finish <- dockerclient.WaitResult{ExitCode: 0}
        }
        docker.RemoveImage(hwImageId, true)
 
        api := &ArvTestClient{ContainerRecord: rec}
-       cr := NewContainerRunner(api, &KeepTestClient{}, docker)
+       cr := NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
 
        go func() {
                for cr.ContainerID == "" {
@@ -616,11 +609,11 @@ func (s *TestSuite) TestCancel(c *C) {
                cr.SigChan <- syscall.SIGINT
        }()
 
-       err = cr.Run("zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       err = cr.Run()
 
        c.Check(err, IsNil)
 
-       c.Check(api.Content["log"], NotNil)
+       c.Check(api.Content["container"].(arvadosclient.Dict)["log"], NotNil)
 
        if err != nil {
                for k, v := range api.Logs {
@@ -629,7 +622,7 @@ func (s *TestSuite) TestCancel(c *C) {
                }
        }
 
-       c.Check(api.Content["state"], Equals, "Cancelled")
+       c.Check(api.Content["container"].(arvadosclient.Dict)["state"], Equals, "Cancelled")
 
        c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "foo\n"), Equals, true)
 
@@ -641,19 +634,90 @@ func (s *TestSuite) TestFullRunSetEnv(c *C) {
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
     "cwd": "/bin",
     "environment": {"FROBIZ": "bilbo"},
-    "mounts": {},
+    "mounts": {"/tmp": {"kind": "tmp"} },
     "output_path": "/tmp",
     "priority": 1,
     "runtime_constraints": {}
 }`, func(t *TestDockerClient) {
-               t.stdoutWriter.Write([]byte(t.env[0][7:] + "\n"))
-               t.stdoutWriter.Close()
-               t.stderrWriter.Close()
+               t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
+               t.logWriter.Close()
                t.finish <- dockerclient.WaitResult{ExitCode: 0}
        })
 
-       c.Check(api.Content["exit_code"], Equals, 0)
-       c.Check(api.Content["state"], Equals, "Complete")
+       c.Check(api.Content["container"].(arvadosclient.Dict)["exit_code"], Equals, 0)
+       c.Check(api.Content["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
 
        c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "bilbo\n"), Equals, true)
 }
+
+type ArvMountCmdLine struct {
+       Cmd []string
+}
+
+func (am *ArvMountCmdLine) ArvMountTest(c []string) (*exec.Cmd, error) {
+       am.Cmd = c
+       return nil, nil
+}
+
+func (s *TestSuite) TestSetupMounts(c *C) {
+       api := &ArvTestClient{}
+       kc := &KeepTestClient{}
+       cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       am := &ArvMountCmdLine{}
+       cr.RunArvMount = am.ArvMountTest
+
+       i := 0
+       cr.MkTempDir = func(string, string) (string, error) {
+               i += 1
+               d := fmt.Sprintf("/tmp/mktmpdir%d", i)
+               os.Mkdir(d, os.ModePerm)
+               return d, nil
+       }
+
+       {
+               cr.ContainerRecord.Mounts = make(map[string]Mount)
+               cr.ContainerRecord.Mounts["/tmp"] = Mount{Kind: "tmp"}
+               cr.OutputPath = "/tmp"
+
+               err := cr.SetupMounts()
+               c.Check(err, IsNil)
+               c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--read-write", "--mount-by-pdh", "by_id", "/tmp/mktmpdir1"})
+               c.Check(cr.Binds, DeepEquals, []string{"/tmp/mktmpdir2:/tmp"})
+               cr.CleanupDirs()
+       }
+
+       {
+               i = 0
+               cr.ContainerRecord.Mounts = make(map[string]Mount)
+               cr.ContainerRecord.Mounts["/keeptmp"] = Mount{Kind: "collection", Writable: true}
+               cr.OutputPath = "/keeptmp"
+
+               os.MkdirAll("/tmp/mktmpdir1/tmp0", os.ModePerm)
+
+               err := cr.SetupMounts()
+               c.Check(err, IsNil)
+               c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--read-write", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", "/tmp/mktmpdir1"})
+               c.Check(cr.Binds, DeepEquals, []string{"/tmp/mktmpdir1/tmp0:/keeptmp"})
+               cr.CleanupDirs()
+       }
+
+       {
+               i = 0
+               cr.ContainerRecord.Mounts = make(map[string]Mount)
+               cr.ContainerRecord.Mounts["/keepinp"] = Mount{Kind: "collection", PortableDataHash: "59389a8f9ee9d399be35462a0f92541c+53"}
+               cr.ContainerRecord.Mounts["/keepout"] = Mount{Kind: "collection", Writable: true}
+               cr.OutputPath = "/keepout"
+
+               os.MkdirAll("/tmp/mktmpdir1/by_id/59389a8f9ee9d399be35462a0f92541c+53", os.ModePerm)
+               os.MkdirAll("/tmp/mktmpdir1/tmp0", os.ModePerm)
+
+               err := cr.SetupMounts()
+               c.Check(err, IsNil)
+               c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--read-write", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", "/tmp/mktmpdir1"})
+               var ss sort.StringSlice = cr.Binds
+               ss.Sort()
+               c.Check(cr.Binds, DeepEquals, []string{"/tmp/mktmpdir1/by_id/59389a8f9ee9d399be35462a0f92541c+53:/keepinp:ro",
+                       "/tmp/mktmpdir1/tmp0:/keepout"})
+               cr.CleanupDirs()
+       }
+}
index 9d97384109597d9b0e7e56ad96657c1171fafbf5..20928dbef769b0d4dd419ec0f8693541c93ba369 100644 (file)
@@ -35,6 +35,7 @@ type ThrottledLogger struct {
        stop        bool
        flusherDone chan bool
        Timestamper
+       Immediate *log.Logger
 }
 
 // RFC3339Fixed is a fixed-width version of RFC3339 with microsecond precision,
@@ -59,6 +60,9 @@ func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
        sc := bufio.NewScanner(bytes.NewBuffer(p))
        for sc.Scan() {
                _, err = fmt.Fprintf(tl.buf, "%s %s\n", now, sc.Text())
+               if tl.Immediate != nil {
+                       tl.Immediate.Printf("%s %s\n", now, sc.Text())
+               }
        }
        return len(p), err
 }
@@ -180,9 +184,10 @@ func (arvlog *ArvLogWriter) Write(p []byte) (n int, err error) {
        }
 
        // write to API
-       lr := arvadosclient.Dict{"object_uuid": arvlog.UUID,
-               "event_type": arvlog.loggingStream,
-               "properties": map[string]string{"text": string(p)}}
+       lr := arvadosclient.Dict{"log": arvadosclient.Dict{
+               "object_uuid": arvlog.UUID,
+               "event_type":  arvlog.loggingStream,
+               "properties":  map[string]string{"text": string(p)}}}
        err2 := arvlog.ArvClient.Create("logs", lr, nil)
 
        if err1 != nil || err2 != nil {
index d8fdaa31650c3e5b51930bfdb32d520cb89b86b5..bce324d478571aefe0dddf0a0647199a3f0fc1e4 100644 (file)
@@ -2,6 +2,7 @@ package main
 
 import (
        "fmt"
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        . "gopkg.in/check.v1"
        "time"
 )
@@ -23,7 +24,7 @@ var _ = Suite(&LoggingTestSuite{})
 func (s *LoggingTestSuite) TestWriteLogs(c *C) {
        api := &ArvTestClient{}
        kc := &KeepTestClient{}
-       cr := NewContainerRunner(api, kc, nil)
+       cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
 
        cr.CrunchLog.Print("Hello world!")
@@ -39,16 +40,17 @@ func (s *LoggingTestSuite) TestWriteLogs(c *C) {
        logtext := "2015-12-29T15:51:45.000000001Z Hello world!\n" +
                "2015-12-29T15:51:45.000000002Z Goodbye\n"
 
-       c.Check(api.Content["event_type"], Equals, "crunch-run")
-       c.Check(api.Content["properties"].(map[string]string)["text"], Equals, logtext)
+       c.Check(api.Content["log"].(arvadosclient.Dict)["event_type"], Equals, "crunch-run")
+       c.Check(api.Content["log"].(arvadosclient.Dict)["properties"].(map[string]string)["text"], Equals, logtext)
        c.Check(string(kc.Content), Equals, logtext)
 }
 
 func (s *LoggingTestSuite) TestWriteLogsLarge(c *C) {
        api := &ArvTestClient{}
        kc := &KeepTestClient{}
-       cr := NewContainerRunner(api, kc, nil)
+       cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
+       cr.CrunchLog.Immediate = nil
 
        for i := 0; i < 2000000; i += 1 {
                cr.CrunchLog.Printf("Hello %d", i)
@@ -67,7 +69,7 @@ func (s *LoggingTestSuite) TestWriteLogsLarge(c *C) {
 func (s *LoggingTestSuite) TestWriteMultipleLogs(c *C) {
        api := &ArvTestClient{}
        kc := &KeepTestClient{}
-       cr := NewContainerRunner(api, kc, nil)
+       cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        ts := &TestTimestamper{}
        cr.CrunchLog.Timestamper = ts.Timestamp
        stdout := NewThrottledLogger(cr.NewLogWriter("stdout"))
@@ -81,14 +83,14 @@ func (s *LoggingTestSuite) TestWriteMultipleLogs(c *C) {
        cr.CrunchLog.Close()
        logtext1 := "2015-12-29T15:51:45.000000001Z Hello world!\n" +
                "2015-12-29T15:51:45.000000003Z Goodbye\n"
-       c.Check(api.Content["event_type"], Equals, "crunch-run")
-       c.Check(api.Content["properties"].(map[string]string)["text"], Equals, logtext1)
+       c.Check(api.Content["log"].(arvadosclient.Dict)["event_type"], Equals, "crunch-run")
+       c.Check(api.Content["log"].(arvadosclient.Dict)["properties"].(map[string]string)["text"], Equals, logtext1)
 
        stdout.Close()
        logtext2 := "2015-12-29T15:51:45.000000002Z Doing stuff\n" +
                "2015-12-29T15:51:45.000000004Z Blurb\n"
-       c.Check(api.Content["event_type"], Equals, "stdout")
-       c.Check(api.Content["properties"].(map[string]string)["text"], Equals, logtext2)
+       c.Check(api.Content["log"].(arvadosclient.Dict)["event_type"], Equals, "stdout")
+       c.Check(api.Content["log"].(arvadosclient.Dict)["properties"].(map[string]string)["text"], Equals, logtext2)
 
        mt, err := cr.LogCollection.ManifestText()
        c.Check(err, IsNil)
index 4a2693a6788f473adbe90570b7faeffe7357b222..23b7c38e23c3cf2d83264e479c98acbf9d130aa6 100644 (file)
@@ -17,7 +17,11 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
        "git.curoverse.com/arvados.git/sdk/go/manifest"
        "io"
+       "log"
+       "os"
+       "path/filepath"
        "strings"
+       "sync"
 )
 
 // Block is a data block in a manifest stream
@@ -77,6 +81,12 @@ func (m *CollectionFileWriter) Close() error {
        return nil
 }
 
+func (m *CollectionFileWriter) NewFile(fn string) {
+       m.offset += m.length
+       m.length = 0
+       m.fn = fn
+}
+
 func (m *CollectionFileWriter) goUpload() {
        var errors []error
        uploader := m.uploader
@@ -98,6 +108,7 @@ func (m *CollectionFileWriter) goUpload() {
 type CollectionWriter struct {
        IKeepClient
        Streams []*CollectionFileWriter
+       mtx     sync.Mutex
 }
 
 // Open a new file for writing in the Keep collection.
@@ -125,6 +136,8 @@ func (m *CollectionWriter) Open(path string) io.WriteCloser {
                fn}
        go fw.goUpload()
 
+       m.mtx.Lock()
+       defer m.mtx.Unlock()
        m.Streams = append(m.Streams, fw)
 
        return fw
@@ -133,6 +146,9 @@ func (m *CollectionWriter) Open(path string) io.WriteCloser {
 // Finish writing the collection, wait for all blocks to complete uploading.
 func (m *CollectionWriter) Finish() error {
        var errstring string
+       m.mtx.Lock()
+       defer m.mtx.Unlock()
+
        for _, stream := range m.Streams {
                if stream.uploader == nil {
                        continue
@@ -168,7 +184,12 @@ func (m *CollectionWriter) ManifestText() (mt string, err error) {
 
        var buf bytes.Buffer
 
+       m.mtx.Lock()
+       defer m.mtx.Unlock()
        for _, v := range m.Streams {
+               if len(v.FileStreamSegments) == 0 {
+                       continue
+               }
                k := v.StreamName
                if k == "." {
                        buf.WriteString(".")
@@ -177,9 +198,13 @@ func (m *CollectionWriter) ManifestText() (mt string, err error) {
                        k = strings.Replace(k, "\n", "", -1)
                        buf.WriteString("./" + k)
                }
-               for _, b := range v.Blocks {
-                       buf.WriteString(" ")
-                       buf.WriteString(b)
+               if len(v.Blocks) > 0 {
+                       for _, b := range v.Blocks {
+                               buf.WriteString(" ")
+                               buf.WriteString(b)
+                       }
+               } else {
+                       buf.WriteString(" d41d8cd98f00b204e9800998ecf8427e+0")
                }
                for _, f := range v.FileStreamSegments {
                        buf.WriteString(" ")
@@ -191,3 +216,83 @@ func (m *CollectionWriter) ManifestText() (mt string, err error) {
        }
        return buf.String(), nil
 }
+
+type WalkUpload struct {
+       kc          IKeepClient
+       stripPrefix string
+       streamMap   map[string]*CollectionFileWriter
+       status      *log.Logger
+}
+
+// WalkFunc walks a directory tree, uploads each file found and adds it to the
+// CollectionWriter.
+func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
+
+       if info.IsDir() {
+               return nil
+       }
+
+       var dir string
+       if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) {
+               dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
+       }
+       if dir == "" {
+               dir = "."
+       }
+
+       fn := path[(len(path) - len(info.Name())):]
+
+       if m.streamMap[dir] == nil {
+               m.streamMap[dir] = &CollectionFileWriter{
+                       m.kc,
+                       &manifest.ManifestStream{StreamName: dir},
+                       0,
+                       0,
+                       nil,
+                       make(chan *Block),
+                       make(chan []error),
+                       ""}
+               go m.streamMap[dir].goUpload()
+       }
+
+       fileWriter := m.streamMap[dir]
+
+       // Reset the CollectionFileWriter for a new file
+       fileWriter.NewFile(fn)
+
+       file, err := os.Open(path)
+       if err != nil {
+               return err
+       }
+       defer file.Close()
+
+       m.status.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
+
+       _, err = io.Copy(fileWriter, file)
+       if err != nil {
+               return err
+       }
+
+       // Commits the current file.  Legal to call this repeatedly.
+       fileWriter.Close()
+
+       return nil
+}
+
+func (cw *CollectionWriter) WriteTree(root string, status *log.Logger) (manifest string, err error) {
+       streamMap := make(map[string]*CollectionFileWriter)
+       wu := &WalkUpload{cw.IKeepClient, root, streamMap, status}
+       err = filepath.Walk(root, wu.WalkFunc)
+
+       if err != nil {
+               return "", err
+       }
+
+       cw.mtx.Lock()
+       for _, st := range streamMap {
+               cw.Streams = append(cw.Streams, st)
+       }
+       cw.mtx.Unlock()
+
+       return cw.ManifestText()
+}
diff --git a/services/crunch-run/upload_test.go b/services/crunch-run/upload_test.go
new file mode 100644 (file)
index 0000000..b4b1efd
--- /dev/null
@@ -0,0 +1,145 @@
+package main
+
+import (
+       . "gopkg.in/check.v1"
+       "io/ioutil"
+       "log"
+       "os"
+       "sync"
+)
+
+// UploadTestSuite holds the gocheck tests for CollectionWriter.WriteTree
+// and the file-upload machinery in upload.go.
+type UploadTestSuite struct{}
+
+// Gocheck boilerplate: registering the suite makes gocheck run every
+// Test* method whose receiver is *UploadTestSuite.
+var _ = Suite(&UploadTestSuite{})
+
+func (s *TestSuite) TestSimpleUpload(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
+
+       cw := CollectionWriter{&KeepTestClient{}, nil, sync.Mutex{}}
+       str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
+       c.Check(err, IsNil)
+       c.Check(str, Equals, ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt\n")
+}
+
+func (s *TestSuite) TestSimpleUploadTwofiles(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
+       ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600)
+
+       cw := CollectionWriter{&KeepTestClient{}, nil, sync.Mutex{}}
+       str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
+
+       c.Check(err, IsNil)
+       c.Check(str, Equals, ". 3858f62230ac3c915f300c664312c63f+6 0:3:file1.txt 3:3:file2.txt\n")
+}
+
+func (s *TestSuite) TestSimpleUploadSubdir(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       os.Mkdir(tmpdir+"/subdir", 0700)
+
+       ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
+       ioutil.WriteFile(tmpdir+"/subdir/file2.txt", []byte("bar"), 0600)
+
+       cw := CollectionWriter{&KeepTestClient{}, nil, sync.Mutex{}}
+       str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
+
+       c.Check(err, IsNil)
+
+       // streams can get added in either order because of scheduling
+       // of goroutines.
+       if str != `. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt
+./subdir 37b51d194a7513e45b56f6524f2d51f2+3 0:3:file2.txt
+` && str != `./subdir 37b51d194a7513e45b56f6524f2d51f2+3 0:3:file2.txt
+. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt
+` {
+               c.Error("Did not get expected manifest text")
+       }
+}
+
+func (s *TestSuite) TestSimpleUploadLarge(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       file, _ := os.Create(tmpdir + "/" + "file1.txt")
+       data := make([]byte, 1024*1024-1)
+       for i := range data {
+               data[i] = byte(i % 10)
+       }
+       for i := 0; i < 65; i++ {
+               file.Write(data)
+       }
+       file.Close()
+
+       ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600)
+
+       cw := CollectionWriter{&KeepTestClient{}, nil, sync.Mutex{}}
+       str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
+
+       c.Check(err, IsNil)
+       c.Check(str, Equals, ". 00ecf01e0d93385115c9f8bed757425d+67108864 485cd630387b6b1846fe429f261ea05f+1048514 0:68157375:file1.txt 68157375:3:file2.txt\n")
+}
+
+func (s *TestSuite) TestUploadEmptySubdir(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       os.Mkdir(tmpdir+"/subdir", 0700)
+
+       ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
+
+       cw := CollectionWriter{&KeepTestClient{}, nil, sync.Mutex{}}
+       str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
+
+       c.Check(err, IsNil)
+       c.Check(str, Equals, `. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt
+`)
+}
+
+func (s *TestSuite) TestUploadEmptyFile(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte(""), 0600)
+
+       cw := CollectionWriter{&KeepTestClient{}, nil, sync.Mutex{}}
+       str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
+
+       c.Check(err, IsNil)
+       c.Check(str, Equals, `. d41d8cd98f00b204e9800998ecf8427e+0 0:0:file1.txt
+`)
+}
+
+func (s *TestSuite) TestUploadError(c *C) {
+       tmpdir, _ := ioutil.TempDir("", "")
+       defer func() {
+               os.RemoveAll(tmpdir)
+       }()
+
+       ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
+
+       cw := CollectionWriter{&KeepErrorTestClient{}, nil, sync.Mutex{}}
+       str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
+
+       c.Check(err, NotNil)
+       c.Check(str, Equals, "")
+}