9397: if mount.Path ends with "/", trim it.
[arvados.git] / services / crunch-run / crunchrun.go
index 971cb3a27a246c9fbe1a325496a6da63e6e69ac4..bd4c5cc5b4551337f09014858276feaaea2185a6 100644 (file)
@@ -19,6 +19,7 @@ import (
        "os/signal"
        "path"
        "path/filepath"
+       "sort"
        "strings"
        "sync"
        "syscall"
@@ -40,7 +41,7 @@ var ErrCancelled = errors.New("Cancelled")
 // IKeepClient is the minimal Keep API methods used by crunch-run.
 type IKeepClient interface {
        PutHB(hash string, buf []byte) (string, int, error)
-       ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error)
+       ManifestFileReader(m manifest.Manifest, filename string) (keepclient.Reader, error)
 }
 
 // NewLogWriter is a factory function to create a new log writer.
@@ -239,8 +240,17 @@ func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (
        return c, nil
 }
 
+// tmpBackedOutputDir is a package-level flag that starts out false.
+// SetupMounts sets it to true right after it binds runner.HostOutputDir
+// to a mount point.
+// NOTE(review): no reader of this flag is visible in this diff; confirm
+// where it is consumed before relying on it.
+var tmpBackedOutputDir = false
+
+func (runner *ContainerRunner) SetupArvMountPoint(prefix string) (err error) {
+       if runner.ArvMountPoint == "" {
+               runner.ArvMountPoint, err = runner.MkTempDir("", prefix)
+       }
+       return
+}
+
 func (runner *ContainerRunner) SetupMounts() (err error) {
-       runner.ArvMountPoint, err = runner.MkTempDir("", "keep")
+       err = runner.SetupArvMountPoint("keep")
        if err != nil {
                return fmt.Errorf("While creating keep mount temp dir: %v", err)
        }
@@ -259,7 +269,14 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
        runner.Binds = nil
        needCertMount := true
 
-       for bind, mnt := range runner.Container.Mounts {
+       var binds []string
+       for bind, _ := range runner.Container.Mounts {
+               binds = append(binds, bind)
+       }
+       sort.Strings(binds)
+
+       for _, bind := range binds {
+               mnt := runner.Container.Mounts[bind]
                if bind == "stdout" {
                        // Is it a "file" mount kind?
                        if mnt.Kind != "file" {
@@ -275,10 +292,17 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                                return fmt.Errorf("Stdout path does not start with OutputPath: %s, %s", mnt.Path, prefix)
                        }
                }
+
                if bind == "/etc/arvados/ca-certificates.crt" {
                        needCertMount = false
                }
 
+               if strings.HasPrefix(bind, runner.Container.OutputPath+"/") && bind != runner.Container.OutputPath+"/" {
+                       if mnt.Kind != "collection" {
+                               return fmt.Errorf("Only mount points of kind 'collection' are supported underneath the output_path: %v", bind)
+                       }
+               }
+
                switch {
                case mnt.Kind == "collection":
                        var src string
@@ -305,6 +329,8 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                        if mnt.Writable {
                                if bind == runner.Container.OutputPath {
                                        runner.HostOutputDir = src
+                               } else if strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
+                                       return fmt.Errorf("Writable mount points are not permitted underneath the output_path: %v", bind)
                                }
                                runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
                        } else {
@@ -327,6 +353,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                        }
                        runner.CleanupTempDir = append(runner.CleanupTempDir, runner.HostOutputDir)
                        runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", runner.HostOutputDir, bind))
+                       tmpBackedOutputDir = true
 
                case mnt.Kind == "tmp":
                        runner.Binds = append(runner.Binds, bind)
@@ -633,6 +660,57 @@ func (runner *ContainerRunner) CaptureOutput() error {
                manifestText = rec.ManifestText
        }
 
+       // Pre-populate output from the configured mount points
+       var binds []string
+       for bind, _ := range runner.Container.Mounts {
+               binds = append(binds, bind)
+       }
+       sort.Strings(binds)
+
+       for _, bind := range binds {
+               mnt := runner.Container.Mounts[bind]
+
+               bindSuffix := strings.TrimPrefix(bind, runner.Container.OutputPath)
+
+               if bindSuffix == bind || len(bindSuffix) <= 0 {
+                       // either does not start with OutputPath or is OutputPath itself
+                       continue
+               }
+
+               if strings.Index(bindSuffix, "/") != 0 {
+                       return fmt.Errorf("Expected bind to be of the format '%v/*' but found: %v", runner.Container.OutputPath, bind)
+               }
+
+               jsondata, err := json.Marshal(mnt.Content)
+               if err != nil {
+                       return fmt.Errorf("While marshal of mount content: %v", err)
+               }
+               var content map[string]interface{}
+               err = json.Unmarshal(jsondata, &content)
+               if err != nil {
+                       return fmt.Errorf("While unmarshal of mount content: %v", err)
+               }
+
+               if content["exclude_from_output"] == true {
+                       continue
+               }
+
+               idx := strings.Index(mnt.PortableDataHash, "/")
+               if idx > 0 {
+                       mnt.Path = mnt.PortableDataHash[idx:]
+                       mnt.PortableDataHash = mnt.PortableDataHash[0:idx]
+               }
+
+               // append to manifest_text
+               m, err := runner.getCollectionManifestForPath(mnt, bindSuffix)
+               if err != nil {
+                       return err
+               }
+
+               manifestText = manifestText + m
+       }
+
+       // Save output
        var response arvados.Collection
        err = runner.ArvClient.Create("collections",
                arvadosclient.Dict{
@@ -648,6 +726,90 @@ func (runner *ContainerRunner) CaptureOutput() error {
        return nil
 }
 
+// getCollectionManifestForPath fetches the collection identified by
+// mnt.PortableDataHash and returns the manifest_text fragment selected
+// by mnt.Path, renamed so that it appears at bindSuffix in the output.
+//
+// bindSuffix is the mount point relative to the output path and is
+// expected to start with "/" (for example "/foo" or "/foo/bar").
+//
+//   - If mnt.Path is "" or "/", the entire manifest is returned with
+//     every stream moved underneath "."+bindSuffix.
+//   - If mnt.Path names a stream, that stream is returned with its name
+//     replaced by "."+bindSuffix.
+//   - Otherwise mnt.Path is treated as a file inside a stream. The
+//     returned stream is named after the directory part of bindSuffix
+//     and contains only that file, renamed to the last component of
+//     bindSuffix.
+//
+// A single trailing "/" on mnt.Path is ignored. If nothing in the
+// manifest matches mnt.Path, an empty string is returned with a nil
+// error.
+//
+// Allowed path examples:
+//
+//     "path":"/"
+//     "path":"/subdir1"
+//     "path":"/subdir1/subdir2"
+//     "path":"/subdir/filename" etc
+func (runner *ContainerRunner) getCollectionManifestForPath(mnt arvados.Mount, bindSuffix string) (string, error) {
+       var collection arvados.Collection
+       err := runner.ArvClient.Get("collections", mnt.PortableDataHash, nil, &collection)
+       if err != nil {
+               return "", fmt.Errorf("While getting collection for %v: %v", mnt.PortableDataHash, err)
+       }
+
+       if mnt.Path == "" || mnt.Path == "/" {
+               // no path specified; return the entire manifest text
+               return rebaseManifestStreams(collection.ManifestText, bindSuffix), nil
+       }
+
+       // either a single stream or file from a stream is being sought
+       mntPath := strings.TrimSuffix(mnt.Path, "/")
+       return extractManifestPath(collection.ManifestText, mntPath, bindSuffix), nil
+}
+
+// rebaseManifestStreams moves every stream of manifestText underneath
+// "."+bindSuffix. Only the stream-name token (the first token of each
+// line) is rewritten; block locators and file tokens are left untouched,
+// so a "." or "./" inside a file name is never altered.
+func rebaseManifestStreams(manifestText, bindSuffix string) string {
+       lines := strings.Split(manifestText, "\n")
+       for i, line := range lines {
+               name := line
+               if sp := strings.Index(line, " "); sp >= 0 {
+                       name = line[:sp]
+               }
+               if name == "." || strings.HasPrefix(name, "./") {
+                       // Insert bindSuffix right after the leading ".".
+                       lines[i] = "." + bindSuffix + line[1:]
+               }
+       }
+       return strings.Join(lines, "\n")
+}
+
+// extractManifestPath returns the single manifest line that mntPath
+// selects from manifestText, renamed to live at bindSuffix, or "" when
+// nothing matches.
+//
+// A stream whose name is exactly "."+mntPath takes precedence over a
+// file match. For a file match, the first stream named after the
+// directory part of mntPath that contains the file is used: the stream's
+// block locators are kept, all file tokens for other files are dropped,
+// and every segment of the requested file is retained in order. Names
+// are compared exactly as they appear in the manifest text.
+func extractManifestPath(manifestText, mntPath, bindSuffix string) string {
+       // Split bindSuffix into target stream name and target file name.
+       bindSubdir, bindFileName := ".", bindSuffix
+       if i := strings.LastIndex(bindSuffix, "/"); i >= 0 {
+               bindSubdir, bindFileName = "."+bindSuffix[:i], bindSuffix[i+1:]
+       }
+
+       // Split mntPath into source stream name and source file name.
+       var pathSubdir, pathFileName string
+       if i := strings.LastIndex(mntPath, "/"); i >= 0 {
+               pathSubdir, pathFileName = "."+mntPath[:i], mntPath[i+1:]
+       }
+       wantFile := pathFileName != ""
+
+       fileMatch := ""
+       for _, stream := range strings.Split(manifestText, "\n") {
+               tokens := strings.Split(stream, " ")
+               streamName := tokens[0]
+
+               if streamName == "."+mntPath {
+                       // path refers to this complete stream; rename only
+                       // the stream-name token.
+                       return "." + bindSuffix + stream[len(streamName):] + "\n"
+               }
+
+               if !wantFile || fileMatch != "" || streamName != pathSubdir {
+                       continue
+               }
+
+               // path may refer to a file in this stream: keep the stream's
+               // block locators (the tokens before the first file token).
+               kept := []string{bindSubdir}
+               next := 1
+               for ; next < len(tokens) && !strings.Contains(tokens[next], ":"); next++ {
+                       kept = append(kept, tokens[next])
+               }
+
+               // File tokens have the form position:size:name. Match the
+               // name exactly and keep every segment of the file.
+               found := false
+               for _, token := range tokens[next:] {
+                       parts := strings.SplitN(token, ":", 3)
+                       if len(parts) == 3 && parts[2] == pathFileName {
+                               kept = append(kept, parts[0]+":"+parts[1]+":"+bindFileName)
+                               found = true
+                       }
+               }
+               if found {
+                       fileMatch = strings.Join(kept, " ") + "\n"
+               }
+       }
+
+       return fileMatch
+}
+
 func (runner *ContainerRunner) loadDiscoveryVars() {
        tl, err := runner.ArvClient.Discovery("defaultTrashLifetime")
        if err != nil {