1 /* Deals with parsing Manifest Text. */
3 // Inspired by the Manifest class in arvados/sdk/ruby/lib/arvados/keep.rb
10 "git.curoverse.com/arvados.git/sdk/go/blockdigest"
17 var ErrInvalidToken = errors.New("Invalid token")
19 type Manifest struct {
24 type BlockLocator struct {
25 Digest blockdigest.BlockDigest
30 // FileSegment is a portion of a file that is contained within a
32 type FileSegment struct {
34 // Offset (within this block) of this data segment
39 // FileStreamSegment is a portion of a file described as a segment of a stream.
40 type FileStreamSegment struct {
46 // Represents a single line from a manifest.
47 type ManifestStream struct {
51 FileStreamSegments []FileStreamSegment
55 // Array of segments referencing file content
56 type SegmentedFile []FileSegment
58 // Map of files to list of file segments referencing file content
59 type SegmentedStream map[string]SegmentedFile
62 type SegmentedManifest map[string]SegmentedStream
64 var escapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`)
66 func unescapeSeq(seq string) string {
70 i, err := strconv.ParseUint(seq[1:], 8, 8)
72 // Invalid escape sequence: can't unescape.
75 return string([]byte{byte(i)})
78 func EscapeName(s string) string {
79 return strings.Replace(s, " ", `\040`, -1)
82 func UnescapeName(s string) string {
83 return escapeSeq.ReplaceAllStringFunc(s, unescapeSeq)
86 func ParseBlockLocator(s string) (b BlockLocator, err error) {
87 if !blockdigest.LocatorPattern.MatchString(s) {
88 err = fmt.Errorf("String \"%s\" does not match BlockLocator pattern "+
91 blockdigest.LocatorPattern.String())
93 tokens := strings.Split(s, "+")
95 var blockDigest blockdigest.BlockDigest
96 // We expect both of the following to succeed since LocatorPattern
97 // restricts the strings appropriately.
98 blockDigest, err = blockdigest.FromString(tokens[0])
102 blockSize, err = strconv.ParseInt(tokens[1], 10, 0)
106 b.Digest = blockDigest
107 b.Size = int(blockSize)
113 func parseFileStreamSegment(tok string) (ft FileStreamSegment, err error) {
114 parts := strings.SplitN(tok, ":", 3)
116 err = ErrInvalidToken
119 ft.SegPos, err = strconv.ParseUint(parts[0], 10, 64)
123 ft.SegLen, err = strconv.ParseUint(parts[1], 10, 64)
127 ft.Name = UnescapeName(parts[2])
131 func (s *ManifestStream) FileSegmentIterByName(filepath string) <-chan *FileSegment {
132 ch := make(chan *FileSegment, 64)
134 s.sendFileSegmentIterByName(filepath, ch)
140 func FirstBlock(offsets []uint64, range_start uint64) int {
141 // range_start/block_start is the inclusive lower bound
142 // range_end/block_end is the exclusive upper bound
144 hi := len(offsets) - 1
147 block_start := offsets[i]
148 block_end := offsets[i+1]
150 // perform a binary search for the first block
151 // assumes that all of the blocks are contiguous, so range_start is guaranteed
152 // to either fall into the range of a block or be outside the block range entirely
153 for !(range_start >= block_start && range_start < block_end) {
155 // must be out of range, fail
158 if range_start > block_start {
163 block_start = offsets[i]
164 block_end = offsets[i+1]
170 func (s *ManifestStream) sendFileSegmentIterByName(filepath string, ch chan<- *FileSegment) {
171 // This is what streamName+"/"+fileName will look like:
173 if !strings.HasPrefix(target, "./") {
174 target = "./" + target
176 for _, fTok := range s.FileStreamSegments {
177 wantPos := fTok.SegPos
178 wantLen := fTok.SegLen
181 if s.StreamName+"/"+name != target {
185 ch <- &FileSegment{Locator: "d41d8cd98f00b204e9800998ecf8427e+0", Offset: 0, Len: 0}
189 // Binary search to determine first block in the stream
190 i := FirstBlock(s.BlockOffsets, wantPos)
195 for i < len(s.Blocks) {
196 blockPos := s.BlockOffsets[i]
197 blockEnd := s.BlockOffsets[i+1]
198 if blockEnd <= wantPos {
199 // current block comes before current file span
200 // (shouldn't happen, FirstBlock() should start us
201 // on the right block)
204 if blockPos >= wantPos+wantLen {
205 // current block comes after current file span
210 Locator: s.Blocks[i],
212 Len: int(blockEnd - blockPos),
214 if blockPos < wantPos {
215 fseg.Offset = int(wantPos - blockPos)
216 fseg.Len -= fseg.Offset
218 if blockEnd > wantPos+wantLen {
219 fseg.Len = int(wantPos+wantLen-blockPos) - fseg.Offset
227 func parseManifestStream(s string) (m ManifestStream) {
228 tokens := strings.Split(s, " ")
230 m.StreamName = UnescapeName(tokens[0])
231 if m.StreamName != "." && !strings.HasPrefix(m.StreamName, "./") {
232 m.Err = fmt.Errorf("Invalid stream name: %s", m.StreamName)
238 for i = 0; i < len(tokens); i++ {
239 if !blockdigest.IsBlockLocator(tokens[i]) {
243 m.Blocks = tokens[:i]
244 fileTokens := tokens[i:]
246 if len(m.Blocks) == 0 {
247 m.Err = fmt.Errorf("No block locators found")
251 m.BlockOffsets = make([]uint64, len(m.Blocks)+1)
252 var streamoffset uint64
253 for i, b := range m.Blocks {
254 bl, err := ParseBlockLocator(b)
259 m.BlockOffsets[i] = streamoffset
260 streamoffset += uint64(bl.Size)
262 m.BlockOffsets[len(m.Blocks)] = streamoffset
264 if len(fileTokens) == 0 {
265 m.Err = fmt.Errorf("No file tokens found")
269 for _, ft := range fileTokens {
270 pft, err := parseFileStreamSegment(ft)
272 m.Err = fmt.Errorf("Invalid file token: %s", ft)
275 m.FileStreamSegments = append(m.FileStreamSegments, pft)
281 func SplitPath(path string) (streamname, filename string) {
282 pathIdx := strings.LastIndex(path, "/")
284 streamname = path[0:pathIdx]
285 filename = path[pathIdx+1:]
293 func (m *Manifest) SegmentManifest() *SegmentedManifest {
294 files := make(SegmentedManifest)
296 for stream := range m.StreamIter() {
297 currentStreamfiles := make(map[string]bool)
298 for _, f := range stream.FileStreamSegments {
299 sn := stream.StreamName
300 if sn != "." && !strings.HasPrefix(sn, "./") {
303 if strings.HasSuffix(sn, "/") {
304 sn = sn[0 : len(sn)-1]
306 path := sn + "/" + f.Name
307 streamname, filename := SplitPath(path)
308 if files[streamname] == nil {
309 files[streamname] = make(SegmentedStream)
311 if !currentStreamfiles[path] {
312 segs := files[streamname][filename]
313 for seg := range stream.FileSegmentIterByName(path) {
314 segs = append(segs, *seg)
316 files[streamname][filename] = segs
317 currentStreamfiles[path] = true
325 func (stream *SegmentedStream) NormalizeStream(name string) string {
326 var sortedfiles []string
327 for k, _ := range *stream {
328 sortedfiles = append(sortedfiles, k)
330 sort.Strings(sortedfiles)
332 stream_tokens := []string{EscapeName(name)}
334 blocks := make(map[string]int64)
335 var streamoffset int64
337 // Go through each file and add each referenced block exactly once.
338 for _, streamfile := range sortedfiles {
339 for _, segment := range (*stream)[streamfile] {
340 if _, ok := blocks[segment.Locator]; !ok {
341 stream_tokens = append(stream_tokens, segment.Locator)
342 blocks[segment.Locator] = streamoffset
343 b, _ := ParseBlockLocator(segment.Locator)
344 streamoffset += int64(b.Size)
349 if len(stream_tokens) == 1 {
350 stream_tokens = append(stream_tokens, "d41d8cd98f00b204e9800998ecf8427e+0")
353 for _, streamfile := range sortedfiles {
354 // Add in file segments
355 span_start := int64(-1)
357 fout := EscapeName(streamfile)
358 for _, segment := range (*stream)[streamfile] {
359 // Collapse adjacent segments
360 streamoffset = blocks[segment.Locator] + int64(segment.Offset)
361 if span_start == -1 {
362 span_start = streamoffset
363 span_end = streamoffset + int64(segment.Len)
365 if streamoffset == span_end {
366 span_end += int64(segment.Len)
368 stream_tokens = append(stream_tokens, fmt.Sprintf("%d:%d:%s", span_start, span_end-span_start, fout))
369 span_start = streamoffset
370 span_end = streamoffset + int64(segment.Len)
375 if span_start != -1 {
376 stream_tokens = append(stream_tokens, fmt.Sprintf("%d:%d:%s", span_start, span_end-span_start, fout))
379 if len((*stream)[streamfile]) == 0 {
380 stream_tokens = append(stream_tokens, fmt.Sprintf("0:0:%s", fout))
384 return strings.Join(stream_tokens, " ") + "\n"
387 func (m *Manifest) NormalizeManifest() string {
388 segments := m.SegmentManifest()
390 var sortedstreams []string
391 for k, _ := range *segments {
392 sortedstreams = append(sortedstreams, k)
394 sort.Strings(sortedstreams)
397 for _, k := range sortedstreams {
398 stream := (*segments)[k]
399 manifest += stream.NormalizeStream(k)
404 func (m *SegmentedManifest) ManifestForPath(path, relocate string) string {
412 streamname, filename := SplitPath(path)
413 var relocate_stream, relocate_filename string
414 relocate_stream, relocate_filename = SplitPath(relocate)
416 if stream, ok := (*m)[path]; ok {
417 // refers to a single stream
418 return stream.NormalizeStream(relocate)
419 } else if stream, ok := (*m)[streamname]; ok {
420 // refers to a single file in a stream
421 newstream := make(SegmentedStream)
422 if relocate_filename == "" {
423 relocate_filename = filename
425 newstream[relocate_filename] = stream[filename]
426 return newstream.NormalizeStream(relocate_stream)
428 // refers to multiple streams
431 if !strings.HasSuffix(prefix, "/") {
434 if !strings.HasSuffix(relocate, "/") {
438 var sortedstreams []string
439 for k, _ := range *m {
440 sortedstreams = append(sortedstreams, k)
442 sort.Strings(sortedstreams)
444 for _, k := range sortedstreams {
445 if strings.HasPrefix(k, prefix) {
447 manifest += v.NormalizeStream(relocate + k[len(prefix):])
454 func (m *Manifest) ManifestForPath(path, relocate string) string {
455 return m.SegmentManifest().ManifestForPath(path, relocate)
458 func (m *Manifest) StreamIter() <-chan ManifestStream {
459 ch := make(chan ManifestStream)
460 go func(input string) {
461 // This slice holds the current line and the remainder of the
462 // manifest. We parse one line at a time, to save effort if we
463 // only need the first few lines.
464 lines := []string{"", input}
466 lines = strings.SplitN(lines[1], "\n", 2)
467 if len(lines[0]) > 0 {
468 // Only parse non-blank lines
469 ch <- parseManifestStream(lines[0])
480 func (m *Manifest) FileSegmentIterByName(filepath string) <-chan *FileSegment {
481 ch := make(chan *FileSegment, 64)
482 if !strings.HasPrefix(filepath, "./") {
483 filepath = "./" + filepath
486 for stream := range m.StreamIter() {
487 if !strings.HasPrefix(filepath, stream.StreamName+"/") {
490 stream.sendFileSegmentIterByName(filepath, ch)
497 // Blocks may appear multiple times within the same manifest if they
498 // are used by multiple files. In that case this Iterator will output
499 // the same block multiple times.
501 // In order to detect parse errors, caller must check m.Err after the returned channel closes.
502 func (m *Manifest) BlockIterWithDuplicates() <-chan blockdigest.BlockLocator {
503 blockChannel := make(chan blockdigest.BlockLocator)
504 go func(streamChannel <-chan ManifestStream) {
505 for ms := range streamChannel {
510 for _, block := range ms.Blocks {
511 if b, err := blockdigest.ParseBlockLocator(block); err == nil {