11002: Merge branch 'master' into 11002-arvput-crash-fix
[arvados.git] / services / crunch-run / crunchrun.go
index f510825673a09848bbae996a44f8502776d8499f..0b59f7df91d12781b80659db7ea0be3c162a2703 100644 (file)
@@ -41,7 +41,7 @@ var ErrCancelled = errors.New("Cancelled")
 // IKeepClient is the minimal Keep API methods used by crunch-run.
 type IKeepClient interface {
        PutHB(hash string, buf []byte) (string, int, error)
-       ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error)
+       ManifestFileReader(m manifest.Manifest, filename string) (keepclient.Reader, error)
 }
 
 // NewLogWriter is a factory function to create a new log writer.
@@ -95,7 +95,6 @@ type ContainerRunner struct {
        SigChan        chan os.Signal
        ArvMountExit   chan error
        finalState     string
-       trashLifetime  time.Duration
 
        statLogger   io.WriteCloser
        statReporter *crunchstat.Reporter
@@ -240,8 +239,6 @@ func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (
        return c, nil
 }
 
-var tmpBackedOutputDir = false
-
 func (runner *ContainerRunner) SetupArvMountPoint(prefix string) (err error) {
        if runner.ArvMountPoint == "" {
                runner.ArvMountPoint, err = runner.MkTempDir("", prefix)
@@ -319,7 +316,21 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                                if mnt.Writable {
                                        return fmt.Errorf("Can never write to a collection specified by portable data hash")
                                }
+                               idx := strings.Index(mnt.PortableDataHash, "/")
+                               if idx > 0 {
+                                       mnt.Path = path.Clean(mnt.PortableDataHash[idx:])
+                                       mnt.PortableDataHash = mnt.PortableDataHash[0:idx]
+                                       runner.Container.Mounts[bind] = mnt
+                               }
                                src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.PortableDataHash)
+                               if mnt.Path != "" && mnt.Path != "." {
+                                       if strings.HasPrefix(mnt.Path, "./") {
+                                               mnt.Path = mnt.Path[2:]
+                                       } else if strings.HasPrefix(mnt.Path, "/") {
+                                               mnt.Path = mnt.Path[1:]
+                                       }
+                                       src += "/" + mnt.Path
+                               }
                        } else {
                                src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount)
                                arvMountCmd = append(arvMountCmd, "--mount-tmp")
@@ -353,7 +364,6 @@ func (runner *ContainerRunner) SetupMounts() (err error) {
                        }
                        runner.CleanupTempDir = append(runner.CleanupTempDir, runner.HostOutputDir)
                        runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", runner.HostOutputDir, bind))
-                       tmpBackedOutputDir = true
 
                case mnt.Kind == "tmp":
                        runner.Binds = append(runner.Binds, bind)
@@ -673,34 +683,14 @@ func (runner *ContainerRunner) CaptureOutput() error {
                bindSuffix := strings.TrimPrefix(bind, runner.Container.OutputPath)
 
                if bindSuffix == bind || len(bindSuffix) <= 0 {
-                       // either doesn't start with OutputPath or is OutputPath itself
+                       // either does not start with OutputPath or is OutputPath itself
                        continue
                }
 
-               if strings.Index(bindSuffix, "/") != 0 {
-                       return fmt.Errorf("Expected bind to be of the format '%v/*' but found: %v", runner.Container.OutputPath, bind)
-               }
-
-               jsondata, err := json.Marshal(mnt.Content)
-               if err != nil {
-                       return fmt.Errorf("While marshal of mount content: %v", err)
-               }
-               var content map[string]interface{}
-               err = json.Unmarshal(jsondata, &content)
-               if err != nil {
-                       return fmt.Errorf("While unmarshal of mount content: %v", err)
-               }
-
-               if content["exclude_from_output"] == true {
+               if mnt.ExcludeFromOutput == true {
                        continue
                }
 
-               idx := strings.Index(mnt.PortableDataHash, "/")
-               if idx > 0 {
-                       mnt.Path = mnt.PortableDataHash[idx:]
-                       mnt.PortableDataHash = mnt.PortableDataHash[0:idx]
-               }
-
                // append to manifest_text
                m, err := runner.getCollectionManifestForPath(mnt, bindSuffix)
                if err != nil {
@@ -712,10 +702,12 @@ func (runner *ContainerRunner) CaptureOutput() error {
 
        // Save output
        var response arvados.Collection
+       manifest := manifest.Manifest{Text: manifestText}
+       manifestText = manifest.Extract(".", ".").Text
        err = runner.ArvClient.Create("collections",
                arvadosclient.Dict{
                        "collection": arvadosclient.Dict{
-                               "trash_at":      time.Now().Add(runner.trashLifetime).Format(time.RFC3339),
+                               "is_trashed":    true,
                                "name":          "output for " + runner.Container.UUID,
                                "manifest_text": manifestText}},
                &response)
@@ -726,86 +718,45 @@ func (runner *ContainerRunner) CaptureOutput() error {
        return nil
 }
 
+var outputCollections = make(map[string]arvados.Collection)
+
 // Fetch the collection for the mnt.PortableDataHash
 // Return the manifest_text fragment corresponding to the specified mnt.Path
 //  after making any required updates.
 //  Ex:
-//    If mnt.Path is not speficied,
+//    If mnt.Path is not specified,
 //      return the entire manifest_text after replacing any "." with bindSuffix
 //    If mnt.Path corresponds to one stream,
-//      return the manitest_text for that stream after replacing that stream name with bindSuffix
-//    Otherwise, check if a filename in any one stream is being sought. Return the manitest_text
-//      for that stream after replacing that stream name and file name using bindSuffix components.
+//      return the manifest_text for that stream after replacing that stream name with bindSuffix
+//    Otherwise, check if a filename in any one stream is being sought. Return the manifest_text
+//      for that stream after replacing the stream name with bindSuffix minus its last path component
+//      and the file name with the last path component of bindSuffix
+//  Allowed path examples:
+//    "path":"/"
+//    "path":"/subdir1"
+//    "path":"/subdir1/subdir2"
+//    "path":"/subdir/filename" etc
 func (runner *ContainerRunner) getCollectionManifestForPath(mnt arvados.Mount, bindSuffix string) (string, error) {
-       var collection arvados.Collection
-       err := runner.ArvClient.Get("collections", mnt.PortableDataHash, nil, &collection)
-       if err != nil {
-               return "", fmt.Errorf("While getting collection for %v: %v", mnt.PortableDataHash, err)
-       }
-
-       manifestText := ""
-       if mnt.Path == "" {
-               // no path specified; return the entire manifest text
-               manifestText = collection.ManifestText
-               manifestText = strings.Replace(manifestText, "./", "."+bindSuffix+"/", -1)
-               manifestText = strings.Replace(manifestText, ". ", "."+bindSuffix+" ", -1)
-       } else {
-               // either a single stream or file from a stream is being sought
-               bindIdx := strings.LastIndex(bindSuffix, "/")
-               var bindSubdir, bindFileName string
-               if bindIdx >= 0 {
-                       bindSubdir = bindSuffix[0:bindIdx]
-                       bindFileName = bindSuffix[bindIdx+1:]
-               }
-               pathIdx := strings.LastIndex(mnt.Path, "/")
-               var pathSubdir, pathFileName string
-               if pathIdx >= 0 {
-                       pathSubdir = mnt.Path[0:pathIdx]
-                       pathFileName = mnt.Path[pathIdx+1:]
-               }
-               streams := strings.Split(collection.ManifestText, "\n")
-               for _, stream := range streams {
-                       tokens := strings.Split(stream, " ")
-                       if tokens[0] == "."+mnt.Path {
-                               // path refers to this stream
-                               adjustedStream := strings.Replace(stream, mnt.Path, bindSuffix, -1)
-                               manifestText = adjustedStream + "\n"
-                               break
-                       } else {
-                               // look for a matching file in this stream
-                               if tokens[0] == "."+pathSubdir {
-                                       // path refers to a file in this stream
-                                       for _, token := range tokens {
-                                               if strings.Index(token, ":"+pathFileName) > 0 {
-                                                       // found the file in the stream; discard all other file tokens
-                                                       for _, t := range tokens {
-                                                               if strings.Index(t, ":") == -1 {
-                                                                       manifestText += (" " + t)
-                                                               } else {
-                                                                       break // done reading all non-file tokens
-                                                               }
-                                                       }
-                                                       manifestText = strings.Trim(manifestText, " ")
-                                                       token = strings.Replace(token, pathFileName, bindFileName, -1)
-                                                       manifestText += (" " + token + "\n")
-                                                       manifestText = strings.Replace(manifestText, pathSubdir, bindSubdir, -1)
-                                                       break
-                                               }
-                                       }
-                               }
-                       }
+       collection := outputCollections[mnt.PortableDataHash]
+       if collection.PortableDataHash == "" {
+               err := runner.ArvClient.Get("collections", mnt.PortableDataHash, nil, &collection)
+               if err != nil {
+                       return "", fmt.Errorf("While getting collection for %v: %v", mnt.PortableDataHash, err)
                }
+               outputCollections[mnt.PortableDataHash] = collection
        }
 
-       return manifestText, nil
-}
+       if collection.ManifestText == "" {
+               runner.CrunchLog.Printf("No manifest text for collection %v", collection.PortableDataHash)
+               return "", nil
+       }
 
-func (runner *ContainerRunner) loadDiscoveryVars() {
-       tl, err := runner.ArvClient.Discovery("defaultTrashLifetime")
-       if err != nil {
-               log.Fatalf("getting defaultTrashLifetime from discovery document: %s", err)
+       mft := manifest.Manifest{Text: collection.ManifestText}
+       extracted := mft.Extract(mnt.Path, bindSuffix)
+       if extracted.Err != nil {
+               return "", fmt.Errorf("Error parsing manifest for %v: %v", mnt.PortableDataHash, extracted.Err.Error())
        }
-       runner.trashLifetime = time.Duration(tl.(float64)) * time.Second
+       return extracted.Text, nil
 }
 
 func (runner *ContainerRunner) CleanupDirs() {
@@ -860,7 +811,7 @@ func (runner *ContainerRunner) CommitLogs() error {
        err = runner.ArvClient.Create("collections",
                arvadosclient.Dict{
                        "collection": arvadosclient.Dict{
-                               "trash_at":      time.Now().Add(runner.trashLifetime).Format(time.RFC3339),
+                               "is_trashed":    true,
                                "name":          "logs for " + runner.Container.UUID,
                                "manifest_text": mt}},
                &response)
@@ -1055,7 +1006,6 @@ func NewContainerRunner(api IArvadosClient,
        cr.Container.UUID = containerUUID
        cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
        cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
-       cr.loadDiscoveryVars()
        return cr
 }