"fmt"
"git.curoverse.com/arvados.git/sdk/go/blockdigest"
"regexp"
+ "sort"
"strconv"
"strings"
)
// ErrInvalidToken is a package-level sentinel error whose message is
// "Invalid token".
// NOTE(review): the code that returns it is not visible in this chunk.
var ErrInvalidToken = errors.New("Invalid token")
-var LocatorPattern = regexp.MustCompile(
- "^[0-9a-fA-F]{32}\\+[0-9]+(\\+[A-Z][A-Za-z0-9@_-]+)*$")
-
// Manifest wraps the text form of a manifest. The methods defined on it
// in this file (StreamIter, SegmentManifest, NormalizeManifest,
// ManifestForPath, FileSegmentIterByName) read the manifest through it.
type Manifest struct {
// Text is the manifest text; the tests in this file supply one stream
// per newline-terminated line.
Text string
// NOTE(review): where Err and Hints are populated is not visible in
// this chunk — confirm their meaning against the rest of the package.
Err error
Hints []string
}
-type DataSegment struct {
- BlockLocator
- Locator string
- StreamOffset uint64
-}
-
// FileSegment is a portion of a file that is contained within a
// single block.
type FileSegment struct {
// ManifestStream is the parsed form of a single manifest stream.
type ManifestStream struct {
// StreamName is the name of the stream (for example "." or "./foo").
StreamName string
// Blocks holds the stream's block locator strings, in stream order.
Blocks []string
// BlockOffsets has len(Blocks)+1 entries: BlockOffsets[i] is the
// position within the stream at which Blocks[i] starts, and the final
// entry is the total size of all blocks.
+ BlockOffsets []uint64
// FileStreamSegments lists the file tokens of the stream.
FileStreamSegments []FileStreamSegment
// Err is set when the stream could not be parsed (for instance when a
// block locator is rejected by ParseBlockLocator).
Err error
}
+// SegmentedFile is the ordered list of segments referencing one file's content.
+type SegmentedFile []FileSegment
+
+// SegmentedStream maps each file name in a stream to the segments referencing that file's content.
+type SegmentedStream map[string]SegmentedFile
+
+// SegmentedManifest maps each stream name to the files of that stream.
+type SegmentedManifest map[string]SegmentedStream
+
// escapeSeq matches a single escape sequence: a backslash followed by
// either exactly three decimal digits or a second backslash.
var escapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`)
func unescapeSeq(seq string) string {
return string([]byte{byte(i)})
}
// EscapeName returns s in a form that can be written as a single token
// of manifest text.
//
// Manifest tokens are separated by spaces and streams by newlines, so
// every byte with a value of 0x20 (space) or below is replaced by a
// backslash followed by its three-digit octal value: a space becomes
// `\040`, a tab `\011`, a newline `\012`. All other bytes, including
// multi-byte UTF-8 sequences, are copied through unchanged.
func EscapeName(s string) string {
	escaped := make([]byte, 0, len(s))
	for i := 0; i < len(s); i++ {
		c := s[i]
		if c > ' ' {
			escaped = append(escaped, c)
			continue
		}
		// c <= 0x20, so the leading octal digit is always '0'; it is
		// still computed so the encoding stays obviously three digits.
		escaped = append(escaped,
			'\\',
			'0'+(c>>6),
			'0'+((c>>3)&7),
			'0'+(c&7))
	}
	return string(escaped)
}
+
// UnescapeName returns s with every substring matched by escapeSeq
// replaced by the result of passing that substring to unescapeSeq.
func UnescapeName(s string) string {
	decoded := escapeSeq.ReplaceAllStringFunc(s, unescapeSeq)
	return decoded
}
func ParseBlockLocator(s string) (b BlockLocator, err error) {
- if !LocatorPattern.MatchString(s) {
+ if !blockdigest.LocatorPattern.MatchString(s) {
err = fmt.Errorf("String \"%s\" does not match BlockLocator pattern "+
"\"%s\".",
s,
- LocatorPattern.String())
+ blockdigest.LocatorPattern.String())
} else {
tokens := strings.Split(s, "+")
var blockSize int64
}
func (s *ManifestStream) FileSegmentIterByName(filepath string) <-chan *FileSegment {
- ch := make(chan *FileSegment)
+ ch := make(chan *FileSegment, 64)
go func() {
s.sendFileSegmentIterByName(filepath, ch)
close(ch)
return ch
}
// FirstBlock returns the index of the block that contains stream
// position range_start, or -1 if no block contains it.
//
// offsets must be sorted in non-decreasing order and hold one entry per
// block plus a final entry for the end of the last block, so block i
// covers the half-open range [offsets[i], offsets[i+1]). When
// range_start sits on a boundary shared with zero-length blocks, the
// index of the non-empty block starting there is returned.
//
// A slice with fewer than two entries describes no blocks and yields -1.
func FirstBlock(offsets []uint64, range_start uint64) int {
	lo, hi := 0, len(offsets)-1
	if hi < 1 || range_start < offsets[lo] || range_start >= offsets[hi] {
		// No blocks at all, or the position is outside the covered range.
		return -1
	}

	// Binary search maintaining offsets[lo] <= range_start < offsets[hi].
	// The midpoint is recomputed on every iteration regardless of which
	// bound moved, so the search always makes progress.
	for hi-lo > 1 {
		mid := lo + (hi-lo)/2
		if offsets[mid] <= range_start {
			lo = mid
		} else {
			hi = mid
		}
	}
	return lo
}
+
func (s *ManifestStream) sendFileSegmentIterByName(filepath string, ch chan<- *FileSegment) {
- blockLens := make([]int, 0, len(s.Blocks))
// This is what streamName+"/"+fileName will look like:
- target := "./" + filepath
+ target := filepath
+ if !strings.HasPrefix(target, "./") {
+ target = "./" + target
+ }
for _, fTok := range s.FileStreamSegments {
wantPos := fTok.SegPos
wantLen := fTok.SegLen
ch <- &FileSegment{Locator: "d41d8cd98f00b204e9800998ecf8427e+0", Offset: 0, Len: 0}
continue
}
- // Linear search for blocks containing data for this
- // file
- var blockPos uint64 = 0 // position of block in stream
- for i, loc := range s.Blocks {
- if blockPos >= wantPos+wantLen {
+
+ // Binary search to determine first block in the stream
+ i := FirstBlock(s.BlockOffsets, wantPos)
+ if i == -1 {
+ // error
+ break
+ }
+ for i < len(s.Blocks) {
+ blockPos := s.BlockOffsets[i]
+ blockEnd := s.BlockOffsets[i+1]
+ if blockEnd <= wantPos {
+ // current block comes before current file span
+ // (shouldn't happen, FirstBlock() should start us
+ // on the right block)
break
}
- if len(blockLens) <= i {
- blockLens = blockLens[:i+1]
- b, err := ParseBlockLocator(loc)
- if err != nil {
- // Unparseable locator -> unusable
- // stream.
- ch <- nil
- return
- }
- blockLens[i] = b.Size
- }
- blockLen := uint64(blockLens[i])
- if blockPos+blockLen <= wantPos {
- blockPos += blockLen
- continue
+ if blockPos >= wantPos+wantLen {
+ // current block comes after current file span
+ break
}
+
fseg := FileSegment{
- Locator: loc,
+ Locator: s.Blocks[i],
Offset: 0,
- Len: blockLens[i],
+ Len: int(blockEnd - blockPos),
}
if blockPos < wantPos {
fseg.Offset = int(wantPos - blockPos)
fseg.Len -= fseg.Offset
}
- if blockPos+blockLen > wantPos+wantLen {
+ if blockEnd > wantPos+wantLen {
fseg.Len = int(wantPos+wantLen-blockPos) - fseg.Offset
}
ch <- &fseg
- blockPos += blockLen
+ i += 1
}
}
}
return
}
+ m.BlockOffsets = make([]uint64, len(m.Blocks)+1)
+ var streamoffset uint64
+ for i, b := range m.Blocks {
+ bl, err := ParseBlockLocator(b)
+ if err != nil {
+ m.Err = err
+ return
+ }
+ m.BlockOffsets[i] = streamoffset
+ streamoffset += uint64(bl.Size)
+ }
+ m.BlockOffsets[len(m.Blocks)] = streamoffset
+
if len(fileTokens) == 0 {
m.Err = fmt.Errorf("No file tokens found")
return
return
}
// SplitPath divides path at its last "/" into the part before the slash
// (streamname) and the part after it (filename). A path containing no
// "/" is returned whole as streamname with an empty filename.
func SplitPath(path string) (streamname, filename string) {
	slash := strings.LastIndex(path, "/")
	if slash < 0 {
		return path, ""
	}
	return path[:slash], path[slash+1:]
}
+
+func (m *Manifest) SegmentManifest() *SegmentedManifest {
+ files := make(SegmentedManifest)
+
+ for stream := range m.StreamIter() {
+ currentStreamfiles := make(map[string]bool)
+ for _, f := range stream.FileStreamSegments {
+ sn := stream.StreamName
+ if sn != "." && !strings.HasPrefix(sn, "./") {
+ sn = "./" + sn
+ }
+ if strings.HasSuffix(sn, "/") {
+ sn = sn[0 : len(sn)-1]
+ }
+ path := sn + "/" + f.Name
+ streamname, filename := SplitPath(path)
+ if files[streamname] == nil {
+ files[streamname] = make(SegmentedStream)
+ }
+ if !currentStreamfiles[path] {
+ segs := files[streamname][filename]
+ for seg := range stream.FileSegmentIterByName(path) {
+ segs = append(segs, *seg)
+ }
+ files[streamname][filename] = segs
+ currentStreamfiles[path] = true
+ }
+ }
+ }
+
+ return &files
+}
+
+func (stream *SegmentedStream) NormalizeStream(name string) string {
+ var sortedfiles []string
+ for k, _ := range *stream {
+ sortedfiles = append(sortedfiles, k)
+ }
+ sort.Strings(sortedfiles)
+
+ stream_tokens := []string{EscapeName(name)}
+
+ blocks := make(map[string]int64)
+ var streamoffset int64
+
+ // Go through each file and add each referenced block exactly once.
+ for _, streamfile := range sortedfiles {
+ for _, segment := range (*stream)[streamfile] {
+ if _, ok := blocks[segment.Locator]; !ok {
+ stream_tokens = append(stream_tokens, segment.Locator)
+ blocks[segment.Locator] = streamoffset
+ b, _ := ParseBlockLocator(segment.Locator)
+ streamoffset += int64(b.Size)
+ }
+ }
+ }
+
+ if len(stream_tokens) == 1 {
+ stream_tokens = append(stream_tokens, "d41d8cd98f00b204e9800998ecf8427e+0")
+ }
+
+ for _, streamfile := range sortedfiles {
+ // Add in file segments
+ span_start := int64(-1)
+ span_end := int64(0)
+ fout := EscapeName(streamfile)
+ for _, segment := range (*stream)[streamfile] {
+ // Collapse adjacent segments
+ streamoffset = blocks[segment.Locator] + int64(segment.Offset)
+ if span_start == -1 {
+ span_start = streamoffset
+ span_end = streamoffset + int64(segment.Len)
+ } else {
+ if streamoffset == span_end {
+ span_end += int64(segment.Len)
+ } else {
+ stream_tokens = append(stream_tokens, fmt.Sprintf("%d:%d:%s", span_start, span_end-span_start, fout))
+ span_start = streamoffset
+ span_end = streamoffset + int64(segment.Len)
+ }
+ }
+ }
+
+ if span_start != -1 {
+ stream_tokens = append(stream_tokens, fmt.Sprintf("%d:%d:%s", span_start, span_end-span_start, fout))
+ }
+
+ if len((*stream)[streamfile]) == 0 {
+ stream_tokens = append(stream_tokens, fmt.Sprintf("0:0:%s", fout))
+ }
+ }
+
+ return strings.Join(stream_tokens, " ") + "\n"
+}
+
+func (m *Manifest) NormalizeManifest() string {
+ segments := m.SegmentManifest()
+
+ var sortedstreams []string
+ for k, _ := range *segments {
+ sortedstreams = append(sortedstreams, k)
+ }
+ sort.Strings(sortedstreams)
+
+ var manifest string
+ for _, k := range sortedstreams {
+ stream := (*segments)[k]
+ manifest += stream.NormalizeStream(k)
+ }
+ return manifest
+}
+
+func (m *SegmentedManifest) ManifestForPath(path, relocate string) string {
+ if path == "" {
+ path = "."
+ }
+ if relocate == "" {
+ relocate = "."
+ }
+
+ streamname, filename := SplitPath(path)
+ var relocate_stream, relocate_filename string
+ relocate_stream, relocate_filename = SplitPath(relocate)
+
+ if stream, ok := (*m)[path]; ok {
+ // refers to a single stream
+ return stream.NormalizeStream(relocate)
+ } else if stream, ok := (*m)[streamname]; ok {
+ // refers to a single file in a stream
+ newstream := make(SegmentedStream)
+ if relocate_filename == "" {
+ relocate_filename = filename
+ }
+ newstream[relocate_filename] = stream[filename]
+ return newstream.NormalizeStream(relocate_stream)
+ } else {
+ // refers to multiple streams
+ manifest := ""
+ prefix := path
+ if !strings.HasSuffix(prefix, "/") {
+ prefix += "/"
+ }
+ if !strings.HasSuffix(relocate, "/") {
+ relocate += "/"
+ }
+
+ var sortedstreams []string
+ for k, _ := range *m {
+ sortedstreams = append(sortedstreams, k)
+ }
+ sort.Strings(sortedstreams)
+
+ for _, k := range sortedstreams {
+ if strings.HasPrefix(k, prefix) {
+ v := (*m)[k]
+ manifest += v.NormalizeStream(relocate + k[len(prefix):])
+ }
+ }
+ return manifest
+ }
+}
+
+func (m *Manifest) ManifestForPath(path, relocate string) string {
+ return m.SegmentManifest().ManifestForPath(path, relocate)
+}
+
func (m *Manifest) StreamIter() <-chan ManifestStream {
ch := make(chan ManifestStream)
go func(input string) {
}
func (m *Manifest) FileSegmentIterByName(filepath string) <-chan *FileSegment {
- ch := make(chan *FileSegment)
+ ch := make(chan *FileSegment, 64)
+ if !strings.HasPrefix(filepath, "./") {
+ filepath = "./" + filepath
+ }
go func() {
for stream := range m.StreamIter() {
- if !strings.HasPrefix("./"+filepath, stream.StreamName+"/") {
+ if !strings.HasPrefix(filepath, stream.StreamName+"/") {
continue
}
stream.sendFileSegmentIterByName(filepath, ch)
}
}
}
+
// TestNormalizeManifest compares the output of NormalizeManifest and
// ManifestForPath with hand-written expected manifest text.
+func TestNormalizeManifest(t *testing.T) {
// Three "." streams naming the same file are merged into one stream
// whose blocks are concatenated and whose file spans all 127 bytes.
+ m1 := Manifest{Text: `. 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt
+. 085c37f02916da1cad16f93c54d899b7+41 0:41:md5sum.txt
+. 8b22da26f9f433dea0a10e5ec66d73ba+43 0:43:md5sum.txt
+`}
+ expectEqual(t, m1.NormalizeManifest(),
+ `. 5348b82a029fd9e971a811ce1f71360b+43 085c37f02916da1cad16f93c54d899b7+41 8b22da26f9f433dea0a10e5ec66d73ba+43 0:127:md5sum.txt
+`)
+
// A manifest that is already in normalized form comes back unchanged.
+ m2 := Manifest{Text: `. 204e43b8a1185621ca55a94839582e6f+67108864 b9677abbac956bd3e86b1deb28dfac03+67108864 fc15aff2a762b13f521baf042140acec+67108864 323d2a3ce20370c4ca1d3462a344f8fd+25885655 0:227212247:var-GS000016015-ASM.tsv.bz2
+`}
+ expectEqual(t, m2.NormalizeManifest(), m2.Text)
+
// As m1, but the file starts 3 bytes into the first block.
+ m3 := Manifest{Text: `. 5348b82a029fd9e971a811ce1f71360b+43 3:40:md5sum.txt
+. 085c37f02916da1cad16f93c54d899b7+41 0:41:md5sum.txt
+. 8b22da26f9f433dea0a10e5ec66d73ba+43 0:43:md5sum.txt
+`}
+ expectEqual(t, m3.NormalizeManifest(), `. 5348b82a029fd9e971a811ce1f71360b+43 085c37f02916da1cad16f93c54d899b7+41 8b22da26f9f433dea0a10e5ec66d73ba+43 3:124:md5sum.txt
+`)
+
// "foo/bar" in stream "." and "bar" in stream "./foo" are the same
// file; both end up in the "./foo" stream, and streams are sorted.
+ m4 := Manifest{Text: `. 204e43b8a1185621ca55a94839582e6f+67108864 0:3:foo/bar
+./zzz 204e43b8a1185621ca55a94839582e6f+67108864 0:999:zzz
+./foo 323d2a3ce20370c4ca1d3462a344f8fd+25885655 0:3:bar
+`}
+
+ expectEqual(t, m4.NormalizeManifest(),
+ `./foo 204e43b8a1185621ca55a94839582e6f+67108864 323d2a3ce20370c4ca1d3462a344f8fd+25885655 0:3:bar 67108864:3:bar
+./zzz 204e43b8a1185621ca55a94839582e6f+67108864 0:999:zzz
+`)
+
// ManifestForPath on m4: extracting a stream, a single file, and the
// whole tree, each with and without relocation.
+ expectEqual(t, m4.ManifestForPath("./foo", "."), ". 204e43b8a1185621ca55a94839582e6f+67108864 323d2a3ce20370c4ca1d3462a344f8fd+25885655 0:3:bar 67108864:3:bar\n")
+ expectEqual(t, m4.ManifestForPath("./foo", "./baz"), "./baz 204e43b8a1185621ca55a94839582e6f+67108864 323d2a3ce20370c4ca1d3462a344f8fd+25885655 0:3:bar 67108864:3:bar\n")
+ expectEqual(t, m4.ManifestForPath("./foo/bar", "."), ". 204e43b8a1185621ca55a94839582e6f+67108864 323d2a3ce20370c4ca1d3462a344f8fd+25885655 0:3:bar 67108864:3:bar\n")
+ expectEqual(t, m4.ManifestForPath("./foo/bar", "./baz"), ". 204e43b8a1185621ca55a94839582e6f+67108864 323d2a3ce20370c4ca1d3462a344f8fd+25885655 0:3:baz 67108864:3:baz\n")
+ expectEqual(t, m4.ManifestForPath("./foo/bar", "./quux/"), "./quux 204e43b8a1185621ca55a94839582e6f+67108864 323d2a3ce20370c4ca1d3462a344f8fd+25885655 0:3:bar 67108864:3:bar\n")
+ expectEqual(t, m4.ManifestForPath(".", "."), `./foo 204e43b8a1185621ca55a94839582e6f+67108864 323d2a3ce20370c4ca1d3462a344f8fd+25885655 0:3:bar 67108864:3:bar
+./zzz 204e43b8a1185621ca55a94839582e6f+67108864 0:999:zzz
+`)
+ expectEqual(t, m4.ManifestForPath(".", "./zip"), `./zip/foo 204e43b8a1185621ca55a94839582e6f+67108864 323d2a3ce20370c4ca1d3462a344f8fd+25885655 0:3:bar 67108864:3:bar
+./zip/zzz 204e43b8a1185621ca55a94839582e6f+67108864 0:999:zzz
+`)
+
// Both parts of ./foo/bar live in the same block at adjacent positions,
// so they collapse into a single 0:6 token.
+ m5 := Manifest{Text: `. 204e43b8a1185621ca55a94839582e6f+67108864 0:3:foo/bar
+./zzz 204e43b8a1185621ca55a94839582e6f+67108864 0:999:zzz
+./foo 204e43b8a1185621ca55a94839582e6f+67108864 3:3:bar
+`}
+ expectEqual(t, m5.NormalizeManifest(),
+ `./foo 204e43b8a1185621ca55a94839582e6f+67108864 0:6:bar
+./zzz 204e43b8a1185621ca55a94839582e6f+67108864 0:999:zzz
+`)
+
// Escaped spaces in stream and file names survive normalization.
+ m8 := Manifest{Text: `./a\040b\040c 59ca0efa9f5633cb0371bbc0355478d8+13 0:13:hello\040world.txt
+`}
+ expectEqual(t, m8.NormalizeManifest(), m8.Text)
+
// Interleaved tokens of two files are regrouped per file and merged
// into contiguous spans.
+ m9 := Manifest{Text: ". acbd18db4cc2f85cedef654fccc4a4d8+40 0:10:one 20:10:two 10:10:one 30:10:two\n"}
+ expectEqual(t, m9.ManifestForPath("", ""), ". acbd18db4cc2f85cedef654fccc4a4d8+40 0:20:one 20:20:two\n")
+
// Extracting one file and giving it a new name.
+ m10 := Manifest{Text: ". acbd18db4cc2f85cedef654fccc4a4d8+40 0:10:one 20:10:two 10:10:one 30:10:two\n"}
+ expectEqual(t, m10.ManifestForPath("./two", "./three"), ". acbd18db4cc2f85cedef654fccc4a4d8+40 20:20:three\n")
+
// arvadostest.PathologicalManifest is defined outside this file; the
// expected text below is its normalized form.
+ m11 := Manifest{Text: arvadostest.PathologicalManifest}
+ expectEqual(t, m11.NormalizeManifest(), `. acbd18db4cc2f85cedef654fccc4a4d8+3 37b51d194a7513e45b56f6524f2d51f2+3 73feffa4b7f6bb68e44cf984c85f6e88+3+Z+K@xyzzy d41d8cd98f00b204e9800998ecf8427e+0 0:1:f 1:4:ooba 5:1:r 5:4:rbaz 9:0:zero@0 9:0:zero@1 9:0:zero@4 9:0:zero@9
+./foo acbd18db4cc2f85cedef654fccc4a4d8+3 d41d8cd98f00b204e9800998ecf8427e+0 0:3:foo 0:3:foo 3:0:zero
+./foo\040bar acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:baz 0:3:baz\040waz
+./overlapReverse acbd18db4cc2f85cedef654fccc4a4d8+3 2:1:o 2:1:ofoo 0:3:ofoo 1:2:oo
+./segmented acbd18db4cc2f85cedef654fccc4a4d8+3 37b51d194a7513e45b56f6524f2d51f2+3 d41d8cd98f00b204e9800998ecf8427e+0 0:1:frob 5:1:frob 1:1:frob 6:0:frob 3:1:frob 1:2:oof 0:1:oof
+`)
+
+}