8015: Crunch run mounting and output upload WIP
author Peter Amstutz <peter.amstutz@curoverse.com>
Fri, 12 Feb 2016 05:04:20 +0000 (00:04 -0500)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Fri, 12 Feb 2016 05:04:20 +0000 (00:04 -0500)
services/crunch-run/crunchrun.go
services/crunch-run/upload.go

index 640ac88ca9a0aa70c1cc305012ab3bb18962ae6e..1248854decb9f6df0fa5b5a39b4292a38a87d10f 100644 (file)
@@ -8,8 +8,10 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/manifest"
        "github.com/curoverse/dockerclient"
        "io"
+       "io/ioutil"
        "log"
        "os"
+       "os/exec"
        "os/signal"
        "strings"
        "sync"
@@ -33,7 +35,13 @@ type IKeepClient interface {
 }
 
 // Mount describes the mount points to create inside the container.
-type Mount struct{}
+//
+// Kind selects how SetupMounts treats the mount: "collection" and "tmp"
+// are handled, any other value is rejected with an error.
+//
+// Writable controls whether a collection is bound read-write or with the
+// ":ro" suffix. SetupMounts refuses a writable mount when the collection
+// is identified by UUID or by PortableDataHash.
+//
+// PortableDataHash and UUID identify an existing collection to mount;
+// SetupMounts rejects a mount that sets both. When neither is set, a
+// collection mount is backed by a temporary collection (--mount-tmp).
+//
+// DeviceType is decoded from "device_type".
+// NOTE(review): nothing in the code visible here reads it — confirm its use.
+type Mount struct {
+       Kind             string `json:"kind"`
+       Writable         bool   `json:"writable"`
+       PortableDataHash string `json:"portable_data_hash"`
+       UUID             string `json:"uuid"`
+       DeviceType       string `json:"device_type"`
+}
 
 // Collection record returned by the API server.
 type Collection struct {
@@ -86,6 +94,8 @@ type ContainerRunner struct {
        Stderr        *ThrottledLogger
        LogCollection *CollectionWriter
        LogsPDH       *string
+       ArvMount      *exec.Cmd
+       ArvMountPoint string
        CancelLock    sync.Mutex
        Cancelled     bool
        SigChan       chan os.Signal
@@ -163,6 +173,61 @@ func (runner *ContainerRunner) LoadImage() (err error) {
        return nil
 }
 
+// SetupMounts prepares the mounts listed in the container record: it adds
+// the corresponding bind entries to hostConfig.Binds and starts the
+// arv-mount process that serves the collection mounts.
+//
+// A temporary directory is created to act as the arv-mount mount point and
+// is saved in runner.ArvMountPoint. The arv-mount command is saved in
+// runner.ArvMount; it is started here but not waited for.
+//
+// An error is returned when the temporary directory cannot be created, when
+// a mount specifies both 'uuid' and 'portable_data_hash', when a collection
+// identified by uuid or portable data hash is marked writable, when the
+// mount kind is unknown, or when arv-mount fails to start.
+func (runner *ContainerRunner) SetupMounts(hostConfig *dockerclient.HostConfig) (err error) {
+       runner.ArvMountPoint, err = ioutil.TempDir("", "keep")
+       if err != nil {
+               return err
+       }
+
+       // pdhOnly stays true as long as no mount refers to a collection by
+       // UUID; it decides between --mount-by-pdh and --mount-by-id below.
+       pdhOnly := true
+       tmpcount := 0
+       arvMountCmd := []string{"--foreground"}
+
+       for bind, mnt := range runner.ContainerRecord.Mounts {
+               switch mnt.Kind {
+               case "collection":
+                       if mnt.UUID != "" && mnt.PortableDataHash != "" {
+                               return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection")
+                       }
+
+                       var src string
+                       switch {
+                       case mnt.UUID != "":
+                               if mnt.Writable {
+                                       return fmt.Errorf("Writing to collection currently not permitted.")
+                               }
+                               pdhOnly = false
+                               src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID)
+                       case mnt.PortableDataHash != "":
+                               if mnt.Writable {
+                                       return fmt.Errorf("Can never write to a collection specified by portable data hash")
+                               }
+                               src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.PortableDataHash)
+                       default:
+                               // No existing collection named: ask arv-mount for a
+                               // temporary collection under a unique tmpN directory.
+                               tmpName := fmt.Sprintf("tmp%d", tmpcount)
+                               src = fmt.Sprintf("%s/%s", runner.ArvMountPoint, tmpName)
+                               arvMountCmd = append(arvMountCmd, "--mount-tmp", tmpName)
+                               tmpcount++
+                       }
+
+                       if mnt.Writable {
+                               hostConfig.Binds = append(hostConfig.Binds, fmt.Sprintf("%s:%s", src, bind))
+                       } else {
+                               hostConfig.Binds = append(hostConfig.Binds, fmt.Sprintf("%s:%s:ro", src, bind))
+                       }
+               case "tmp":
+                       // Only the container path is passed, with no host source.
+                       // NOTE(review): presumably Docker then provides a fresh
+                       // volume at that path — confirm.
+                       hostConfig.Binds = append(hostConfig.Binds, bind)
+               default:
+                       return fmt.Errorf("Unknown mount kind '%s'", mnt.Kind)
+               }
+       }
+
+       if pdhOnly {
+               arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id")
+       } else {
+               arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id")
+       }
+       arvMountCmd = append(arvMountCmd, runner.ArvMountPoint)
+
+       runner.ArvMount = exec.Command("arv-mount", arvMountCmd...)
+       return runner.ArvMount.Start()
+}
+
 // StartContainer creates the container and runs it.
 func (runner *ContainerRunner) StartContainer() (err error) {
        runner.CrunchLog.Print("Creating Docker container")
@@ -187,6 +252,11 @@ func (runner *ContainerRunner) StartContainer() (err error) {
        }
        hostConfig := &dockerclient.HostConfig{}
 
+       err = runner.SetupMounts(hostConfig)
+       if err != nil {
+               return
+       }
+
        runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID)
        err = runner.Docker.StartContainer(runner.ContainerID, hostConfig)
        if err != nil {
@@ -239,6 +309,9 @@ func (runner *ContainerRunner) WaitFinish() error {
        runner.Stdout.Close()
        runner.Stderr.Close()
 
+       umount := exec.Command("fusermount", "-z", "-u", runner.ArvMountPoint)
+       umount.Run()
+
        return nil
 }
 
index 4a2693a6788f473adbe90570b7faeffe7357b222..d50ea581b3110b1eb129d8e7a242d8b73645bb87 100644 (file)
@@ -191,3 +191,74 @@ func (m *CollectionWriter) ManifestText() (mt string, err error) {
        }
        return buf.String(), nil
 }
