1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
5 /* Deals with parsing Manifest Text. */
7 // Inspired by the Manifest class in arvados/sdk/ruby/lib/arvados/keep.rb
14 "git.curoverse.com/arvados.git/sdk/go/blockdigest"
// ErrInvalidToken is a sentinel error. parseFileStreamSegment assigns it to
// its err result after splitting a file token on ":".
// NOTE(review): the triggering condition is presumably a token that does not
// have three ":"-separated parts -- confirm against the full function body.
22 var ErrInvalidToken = errors.New("Invalid token")
24 type Manifest struct {
29 type BlockLocator struct {
30 Digest blockdigest.BlockDigest
35 // FileSegment is a portion of a file that is contained within a
37 type FileSegment struct {
39 // Offset (within this block) of this data segment
44 // FileStreamSegment is a portion of a file described as a segment of a stream.
45 type FileStreamSegment struct {
51 // Represents a single line from a manifest.
52 type ManifestStream struct {
56 FileStreamSegments []FileStreamSegment
60 // Array of segments referencing file content
61 type segmentedFile []FileSegment
63 // Map of files to list of file segments referencing file content
64 type segmentedStream map[string]segmentedFile
// Map of stream names to the segmented files contained in each stream
67 type segmentedManifest map[string]segmentedStream
// escapeSeq matches a single escape sequence: a backslash followed by either
// exactly three decimal digits or a second backslash. UnescapeName hands each
// match to unescapeSeq for replacement.
69 var escapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`)
71 func unescapeSeq(seq string) string {
75 i, err := strconv.ParseUint(seq[1:], 8, 8)
77 // Invalid escape sequence: can't unescape.
80 return string([]byte{byte(i)})
83 func EscapeName(s string) string {
85 escaped := make([]byte, 0, len(s))
86 for _, c := range raw {
88 oct := fmt.Sprintf("\\%03o", c)
89 escaped = append(escaped, []byte(oct)...)
91 escaped = append(escaped, c)
94 return string(escaped)
97 func UnescapeName(s string) string {
98 return escapeSeq.ReplaceAllStringFunc(s, unescapeSeq)
101 func ParseBlockLocator(s string) (b BlockLocator, err error) {
102 if !blockdigest.LocatorPattern.MatchString(s) {
103 err = fmt.Errorf("String \"%s\" does not match BlockLocator pattern "+
106 blockdigest.LocatorPattern.String())
108 tokens := strings.Split(s, "+")
110 var blockDigest blockdigest.BlockDigest
111 // We expect both of the following to succeed since LocatorPattern
112 // restricts the strings appropriately.
113 blockDigest, err = blockdigest.FromString(tokens[0])
117 blockSize, err = strconv.ParseInt(tokens[1], 10, 0)
121 b.Digest = blockDigest
122 b.Size = int(blockSize)
128 func parseFileStreamSegment(tok string) (ft FileStreamSegment, err error) {
129 parts := strings.SplitN(tok, ":", 3)
131 err = ErrInvalidToken
134 ft.SegPos, err = strconv.ParseUint(parts[0], 10, 64)
138 ft.SegLen, err = strconv.ParseUint(parts[1], 10, 64)
142 ft.Name = UnescapeName(parts[2])
146 func (s *ManifestStream) FileSegmentIterByName(filepath string) <-chan *FileSegment {
147 ch := make(chan *FileSegment, 64)
149 s.sendFileSegmentIterByName(filepath, ch)
155 func firstBlock(offsets []uint64, range_start uint64) int {
156 // range_start/block_start is the inclusive lower bound
157 // range_end/block_end is the exclusive upper bound
159 hi := len(offsets) - 1
162 block_start := offsets[i]
163 block_end := offsets[i+1]
165 // perform a binary search for the first block
166 // assumes that all of the blocks are contiguous, so range_start is guaranteed
167 // to either fall into the range of a block or be outside the block range entirely
168 for !(range_start >= block_start && range_start < block_end) {
170 // must be out of range, fail
173 if range_start > block_start {
179 block_start = offsets[i]
180 block_end = offsets[i+1]
185 func (s *ManifestStream) sendFileSegmentIterByName(filepath string, ch chan<- *FileSegment) {
186 // This is what streamName+"/"+fileName will look like:
187 target := fixStreamName(filepath)
188 for _, fTok := range s.FileStreamSegments {
189 wantPos := fTok.SegPos
190 wantLen := fTok.SegLen
193 if s.StreamName+"/"+name != target {
197 ch <- &FileSegment{Locator: "d41d8cd98f00b204e9800998ecf8427e+0", Offset: 0, Len: 0}
201 // Binary search to determine first block in the stream
202 i := firstBlock(s.blockOffsets, wantPos)
204 // Shouldn't happen, file segments are checked in parseManifestStream
205 panic(fmt.Sprintf("File segment %v extends past end of stream", fTok))
207 for ; i < len(s.Blocks); i++ {
208 blockPos := s.blockOffsets[i]
209 blockEnd := s.blockOffsets[i+1]
210 if blockEnd <= wantPos {
211 // Shouldn't happen, FirstBlock() should start
212 // us on the right block, so if this triggers
213 // that means there is a bug.
214 panic(fmt.Sprintf("Block end %v comes before start of file segment %v", blockEnd, wantPos))
216 if blockPos >= wantPos+wantLen {
217 // current block comes after current file span
222 Locator: s.Blocks[i],
224 Len: int(blockEnd - blockPos),
226 if blockPos < wantPos {
227 fseg.Offset = int(wantPos - blockPos)
228 fseg.Len -= fseg.Offset
230 if blockEnd > wantPos+wantLen {
231 fseg.Len = int(wantPos+wantLen-blockPos) - fseg.Offset
238 func parseManifestStream(s string) (m ManifestStream) {
239 tokens := strings.Split(s, " ")
241 m.StreamName = UnescapeName(tokens[0])
242 if m.StreamName != "." && !strings.HasPrefix(m.StreamName, "./") {
243 m.Err = fmt.Errorf("Invalid stream name: %s", m.StreamName)
249 for i = 0; i < len(tokens); i++ {
250 if !blockdigest.IsBlockLocator(tokens[i]) {
254 m.Blocks = tokens[:i]
255 fileTokens := tokens[i:]
257 if len(m.Blocks) == 0 {
258 m.Err = fmt.Errorf("No block locators found")
262 m.blockOffsets = make([]uint64, len(m.Blocks)+1)
263 var streamoffset uint64
264 for i, b := range m.Blocks {
265 bl, err := ParseBlockLocator(b)
270 m.blockOffsets[i] = streamoffset
271 streamoffset += uint64(bl.Size)
273 m.blockOffsets[len(m.Blocks)] = streamoffset
275 if len(fileTokens) == 0 {
276 m.Err = fmt.Errorf("No file tokens found")
280 for _, ft := range fileTokens {
281 pft, err := parseFileStreamSegment(ft)
283 m.Err = fmt.Errorf("Invalid file token: %s", ft)
286 if pft.SegPos+pft.SegLen > streamoffset {
287 m.Err = fmt.Errorf("File segment %s extends past end of stream %d", ft, streamoffset)
290 m.FileStreamSegments = append(m.FileStreamSegments, pft)
296 func fixStreamName(sn string) string {
298 if strings.HasPrefix(sn, "/") {
300 } else if sn != "." {
306 func splitPath(srcpath string) (streamname, filename string) {
307 pathIdx := strings.LastIndex(srcpath, "/")
309 streamname = srcpath[0:pathIdx]
310 filename = srcpath[pathIdx+1:]
318 func (m *Manifest) segment() (*segmentedManifest, error) {
319 files := make(segmentedManifest)
321 for stream := range m.StreamIter() {
322 if stream.Err != nil {
323 // Stream has an error
324 return nil, stream.Err
326 currentStreamfiles := make(map[string]bool)
327 for _, f := range stream.FileStreamSegments {
328 sn := stream.StreamName
329 if strings.HasSuffix(sn, "/") {
330 sn = sn[0 : len(sn)-1]
332 path := sn + "/" + f.Name
333 streamname, filename := splitPath(path)
334 if files[streamname] == nil {
335 files[streamname] = make(segmentedStream)
337 if !currentStreamfiles[path] {
338 segs := files[streamname][filename]
339 for seg := range stream.FileSegmentIterByName(path) {
341 segs = append(segs, *seg)
344 files[streamname][filename] = segs
345 currentStreamfiles[path] = true
353 func (stream segmentedStream) normalizedText(name string) string {
354 var sortedfiles []string
355 for k, _ := range stream {
356 sortedfiles = append(sortedfiles, k)
358 sort.Strings(sortedfiles)
360 stream_tokens := []string{EscapeName(name)}
362 blocks := make(map[blockdigest.BlockDigest]int64)
363 var streamoffset int64
365 // Go through each file and add each referenced block exactly once.
366 for _, streamfile := range sortedfiles {
367 for _, segment := range stream[streamfile] {
368 b, _ := ParseBlockLocator(segment.Locator)
369 if _, ok := blocks[b.Digest]; !ok {
370 stream_tokens = append(stream_tokens, segment.Locator)
371 blocks[b.Digest] = streamoffset
372 streamoffset += int64(b.Size)
377 if len(stream_tokens) == 1 {
378 stream_tokens = append(stream_tokens, "d41d8cd98f00b204e9800998ecf8427e+0")
381 for _, streamfile := range sortedfiles {
382 // Add in file segments
383 span_start := int64(-1)
385 fout := EscapeName(streamfile)
386 for _, segment := range stream[streamfile] {
387 // Collapse adjacent segments
388 b, _ := ParseBlockLocator(segment.Locator)
389 streamoffset = blocks[b.Digest] + int64(segment.Offset)
390 if span_start == -1 {
391 span_start = streamoffset
392 span_end = streamoffset + int64(segment.Len)
394 if streamoffset == span_end {
395 span_end += int64(segment.Len)
397 stream_tokens = append(stream_tokens, fmt.Sprintf("%d:%d:%s", span_start, span_end-span_start, fout))
398 span_start = streamoffset
399 span_end = streamoffset + int64(segment.Len)
404 if span_start != -1 {
405 stream_tokens = append(stream_tokens, fmt.Sprintf("%d:%d:%s", span_start, span_end-span_start, fout))
408 if len(stream[streamfile]) == 0 {
409 stream_tokens = append(stream_tokens, fmt.Sprintf("0:0:%s", fout))
413 return strings.Join(stream_tokens, " ") + "\n"
416 func (m segmentedManifest) manifestTextForPath(srcpath, relocate string) string {
417 srcpath = fixStreamName(srcpath)
420 if strings.HasSuffix(relocate, "/") {
423 relocate = fixStreamName(relocate) + suffix
425 streamname, filename := splitPath(srcpath)
427 if stream, ok := m[streamname]; ok {
428 // check if it refers to a single file in a stream
429 filesegs, okfile := stream[filename]
431 newstream := make(segmentedStream)
432 relocate_stream, relocate_filename := splitPath(relocate)
433 if relocate_filename == "" {
434 relocate_filename = filename
436 newstream[relocate_filename] = filesegs
437 return newstream.normalizedText(relocate_stream)
441 // Going to extract multiple streams
442 prefix := srcpath + "/"
444 if strings.HasSuffix(relocate, "/") {
445 relocate = relocate[0 : len(relocate)-1]
448 var sortedstreams []string
449 for k, _ := range m {
450 sortedstreams = append(sortedstreams, k)
452 sort.Strings(sortedstreams)
455 for _, k := range sortedstreams {
456 if strings.HasPrefix(k, prefix) || k == srcpath {
457 manifest += m[k].normalizedText(relocate + k[len(srcpath):])
463 // Extract extracts some or all of the manifest and returns the extracted
464 // portion as a normalized manifest. This is a swiss army knife function that
465 // can be used in several ways:
467 // If 'srcpath' and 'relocate' are '.' it simply returns an equivalent manifest
468 // in normalized form.
470 // Extract(".", ".") // return entire normalized manifest text
472 // If 'srcpath' points to a single file, it will return manifest text for just that file.
473 // The value of "relocate" can be used to rename the file or set the file stream.
475 // Extract("./foo", ".") // extract file "foo" and put it in stream "."
476 // Extract("./foo", "./bar") // extract file "foo", rename it to "bar" in stream "."
477 // Extract("./foo", "./bar/") // extract file "foo", rename it to "./bar/foo"
478 // Extract("./foo", "./bar/baz") // extract file "foo", rename it to "./bar/baz"
480 // Otherwise it will return the manifest text for all streams with the prefix in "srcpath" and place
481 // them under the path in "relocate".
483 // Extract("./stream", ".") // extract "./stream" to "." and "./stream/subdir" to "./subdir"
484 // Extract("./stream", "./bar") // extract "./stream" to "./bar" and "./stream/subdir" to "./bar/subdir"
485 func (m Manifest) Extract(srcpath, relocate string) (ret Manifest) {
486 segmented, err := m.segment()
491 ret.Text = segmented.manifestTextForPath(srcpath, relocate)
495 func (m *Manifest) StreamIter() <-chan ManifestStream {
496 ch := make(chan ManifestStream)
497 go func(input string) {
498 // This slice holds the current line and the remainder of the
499 // manifest. We parse one line at a time, to save effort if we
500 // only need the first few lines.
501 lines := []string{"", input}
503 lines = strings.SplitN(lines[1], "\n", 2)
504 if len(lines[0]) > 0 {
505 // Only parse non-blank lines
506 ch <- parseManifestStream(lines[0])
517 func (m *Manifest) FileSegmentIterByName(filepath string) <-chan *FileSegment {
518 ch := make(chan *FileSegment, 64)
519 filepath = fixStreamName(filepath)
521 for stream := range m.StreamIter() {
522 if !strings.HasPrefix(filepath, stream.StreamName+"/") {
525 stream.sendFileSegmentIterByName(filepath, ch)
532 // Blocks may appear multiple times within the same manifest if they
533 // are used by multiple files. In that case this Iterator will output
534 // the same block multiple times.
536 // In order to detect parse errors, caller must check m.Err after the returned channel closes.
537 func (m *Manifest) BlockIterWithDuplicates() <-chan blockdigest.BlockLocator {
538 blockChannel := make(chan blockdigest.BlockLocator)
539 go func(streamChannel <-chan ManifestStream) {
540 for ms := range streamChannel {
545 for _, block := range ms.Blocks {
546 if b, err := blockdigest.ParseBlockLocator(block); err == nil {