X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/8e9a3e39375b4dd689cd85a92e77a5eee03b4908..8ed521f7fd1e48e1e415125745ed8c6627a62c91:/services/crunch-run/crunchrun.go?ds=inline diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go index f510825673..0b59f7df91 100644 --- a/services/crunch-run/crunchrun.go +++ b/services/crunch-run/crunchrun.go @@ -41,7 +41,7 @@ var ErrCancelled = errors.New("Cancelled") // IKeepClient is the minimal Keep API methods used by crunch-run. type IKeepClient interface { PutHB(hash string, buf []byte) (string, int, error) - ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error) + ManifestFileReader(m manifest.Manifest, filename string) (keepclient.Reader, error) } // NewLogWriter is a factory function to create a new log writer. @@ -95,7 +95,6 @@ type ContainerRunner struct { SigChan chan os.Signal ArvMountExit chan error finalState string - trashLifetime time.Duration statLogger io.WriteCloser statReporter *crunchstat.Reporter @@ -240,8 +239,6 @@ func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) ( return c, nil } -var tmpBackedOutputDir = false - func (runner *ContainerRunner) SetupArvMountPoint(prefix string) (err error) { if runner.ArvMountPoint == "" { runner.ArvMountPoint, err = runner.MkTempDir("", prefix) @@ -319,7 +316,21 @@ func (runner *ContainerRunner) SetupMounts() (err error) { if mnt.Writable { return fmt.Errorf("Can never write to a collection specified by portable data hash") } + idx := strings.Index(mnt.PortableDataHash, "/") + if idx > 0 { + mnt.Path = path.Clean(mnt.PortableDataHash[idx:]) + mnt.PortableDataHash = mnt.PortableDataHash[0:idx] + runner.Container.Mounts[bind] = mnt + } src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.PortableDataHash) + if mnt.Path != "" && mnt.Path != "." { + if strings.HasPrefix(mnt.Path, "./") { + mnt.Path = mnt.Path[2:] + } else if strings.HasPrefix(mnt.Path, "/") { + mnt.Path = mnt.Path[1:] + } + src += "/" + mnt.Path + } } else { src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount) arvMountCmd = append(arvMountCmd, "--mount-tmp") @@ -353,7 +364,6 @@ func (runner *ContainerRunner) SetupMounts() (err error) { } runner.CleanupTempDir = append(runner.CleanupTempDir, runner.HostOutputDir) runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", runner.HostOutputDir, bind)) - tmpBackedOutputDir = true case mnt.Kind == "tmp": runner.Binds = append(runner.Binds, bind) @@ -673,34 +683,14 @@ func (runner *ContainerRunner) CaptureOutput() error { bindSuffix := strings.TrimPrefix(bind, runner.Container.OutputPath) if bindSuffix == bind || len(bindSuffix) <= 0 { - // either doesn't start with OutputPath or is OutputPath itself + // either does not start with OutputPath or is OutputPath itself continue } - if strings.Index(bindSuffix, "/") != 0 { - return fmt.Errorf("Expected bind to be of the format '%v/*' but found: %v", runner.Container.OutputPath, bind) - } - - jsondata, err := json.Marshal(mnt.Content) - if err != nil { - return fmt.Errorf("While marshal of mount content: %v", err) - } - var content map[string]interface{} - err = json.Unmarshal(jsondata, &content) - if err != nil { - return fmt.Errorf("While unmarshal of mount content: %v", err) - } - - if content["exclude_from_output"] == true { + if mnt.ExcludeFromOutput == true { continue } - idx := strings.Index(mnt.PortableDataHash, "/") - if idx > 0 { - mnt.Path = mnt.PortableDataHash[idx:] - mnt.PortableDataHash = mnt.PortableDataHash[0:idx] - } - // append to manifest_text m, err := runner.getCollectionManifestForPath(mnt, bindSuffix) if err != nil { @@ -712,10 +702,12 @@ func (runner *ContainerRunner) CaptureOutput() error { // Save output var response arvados.Collection + manifest := manifest.Manifest{Text: manifestText} + manifestText = manifest.Extract(".", ".").Text err = runner.ArvClient.Create("collections", arvadosclient.Dict{ "collection": arvadosclient.Dict{ - "trash_at": time.Now().Add(runner.trashLifetime).Format(time.RFC3339), + "is_trashed": true, "name": "output for " + runner.Container.UUID, "manifest_text": manifestText}}, &response) @@ -726,86 +718,45 @@ func (runner *ContainerRunner) CaptureOutput() error { return nil } +var outputCollections = make(map[string]arvados.Collection) + // Fetch the collection for the mnt.PortableDataHash // Return the manifest_text fragment corresponding to the specified mnt.Path // after making any required updates. // Ex: -// If mnt.Path is not speficied, +// If mnt.Path is not specified, // return the entire manifest_text after replacing any "." with bindSuffix // If mnt.Path corresponds to one stream, -// return the manitest_text for that stream after replacing that stream name with bindSuffix -// Otherwise, check if a filename in any one stream is being sought. Return the manitest_text -// for that stream after replacing that stream name and file name using bindSuffix components. +// return the manifest_text for that stream after replacing that stream name with bindSuffix +// Otherwise, check if a filename in any one stream is being sought. Return the manifest_text +// for that stream after replacing stream name with bindSuffix minus the last word +// and the file name with last word of the bindSuffix +// Allowed path examples: +// "path":"/" +// "path":"/subdir1" +// "path":"/subdir1/subdir2" +// "path":"/subdir/filename" etc func (runner *ContainerRunner) getCollectionManifestForPath(mnt arvados.Mount, bindSuffix string) (string, error) { - var collection arvados.Collection - err := runner.ArvClient.Get("collections", mnt.PortableDataHash, nil, &collection) - if err != nil { - return "", fmt.Errorf("While getting collection for %v: %v", mnt.PortableDataHash, err) - } - - manifestText := "" - if mnt.Path == "" { - // no path specified; return the entire manifest text - manifestText = collection.ManifestText - manifestText = strings.Replace(manifestText, "./", "."+bindSuffix+"/", -1) - manifestText = strings.Replace(manifestText, ". ", "."+bindSuffix+" ", -1) - } else { - // either a single stream or file from a stream is being sought - bindIdx := strings.LastIndex(bindSuffix, "/") - var bindSubdir, bindFileName string - if bindIdx >= 0 { - bindSubdir = bindSuffix[0:bindIdx] - bindFileName = bindSuffix[bindIdx+1:] - } - pathIdx := strings.LastIndex(mnt.Path, "/") - var pathSubdir, pathFileName string - if pathIdx >= 0 { - pathSubdir = mnt.Path[0:pathIdx] - pathFileName = mnt.Path[pathIdx+1:] - } - streams := strings.Split(collection.ManifestText, "\n") - for _, stream := range streams { - tokens := strings.Split(stream, " ") - if tokens[0] == "."+mnt.Path { - // path refers to this stream - adjustedStream := strings.Replace(stream, mnt.Path, bindSuffix, -1) - manifestText = adjustedStream + "\n" - break - } else { - // look for a matching file in this stream - if tokens[0] == "."+pathSubdir { - // path refers to a file in this stream - for _, token := range tokens { - if strings.Index(token, ":"+pathFileName) > 0 { - // found the file in the stream; discard all other file tokens - for _, t := range tokens { - if strings.Index(t, ":") == -1 { - manifestText += (" " + t) - } else { - break // done reading all non-file tokens - } - } - manifestText = strings.Trim(manifestText, " ") - token = strings.Replace(token, pathFileName, bindFileName, -1) - manifestText += (" " + token + "\n") - manifestText = strings.Replace(manifestText, pathSubdir, bindSubdir, -1) - break - } - } - } - } + collection := outputCollections[mnt.PortableDataHash] + if collection.PortableDataHash == "" { + err := runner.ArvClient.Get("collections", mnt.PortableDataHash, nil, &collection) + if err != nil { + return "", fmt.Errorf("While getting collection for %v: %v", mnt.PortableDataHash, err) } + outputCollections[mnt.PortableDataHash] = collection } - return manifestText, nil -} + if collection.ManifestText == "" { + runner.CrunchLog.Printf("No manifest text for collection %v", collection.PortableDataHash) + return "", nil + } -func (runner *ContainerRunner) loadDiscoveryVars() { - tl, err := runner.ArvClient.Discovery("defaultTrashLifetime") - if err != nil { - log.Fatalf("getting defaultTrashLifetime from discovery document: %s", err) + mft := manifest.Manifest{Text: collection.ManifestText} + extracted := mft.Extract(mnt.Path, bindSuffix) + if extracted.Err != nil { + return "", fmt.Errorf("Error parsing manifest for %v: %v", mnt.PortableDataHash, extracted.Err.Error()) } - runner.trashLifetime = time.Duration(tl.(float64)) * time.Second + return extracted.Text, nil } func (runner *ContainerRunner) CleanupDirs() { @@ -860,7 +811,7 @@ func (runner *ContainerRunner) CommitLogs() error { err = runner.ArvClient.Create("collections", arvadosclient.Dict{ "collection": arvadosclient.Dict{ - "trash_at": time.Now().Add(runner.trashLifetime).Format(time.RFC3339), + "is_trashed": true, "name": "logs for " + runner.Container.UUID, "manifest_text": mt}}, &response) @@ -1055,7 +1006,6 @@ func NewContainerRunner(api IArvadosClient, cr.Container.UUID = containerUUID cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run")) cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0) - cr.loadDiscoveryVars() return cr }