+
+// WalkFunc adds the file at path to the collection being written.
+//
+// It is driven by a directory walk: path, info and err are the values
+// handed to a filepath.Walk callback, and stripPrefix is the walk root,
+// which is removed from path to obtain the stream (directory) name. Files
+// directly under stripPrefix go to the stream named ".".
+//
+// Directories are skipped. The first file seen in a directory creates that
+// directory's stream writer and starts its goUpload goroutine. The file's
+// content is then copied into the stream and a segment recording its start
+// offset, byte count and name is appended to the stream's manifest.
+//
+// A non-nil err from the walker is returned unchanged, as is any error
+// from opening or copying the file.
+func (m *CollectionWriter) WalkFunc(path string, info os.FileInfo, err error, stripPrefix string) error {
+       // The walker reports failures to stat or list an entry through err, and
+       // info may be nil in that case, so check err before touching info.
+       if err != nil {
+               return err
+       }
+       if info.IsDir() {
+               return nil
+       }
+
+       // path looks like stripPrefix + "/" + dir + "/" + name; cut out dir.
+       var dir string
+       if len(path) > (len(stripPrefix) + len(info.Name()) + 1) {
+               dir = path[len(stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
+       }
+       if dir == "" {
+               dir = "."
+       }
+
+       fn := path[(len(path) - len(info.Name())):]
+
+       if m.Streams[dir] == nil {
+               m.Streams[dir] = &CollectionFileWriter{
+                       m.IKeepClient,
+                       &manifest.ManifestStream{StreamName: dir},
+                       0,
+                       0,
+                       nil,
+                       make(chan *Block),
+                       make(chan []error),
+                       ""}
+               go m.Streams[dir].goUpload()
+       }
+
+       stream := m.Streams[dir]
+
+       // Remember where this file begins within the stream.
+       fileStart := stream.offset
+
+       file, err := os.Open(path)
+       if err != nil {
+               return err
+       }
+       // Release the descriptor once the content has been copied; a walk over
+       // a large tree would otherwise keep every visited file open.
+       defer file.Close()
+
+       log.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
+
+       var count int64
+       count, err = io.Copy(stream, file)
+       if err != nil {
+               return err
+       }
+
+       stream.offset += count
+
+       stream.ManifestStream.FileStreamSegments = append(stream.ManifestStream.FileStreamSegments,
+               manifest.FileStreamSegment{uint64(fileStart), uint64(count), fn})
+
+       return nil
+}
+
+// WriteTree walks the directory tree rooted at root, passes every entry to
+// CollectionWriter.WalkFunc (using kc as the Keep client), finishes the
+// writer and returns the resulting manifest text.
+//
+// On any error from the walk, from Finish or from ManifestText, the
+// returned manifest text is empty and the error is returned.
+func WriteTree(kc IKeepClient, root string) (mt string, err error) {
+       // The stream map holds *CollectionFileWriter values, matching what
+       // WalkFunc stores in mw.Streams.
+       // NOTE(review): confirm against the CollectionWriter declaration.
+       mw := CollectionWriter{kc, root, map[string]*CollectionFileWriter{}}
+       err = filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
+               return mw.WalkFunc(path, info, err, root)
+       })
+       if err != nil {
+               return "", err
+       }
+
+       err = mw.Finish()
+       if err != nil {
+               return "", err
+       }
+
+       mt, err = mw.ManifestText()
+       if err != nil {
+               return "", err
+       }
+       return mt, nil
+}