X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/8adad21150d3678aad0f88e5fb30a088145478ee..8ed521f7fd1e48e1e415125745ed8c6627a62c91:/services/crunch-run/crunchrun.go diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go index 40174b04c9..0b59f7df91 100644 --- a/services/crunch-run/crunchrun.go +++ b/services/crunch-run/crunchrun.go @@ -95,7 +95,6 @@ type ContainerRunner struct { SigChan chan os.Signal ArvMountExit chan error finalState string - trashLifetime time.Duration statLogger io.WriteCloser statReporter *crunchstat.Reporter @@ -317,7 +316,21 @@ func (runner *ContainerRunner) SetupMounts() (err error) { if mnt.Writable { return fmt.Errorf("Can never write to a collection specified by portable data hash") } + idx := strings.Index(mnt.PortableDataHash, "/") + if idx > 0 { + mnt.Path = path.Clean(mnt.PortableDataHash[idx:]) + mnt.PortableDataHash = mnt.PortableDataHash[0:idx] + runner.Container.Mounts[bind] = mnt + } src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.PortableDataHash) + if mnt.Path != "" && mnt.Path != "." { + if strings.HasPrefix(mnt.Path, "./") { + mnt.Path = mnt.Path[2:] + } else if strings.HasPrefix(mnt.Path, "/") { + mnt.Path = mnt.Path[1:] + } + src += "/" + mnt.Path + } } else { src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount) arvMountCmd = append(arvMountCmd, "--mount-tmp") @@ -674,20 +687,10 @@ func (runner *ContainerRunner) CaptureOutput() error { continue } - if strings.HasPrefix(bindSuffix, "/") == false { - bindSuffix = "/" + bindSuffix - } - if mnt.ExcludeFromOutput == true { continue } - idx := strings.Index(mnt.PortableDataHash, "/") - if idx > 0 { - mnt.Path = mnt.PortableDataHash[idx:] - mnt.PortableDataHash = mnt.PortableDataHash[0:idx] - } - // append to manifest_text m, err := runner.getCollectionManifestForPath(mnt, bindSuffix) if err != nil { @@ -699,10 +702,12 @@ func (runner *ContainerRunner) CaptureOutput() error { // Save output var response arvados.Collection + manifest := manifest.Manifest{Text: manifestText} + manifestText = manifest.Extract(".", ".").Text err = runner.ArvClient.Create("collections", arvadosclient.Dict{ "collection": arvadosclient.Dict{ - "trash_at": time.Now().Add(runner.trashLifetime).Format(time.RFC3339), + "is_trashed": true, "name": "output for " + runner.Container.UUID, "manifest_text": manifestText}}, &response) @@ -741,55 +746,17 @@ func (runner *ContainerRunner) getCollectionManifestForPath(mnt arvados.Mount, b outputCollections[mnt.PortableDataHash] = collection } - manifest := manifest.Manifest{Text: collection.ManifestText} - - manifestText := manifest.NormalizedManifestForPath(mnt.Path) - if mnt.Path == "" || mnt.Path == "/" { - // no path specified; return the entire manifest text after making adjustments - manifestText = strings.Replace(manifestText, "./", "."+bindSuffix+"/", -1) - manifestText = strings.Replace(manifestText, ". ", "."+bindSuffix+" ", -1) - } else { - // either a single stream or file from a stream is being sought - bindIdx := strings.LastIndex(bindSuffix, "/") - var bindSubdir, bindFileName string - if bindIdx >= 0 { - bindSubdir = "." + bindSuffix[0:bindIdx] - bindFileName = bindSuffix[bindIdx+1:] - } - mntPath := mnt.Path - if strings.HasSuffix(mntPath, "/") { - mntPath = mntPath[0 : len(mntPath)-1] - } - pathIdx := strings.LastIndex(mntPath, "/") - var pathSubdir, pathFileName string - if pathIdx >= 0 { - pathSubdir = "." + mntPath[0:pathIdx] - pathFileName = mntPath[pathIdx+1:] - } - - if strings.Index(manifestText, "."+mntPath+" ") != -1 { - // path refers to this complete stream - manifestText = strings.Replace(manifestText, "."+mntPath, "."+bindSuffix, -1) - } else { - // look for a matching file in this stream - manifestText = strings.Replace(manifestText, ":"+pathFileName, ":"+bindFileName, -1) - manifestText = strings.Replace(manifestText, pathSubdir, bindSubdir, -1) - } + if collection.ManifestText == "" { + runner.CrunchLog.Printf("No manifest text for collection %v", collection.PortableDataHash) + return "", nil } - if manifestText == "" { - runner.CrunchLog.Printf("No manifest segment found for bind '%v' with path '%v'", bindSuffix, mnt.Path) - } - - return manifestText, nil -} - -func (runner *ContainerRunner) loadDiscoveryVars() { - tl, err := runner.ArvClient.Discovery("defaultTrashLifetime") - if err != nil { - log.Fatalf("getting defaultTrashLifetime from discovery document: %s", err) + mft := manifest.Manifest{Text: collection.ManifestText} + extracted := mft.Extract(mnt.Path, bindSuffix) + if extracted.Err != nil { + return "", fmt.Errorf("Error parsing manifest for %v: %v", mnt.PortableDataHash, extracted.Err.Error()) } - runner.trashLifetime = time.Duration(tl.(float64)) * time.Second + return extracted.Text, nil } func (runner *ContainerRunner) CleanupDirs() { @@ -844,7 +811,7 @@ func (runner *ContainerRunner) CommitLogs() error { err = runner.ArvClient.Create("collections", arvadosclient.Dict{ "collection": arvadosclient.Dict{ - "trash_at": time.Now().Add(runner.trashLifetime).Format(time.RFC3339), + "is_trashed": true, "name": "logs for " + runner.Container.UUID, "manifest_text": mt}}, &response) @@ -1039,7 +1006,6 @@ func NewContainerRunner(api IArvadosClient, cr.Container.UUID = containerUUID cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run")) cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0) - cr.loadDiscoveryVars() return cr }