9397: Use ManifestTextForPath
[arvados.git] / services / crunch-run / crunchrun.go
index ade40c6b03a4d4a98812172aab31da5173453c4e..f73ec734662972103e7ffba4f96747fb8dec58c4 100644 (file)
@@ -19,6 +19,7 @@ import (
        "os/signal"
        "path"
        "path/filepath"
+       "sort"
        "strings"
        "sync"
        "syscall"
@@ -40,7 +41,7 @@ var ErrCancelled = errors.New("Cancelled")
 // IKeepClient is the minimal Keep API methods used by crunch-run.
 type IKeepClient interface {
        PutHB(hash string, buf []byte) (string, int, error)
-       ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error)
+       ManifestFileReader(m manifest.Manifest, filename string) (keepclient.Reader, error)
 }
 
 // NewLogWriter is a factory function to create a new log writer.
@@ -239,8 +240,15 @@ func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (
        return c, nil
 }
 
+func (runner *ContainerRunner) SetupArvMountPoint(prefix string) (err error) {
+       if runner.ArvMountPoint == "" {
+               runner.ArvMountPoint, err = runner.MkTempDir("", prefix)
+       }
+       return
+}
+
 func (runner *ContainerRunner) SetupMounts() (err error) {
-       runner.ArvMountPoint, err = runner.MkTempDir("", "keep")
+       err = runner.SetupArvMountPoint("keep")
        if err != nil {
                return fmt.Errorf("While creating keep mount temp dir: %v", err)
        }
@@ -257,8 +265,16 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
 
        collectionPaths := []string{}
        runner.Binds = nil
+       needCertMount := true
+
+       var binds []string
+       for bind, _ := range runner.Container.Mounts {
+               binds = append(binds, bind)
+       }
+       sort.Strings(binds)
 
-       for bind, mnt := range runner.Container.Mounts {
+       for _, bind := range binds {
+               mnt := runner.Container.Mounts[bind]
                if bind == "stdout" {
                        // Is it a "file" mount kind?
                        if mnt.Kind != "file" {
@@ -275,6 +291,16 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                        }
                }
 
+               if bind == "/etc/arvados/ca-certificates.crt" {
+                       needCertMount = false
+               }
+
+               if strings.HasPrefix(bind, runner.Container.OutputPath+"/") && bind != runner.Container.OutputPath+"/" {
+                       if mnt.Kind != "collection" {
+                               return fmt.Errorf("Only mount points of kind 'collection' are supported underneath the output_path: %v", bind)
+                       }
+               }
+
                switch {
                case mnt.Kind == "collection":
                        var src string
@@ -301,6 +327,8 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                        if mnt.Writable {
                                if bind == runner.Container.OutputPath {
                                        runner.HostOutputDir = src
+                               } else if strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
+                                       return fmt.Errorf("Writable mount points are not permitted underneath the output_path: %v", bind)
                                }
                                runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
                        } else {
@@ -355,6 +383,16 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                return fmt.Errorf("Output path does not correspond to a writable mount point")
        }
 
+       if wantAPI := runner.Container.RuntimeConstraints.API; needCertMount && wantAPI != nil && *wantAPI {
+               for _, certfile := range arvadosclient.CertFiles {
+                       _, err := os.Stat(certfile)
+                       if err == nil {
+                               runner.Binds = append(runner.Binds, fmt.Sprintf("%s:/etc/arvados/ca-certificates.crt:ro", certfile))
+                               break
+                       }
+               }
+       }
+
        if pdhOnly {
                arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id")
        } else {
@@ -619,11 +657,53 @@ func (runner *ContainerRunner) CaptureOutput() error {
                manifestText = rec.ManifestText
        }
 
+       // Pre-populate output from the configured mount points
+       var binds []string
+       for bind, _ := range runner.Container.Mounts {
+               binds = append(binds, bind)
+       }
+       sort.Strings(binds)
+
+       for _, bind := range binds {
+               mnt := runner.Container.Mounts[bind]
+
+               bindSuffix := strings.TrimPrefix(bind, runner.Container.OutputPath)
+
+               if bindSuffix == bind || len(bindSuffix) <= 0 {
+                       // either does not start with OutputPath or is OutputPath itself
+                       continue
+               }
+
+               if strings.HasPrefix(bindSuffix, "/") == false {
+                       bindSuffix = "/" + bindSuffix
+               }
+
+               if mnt.ExcludeFromOutput == true {
+                       continue
+               }
+
+               idx := strings.Index(mnt.PortableDataHash, "/")
+               if idx > 0 {
+                       mnt.Path = mnt.PortableDataHash[idx:]
+                       mnt.PortableDataHash = mnt.PortableDataHash[0:idx]
+               }
+
+               // append to manifest_text
+               m, err := runner.getCollectionManifestForPath(mnt, bindSuffix)
+               if err != nil {
+                       return err
+               }
+
+               manifestText = manifestText + m
+       }
+
+       // Save output
        var response arvados.Collection
+       manifestText := manifest.Manifest{Text: manifestText}.NormalizedText()
        err = runner.ArvClient.Create("collections",
                arvadosclient.Dict{
                        "collection": arvadosclient.Dict{
-                               "expires_at":    time.Now().Add(runner.trashLifetime).Format(time.RFC3339),
+                               "trash_at":      time.Now().Add(runner.trashLifetime).Format(time.RFC3339),
                                "name":          "output for " + runner.Container.UUID,
                                "manifest_text": manifestText}},
                &response)
@@ -634,6 +714,44 @@ func (runner *ContainerRunner) CaptureOutput() error {
        return nil
 }
 
+var outputCollections = make(map[string]arvados.Collection)
+
+// Fetch the collection for the mnt.PortableDataHash
+// Return the manifest_text fragment corresponding to the specified mnt.Path
+//  after making any required updates.
+//  Ex:
+//    If mnt.Path is not specified,
+//      return the entire manifest_text after replacing any "." with bindSuffix
+//    If mnt.Path corresponds to one stream,
+//      return the manifest_text for that stream after replacing that stream name with bindSuffix
+//    Otherwise, check if a filename in any one stream is being sought. Return the manifest_text
+//      for that stream after replacing stream name with bindSuffix minus the last word
+//      and the file name with last word of the bindSuffix
+//  Allowed path examples:
+//    "path":"/"
+//    "path":"/subdir1"
+//    "path":"/subdir1/subdir2"
+//    "path":"/subdir/filename" etc
+func (runner *ContainerRunner) getCollectionManifestForPath(mnt arvados.Mount, bindSuffix string) (string, error) {
+       collection := outputCollections[mnt.PortableDataHash]
+       if collection.PortableDataHash == "" {
+               err := runner.ArvClient.Get("collections", mnt.PortableDataHash, nil, &collection)
+               if err != nil {
+                       return "", fmt.Errorf("While getting collection for %v: %v", mnt.PortableDataHash, err)
+               }
+               outputCollections[mnt.PortableDataHash] = collection
+       }
+
+       if collection.ManifestText == "" {
+               runner.CrunchLog.Printf("No manifest text for collection %v", collection.PortableDataHash)
+               return "", nil
+       }
+
+       manifest := manifest.Manifest{Text: collection.ManifestText}
+       manifestText := manifest.ManifestTextForPath(mnt.Path, bindSuffix)
+       return manifestText, nil
+}
+
 func (runner *ContainerRunner) loadDiscoveryVars() {
        tl, err := runner.ArvClient.Discovery("defaultTrashLifetime")
        if err != nil {
@@ -694,7 +812,7 @@ func (runner *ContainerRunner) CommitLogs() error {
        err = runner.ArvClient.Create("collections",
                arvadosclient.Dict{
                        "collection": arvadosclient.Dict{
-                               "expires_at":    time.Now().Add(runner.trashLifetime).Format(time.RFC3339),
+                               "trash_at":      time.Now().Add(runner.trashLifetime).Format(time.RFC3339),
                                "name":          "logs for " + runner.Container.UUID,
                                "manifest_text": mt}},
                &response)
@@ -737,10 +855,10 @@ func (runner *ContainerRunner) ContainerToken() (string, error) {
 func (runner *ContainerRunner) UpdateContainerFinal() error {
        update := arvadosclient.Dict{}
        update["state"] = runner.finalState
+       if runner.LogsPDH != nil {
+               update["log"] = *runner.LogsPDH
+       }
        if runner.finalState == "Complete" {
-               if runner.LogsPDH != nil {
-                       update["log"] = *runner.LogsPDH
-               }
                if runner.ExitCode != nil {
                        update["exit_code"] = *runner.ExitCode
                }
@@ -800,6 +918,7 @@ func (runner *ContainerRunner) Run() (err error) {
                checkErr(err)
 
                if runner.finalState == "Queued" {
+                       runner.CrunchLog.Close()
                        runner.UpdateContainerFinal()
                        return
                }
@@ -832,6 +951,7 @@ func (runner *ContainerRunner) Run() (err error) {
        // check for and/or load image
        err = runner.LoadImage()
        if err != nil {
+               runner.finalState = "Cancelled"
                err = fmt.Errorf("While loading container image: %v", err)
                return
        }
@@ -839,6 +959,7 @@ func (runner *ContainerRunner) Run() (err error) {
        // set up FUSE mount and binds
        err = runner.SetupMounts()
        if err != nil {
+               runner.finalState = "Cancelled"
                err = fmt.Errorf("While setting up mounts: %v", err)
                return
        }
@@ -895,10 +1016,15 @@ func main() {
        cgroupRoot := flag.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
        cgroupParent := flag.String("cgroup-parent", "docker", "name of container's parent cgroup (ignored if -cgroup-parent-subsystem is used)")
        cgroupParentSubsystem := flag.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
+       caCertsPath := flag.String("ca-certs", "", "Path to TLS root certificates")
        flag.Parse()
 
        containerId := flag.Arg(0)
 
+       if *caCertsPath != "" {
+               arvadosclient.CertFiles = []string{*caCertsPath}
+       }
+
        api, err := arvadosclient.MakeArvadosClient()
        if err != nil {
                log.Fatalf("%s: %v", containerId, err)