// IKeepClient is the minimal set of Keep API methods used by crunch-run:
// PutHB and ManifestFileReader.
type IKeepClient interface {
PutHB(hash string, buf []byte) (string, int, error)
- ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error)
+ ManifestFileReader(m manifest.Manifest, filename string) (keepclient.Reader, error)
}
// NewLogWriter is a factory function to create a new log writer.
var tmpBackedOutputDir = false
+// SetupArvMountPoint makes sure runner.ArvMountPoint is populated. When the
+// field is already non-empty nothing happens and nil is returned; otherwise
+// it is assigned the result of runner.MkTempDir("", prefix) and any error
+// from that call is returned to the caller.
+func (runner *ContainerRunner) SetupArvMountPoint(prefix string) (err error) {
+	if runner.ArvMountPoint != "" {
+		// A mount point has already been chosen; keep it.
+		return nil
+	}
+	runner.ArvMountPoint, err = runner.MkTempDir("", prefix)
+	return err
+}
+
func (runner *ContainerRunner) SetupMounts() (err error) {
- runner.ArvMountPoint, err = runner.MkTempDir("", "keep")
+ err = runner.SetupArvMountPoint("keep")
if err != nil {
return fmt.Errorf("While creating keep mount temp dir: %v", err)
}
manifestText = rec.ManifestText
}
+ // Pre-populate output from the configured mount points
+ var binds []string
+ for bind, _ := range runner.Container.Mounts {
+ binds = append(binds, bind)
+ }
+ sort.Strings(binds)
+
+ for _, bind := range binds {
+ mnt := runner.Container.Mounts[bind]
+
+ bindSuffix := strings.TrimPrefix(bind, runner.Container.OutputPath)
+
+ if bindSuffix == bind || len(bindSuffix) <= 0 {
+ // either does not start with OutputPath or is OutputPath itself
+ continue
+ }
+
+ if strings.Index(bindSuffix, "/") != 0 {
+ return fmt.Errorf("Expected bind to be of the format '%v/*' but found: %v", runner.Container.OutputPath, bind)
+ }
+
+ jsondata, err := json.Marshal(mnt.Content)
+ if err != nil {
+ return fmt.Errorf("While marshal of mount content: %v", err)
+ }
+ var content map[string]interface{}
+ err = json.Unmarshal(jsondata, &content)
+ if err != nil {
+ return fmt.Errorf("While unmarshal of mount content: %v", err)
+ }
+
+ if content["exclude_from_output"] == true {
+ continue
+ }
+
+ idx := strings.Index(mnt.PortableDataHash, "/")
+ if idx > 0 {
+ mnt.Path = mnt.PortableDataHash[idx:]
+ mnt.PortableDataHash = mnt.PortableDataHash[0:idx]
+ }
+
+ // append to manifest_text
+ m, err := runner.getCollectionManifestForPath(mnt, bindSuffix)
+ if err != nil {
+ return err
+ }
+
+ manifestText = manifestText + m
+ }
+
+ // Save output
var response arvados.Collection
err = runner.ArvClient.Create("collections",
arvadosclient.Dict{
return nil
}
+// getCollectionManifestForPath fetches the collection identified by
+// mnt.PortableDataHash and returns the manifest_text fragment selected by
+// mnt.Path, with names rewritten so the content appears at bindSuffix.
+//
+// bindSuffix is expected to start with "/" (the caller that pre-populates
+// the output rejects binds that do not).
+//
+// Selection rules:
+//   - If mnt.Path is empty or "/", the entire manifest_text is returned with
+//     every stream re-rooted under bindSuffix ("." becomes "."+bindSuffix and
+//     "./x" becomes "."+bindSuffix+"/x").
+//   - If mnt.Path names a stream, that single stream is returned with its
+//     name replaced by "."+bindSuffix. Streams nested below it are not
+//     included.
+//   - Otherwise mnt.Path is treated as a file: the stream named by the
+//     parent directory of mnt.Path is searched for file tokens whose name
+//     equals the last path component. The result is that stream's block
+//     locators plus the matching file tokens, with the stream renamed to
+//     bindSuffix minus its last word and the file renamed to the last word
+//     of bindSuffix.
+//
+// Allowed path examples:
+//   "path":"/"
+//   "path":"/subdir1"
+//   "path":"/subdir1/subdir2"
+//   "path":"/subdir/filename" etc
+//
+// An error is returned only when the collection cannot be fetched. A path
+// that matches nothing yields an empty string and a nil error.
+func (runner *ContainerRunner) getCollectionManifestForPath(mnt arvados.Mount, bindSuffix string) (string, error) {
+	var collection arvados.Collection
+	err := runner.ArvClient.Get("collections", mnt.PortableDataHash, nil, &collection)
+	if err != nil {
+		return "", fmt.Errorf("While getting collection for %v: %v", mnt.PortableDataHash, err)
+	}
+
+	return mountManifestForPath(collection.ManifestText, mnt.Path, bindSuffix), nil
+}
+
+// mountManifestForPath applies the selection rules documented on
+// getCollectionManifestForPath to an already-fetched manifest text.
+func mountManifestForPath(manifestText, mntPath, bindSuffix string) string {
+	if mntPath == "" || mntPath == "/" {
+		// No path specified: keep every stream, re-rooted under bindSuffix.
+		// Only the stream-name token of each line is rewritten, so file
+		// names containing "./" or ending in "." are left intact.
+		lines := strings.Split(manifestText, "\n")
+		for i, line := range lines {
+			lines[i] = rerootManifestStream(line, bindSuffix)
+		}
+		return strings.Join(lines, "\n")
+	}
+
+	// Either a single stream or a file from a stream is being sought.
+	mntPath = strings.TrimSuffix(mntPath, "/")
+	wantStream := "." + mntPath
+
+	var pathSubdir, pathFileName string
+	if i := strings.LastIndex(mntPath, "/"); i >= 0 {
+		pathSubdir = "." + mntPath[:i]
+		pathFileName = mntPath[i+1:]
+	}
+	var bindSubdir, bindFileName string
+	if i := strings.LastIndex(bindSuffix, "/"); i >= 0 {
+		bindSubdir = "." + bindSuffix[:i]
+		bindFileName = bindSuffix[i+1:]
+	}
+
+	var fileStreams []string
+	for _, stream := range strings.Split(manifestText, "\n") {
+		if stream == "" {
+			continue
+		}
+		tokens := strings.Split(stream, " ")
+
+		if tokens[0] == wantStream {
+			// Path refers to this complete stream: rename the stream token
+			// only and return it on its own, discarding any file matches
+			// collected so far.
+			tokens[0] = "." + bindSuffix
+			return strings.Join(tokens, " ") + "\n"
+		}
+
+		if tokens[0] != pathSubdir {
+			continue
+		}
+		// Path may refer to a file in this stream.
+		if s := extractManifestFile(tokens[1:], bindSubdir, pathFileName, bindFileName); s != "" {
+			fileStreams = append(fileStreams, s)
+		}
+	}
+	return strings.Join(fileStreams, "")
+}
+
+// rerootManifestStream rewrites the stream name (the first space-separated
+// token) of one manifest line so that "." becomes "."+bindSuffix and "./x"
+// becomes "."+bindSuffix+"/x". Lines whose first token is neither form,
+// including empty lines, are returned unchanged.
+func rerootManifestStream(line, bindSuffix string) string {
+	name, rest := line, ""
+	if sp := strings.Index(line, " "); sp >= 0 {
+		name, rest = line[:sp], line[sp:]
+	}
+	if name == "." || strings.HasPrefix(name, "./") {
+		return "." + bindSuffix + name[1:] + rest
+	}
+	return line
+}
+
+// extractManifestFile builds a one-line stream named streamName from tokens
+// (a stream's tokens without its name). The line holds every token that
+// precedes the first file token, followed by each "pos:size:name" token
+// whose name is exactly fileName, renamed to newFileName. It returns "" when
+// no file token matches.
+func extractManifestFile(tokens []string, streamName, fileName, newFileName string) string {
+	// Tokens before the first one containing ":" are kept verbatim; they
+	// are taken to be the stream's block locators.
+	firstFile := len(tokens)
+	for i, tok := range tokens {
+		if strings.Contains(tok, ":") {
+			firstFile = i
+			break
+		}
+	}
+
+	out := make([]string, 0, len(tokens)+1)
+	out = append(out, streamName)
+	out = append(out, tokens[:firstFile]...)
+
+	found := false
+	for _, tok := range tokens[firstFile:] {
+		// Split into at most three parts so a ":" inside the file name
+		// stays part of the name.
+		seg := strings.SplitN(tok, ":", 3)
+		if len(seg) == 3 && seg[2] == fileName {
+			out = append(out, seg[0]+":"+seg[1]+":"+newFileName)
+			found = true
+		}
+	}
+	if !found {
+		return ""
+	}
+	return strings.Join(out, " ") + "\n"
+}
+
func (runner *ContainerRunner) loadDiscoveryVars() {
tl, err := runner.ArvClient.Discovery("defaultTrashLifetime")
if err != nil {