9397: NormalizeManifest
authorradhika <radhika@curoverse.com>
Mon, 6 Feb 2017 00:14:48 +0000 (19:14 -0500)
committerradhika <radhika@curoverse.com>
Mon, 6 Feb 2017 00:14:48 +0000 (19:14 -0500)
sdk/go/manifest/manifest.go
services/crunch-run/crunchrun.go

index 18a6b858ea24d4a34a05dddc5375803622e59f63..5219a5871f867176601810cd740d2c54e2ae2c91 100644 (file)
@@ -9,6 +9,7 @@ import (
        "fmt"
        "git.curoverse.com/arvados.git/sdk/go/blockdigest"
        "regexp"
+       "sort"
        "strconv"
        "strings"
 )
@@ -229,19 +230,75 @@ func parseManifestStream(s string) (m ManifestStream) {
        return
 }
 
-func (m *Manifest) ManifestStreamForPath(path string) string {
+func (m *Manifest) NormalizeManifest() map[string]ManifestStream {
+       streams := make(map[string]ManifestStream)
+
+       for stream := range m.StreamIter() {
+               ms := streams[stream.StreamName]
+
+               if ms.StreamName == "" { // new stream
+                       streams[stream.StreamName] = stream
+               } else {
+                       ms.Blocks = append(ms.Blocks, stream.Blocks...)
+                       ms.FileStreamSegments = append(ms.FileStreamSegments, stream.FileStreamSegments...)
+               }
+       }
+
+       return streams
+}
+
+func (m *Manifest) NormalizedManifestForPath(path string) string {
+       normalized := m.NormalizeManifest()
+
+       var streams []string
+       for _, stream := range normalized {
+               streams = append(streams, stream.StreamName)
+       }
+       sort.Strings(streams)
+
+       path = strings.Trim(path, "/")
+       var subdir, filename string
+
+       if path != "" {
+               if strings.Index(path, "/") == -1 {
+                       isStream := false
+                       for _, v := range streams {
+                               if v == "./"+path {
+                                       isStream = true
+                               }
+                       }
+                       if isStream {
+                               subdir = path
+                       } else {
+                               filename = path
+                       }
+               } else {
+                       pathIdx := strings.LastIndex(path, "/")
+                       if pathIdx >= 0 {
+                               subdir = path[0:pathIdx]
+                               filename = path[pathIdx+1:]
+                       }
+               }
+       }
+
        manifestForPath := ""
-       locators := []string{}
-       for ms := range m.StreamIter() {
-               if ms.StreamName != "./"+path {
+
+       for _, streamName := range streams {
+               stream := normalized[streamName]
+
+               if subdir != "" && stream.StreamName != "./"+subdir {
                        continue
                }
 
-               locators = append(locators, ms.Blocks...)
+               manifestForPath += stream.StreamName + " " + strings.Join(stream.Blocks, " ") + " "
 
                currentName := ""
                currentSpan := []uint64{0, 0}
-               for _, fss := range ms.FileStreamSegments {
+               for _, fss := range stream.FileStreamSegments {
+                       if filename != "" && fss.Name != filename {
+                               continue
+                       }
+
                        if fss.Name != currentName && currentName != "" {
                                manifestForPath += fmt.Sprintf("%v", currentSpan[0]) + ":" + fmt.Sprintf("%v", currentSpan[1]) + ":" + currentName + " "
                        }
@@ -262,14 +319,10 @@ func (m *Manifest) ManifestStreamForPath(path string) string {
                                }
                        }
                }
-               manifestForPath += fmt.Sprintf("%v", currentSpan[0]) + ":" + fmt.Sprintf("%v", currentSpan[1]) + ":" + currentName + " "
-       }
-
-       if len(locators) > 0 {
-               return "./" + path + " " + strings.Join(locators, " ") + " " + manifestForPath
+               manifestForPath += fmt.Sprintf("%v", currentSpan[0]) + ":" + fmt.Sprintf("%v", currentSpan[1]) + ":" + currentName + "\n"
        }
 
-       return ""
+       return manifestForPath
 }
 
 func (m *Manifest) StreamIter() <-chan ManifestStream {
@@ -308,45 +361,6 @@ func (m *Manifest) FileSegmentIterByName(filepath string) <-chan *FileSegment {
        return ch
 }
 
-func (m *Manifest) FileSegmentForPath(filepath string) string {
-       dir := "."
-       file := filepath
-       if idx := strings.LastIndex(filepath, "/"); idx >= 0 {
-               dir = "./" + filepath[0:idx]
-               file = filepath[idx+1:]
-       }
-
-       var fileSegments []FileSegment
-       for fs := range m.FileSegmentIterByName(filepath) {
-               fileSegments = append(fileSegments, *fs)
-       }
-
-       if len(fileSegments) == 0 {
-               return ""
-       }
-
-       locators := ""
-       manifestForFile := ""
-       currentSpan := []int{0, 0}
-       for _, fs := range fileSegments {
-               locators += fs.Locator + " "
-
-               if currentSpan[1] == 0 {
-                       currentSpan = []int{fs.Offset, fs.Len}
-               } else {
-                       if currentSpan[1] == fs.Offset {
-                               currentSpan[1] += fs.Len
-                       } else {
-                               manifestForFile += strconv.Itoa(currentSpan[0]) + ":" + strconv.Itoa(currentSpan[1]+fs.Len) + ":" + file + " "
-                               currentSpan = []int{fs.Offset, fs.Offset + fs.Len}
-                       }
-               }
-       }
-       manifestForFile += strconv.Itoa(currentSpan[0]) + ":" + strconv.Itoa(currentSpan[1]) + ":" + file
-
-       return fmt.Sprintf("%v %v %v", dir, strings.Trim(locators, " "), manifestForFile)
-}
-
 // Blocks may appear multiple times within the same manifest if they
 // are used by multiple files. In that case this Iterator will output
 // the same block multiple times.
index 5a228e646ad4ac34539780852ee457bb94bf6b34..40174b04c975bf6829e445e52d157ebf47d9423b 100644 (file)
@@ -743,10 +743,9 @@ func (runner *ContainerRunner) getCollectionManifestForPath(mnt arvados.Mount, b
 
        manifest := manifest.Manifest{Text: collection.ManifestText}
 
-       manifestText := ""
+       manifestText := manifest.NormalizedManifestForPath(mnt.Path)
        if mnt.Path == "" || mnt.Path == "/" {
                // no path specified; return the entire manifest text after making adjustments
-               manifestText = collection.ManifestText
                manifestText = strings.Replace(manifestText, "./", "."+bindSuffix+"/", -1)
                manifestText = strings.Replace(manifestText, ". ", "."+bindSuffix+" ", -1)
        } else {
@@ -768,19 +767,13 @@ func (runner *ContainerRunner) getCollectionManifestForPath(mnt arvados.Mount, b
                        pathFileName = mntPath[pathIdx+1:]
                }
 
-               manifestStreamForPath := manifest.ManifestStreamForPath(mntPath[1:])
-               if manifestStreamForPath != "" {
+               if strings.Index(manifestText, "."+mntPath+" ") != -1 {
                        // path refers to this complete stream
-                       adjustedStream := strings.Replace(manifestStreamForPath, "."+mntPath, "."+bindSuffix, -1)
-                       manifestText = adjustedStream + "\n"
+                       manifestText = strings.Replace(manifestText, "."+mntPath, "."+bindSuffix, -1)
                } else {
                        // look for a matching file in this stream
-                       fs := manifest.FileSegmentForPath(mntPath[1:])
-                       if fs != "" {
-                               manifestText = strings.Replace(fs, ":"+pathFileName, ":"+bindFileName, -1)
-                               manifestText = strings.Replace(manifestText, pathSubdir, bindSubdir, -1)
-                               manifestText += "\n"
-                       }
+                       manifestText = strings.Replace(manifestText, ":"+pathFileName, ":"+bindFileName, -1)
+                       manifestText = strings.Replace(manifestText, pathSubdir, bindSubdir, -1)
                }
        }