1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: Apache-2.0
5 /* Deals with parsing Manifest Text. */
7 // Inspired by the Manifest class in arvados/sdk/ruby/lib/arvados/keep.rb
21 "git.arvados.org/arvados.git/sdk/go/blockdigest"
// ErrInvalidToken is the sentinel error that parseFileStreamSegment
// assigns to its error result when it rejects a file token.
24 var ErrInvalidToken = errors.New("Invalid token")
// Manifest is the receiver type of the segment, Extract, StreamIter,
// FileSegmentIterByName and BlockIterWithDuplicates methods in this file.
// Extract stores the manifest text it produces in the Text field.
// NOTE(review): the field list is not visible in this excerpt.
26 type Manifest struct {
// BlockLocator is the parsed form of a block locator string, as filled in
// by ParseBlockLocator.
// NOTE(review): only the Digest field is visible in this excerpt;
// ParseBlockLocator also assigns a Size field.
31 type BlockLocator struct {
// Digest is parsed from the text before the first "+" of the locator.
32 Digest blockdigest.BlockDigest
37 // FileSegment is a portion of a file that is contained within a
// single block. Elsewhere in this file it is built with Locator, Offset
// and Len fields.
39 type FileSegment struct {
41 // Offset (within this block) of this data segment
46 // FileStreamSegment is a portion of a file described as a segment of a stream.
// parseFileStreamSegment fills its SegPos, SegLen and Name fields from the
// three ":"-separated parts of a file token.
47 type FileStreamSegment struct {
53 // ManifestStream represents a single line from a manifest.
// parseManifestStream populates StreamName, Blocks, blockOffsets and
// FileStreamSegments, and records any parse failure in Err.
54 type ManifestStream struct {
// FileStreamSegments holds the parsed file tokens of the line, in the
// order they appear.
58 FileStreamSegments []FileStreamSegment
62 // segmentedFile is the list of segments referencing one file's content.
63 type segmentedFile []FileSegment
65 // segmentedStream maps each file name in a stream to the list of
// segments referencing that file's content.
66 type segmentedStream map[string]segmentedFile
// segmentedManifest maps each stream name to the files of that stream.
69 type segmentedManifest map[string]segmentedStream
// escapeSeq matches a backslash followed by either three decimal digits
// or a second backslash. UnescapeName hands every match to unescapeSeq.
71 var escapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`)
// unescapeSeq decodes a single match of escapeSeq. The text after the
// leading backslash is parsed as an octal number that must fit in 8 bits,
// and the decoded value is returned as a one-byte string.
// NOTE(review): several lines of this function are not visible in this
// excerpt, including what is returned when ParseUint fails and any
// special handling of the double-backslash alternative.
73 func unescapeSeq(seq string) string {
// seq[1:] skips the leading backslash; base 8, at most 8 bits.
77 i, err := strconv.ParseUint(seq[1:], 8, 8)
79 // Invalid escape sequence: can't unescape.
82 return string([]byte{byte(i)})
// EscapeName returns a copy of s in which selected bytes are replaced by
// a backslash followed by the byte's value as three zero-padded octal
// digits; every other byte is copied unchanged.
// NOTE(review): the condition that selects which bytes get escaped is
// not visible in this excerpt.
85 func EscapeName(s string) string {
// Capacity len(s) is a lower bound; the slice grows if anything is escaped.
87 escaped := make([]byte, 0, len(s))
88 for _, c := range raw {
// Escaped form: backslash plus three octal digits.
90 oct := fmt.Sprintf("\\%03o", c)
91 escaped = append(escaped, []byte(oct)...)
// Unescaped form: the byte itself.
93 escaped = append(escaped, c)
96 return string(escaped)
// UnescapeName returns s with every substring matched by escapeSeq
// replaced by the result of unescapeSeq on that substring.
99 func UnescapeName(s string) string {
100 return escapeSeq.ReplaceAllStringFunc(s, unescapeSeq)
// ParseBlockLocator parses the locator string s into a BlockLocator.
//
// If s does not match blockdigest.LocatorPattern, err is set to a message
// that includes the pattern. Otherwise s is split on "+": the first token
// is converted with blockdigest.FromString and stored in Digest, and the
// second token is parsed as a base-10 integer and stored in Size.
// NOTE(review): the handling of conversion errors and of any tokens after
// the second is not visible in this excerpt.
103 func ParseBlockLocator(s string) (b BlockLocator, err error) {
104 if !blockdigest.LocatorPattern.MatchString(s) {
105 err = fmt.Errorf("String \"%s\" does not match BlockLocator pattern "+
108 blockdigest.LocatorPattern.String())
110 tokens := strings.Split(s, "+")
112 var blockDigest blockdigest.BlockDigest
113 // We expect both of the following to succeed since LocatorPattern
114 // restricts the strings appropriately.
115 blockDigest, err = blockdigest.FromString(tokens[0])
// Bit size 0 asks ParseInt for a value that fits in int.
119 blockSize, err = strconv.ParseInt(tokens[1], 10, 0)
123 b.Digest = blockDigest
124 b.Size = int(blockSize)
// parseFileStreamSegment parses one file token of a manifest line.
//
// tok is split on ":" into at most three parts, so the third part (the
// name) may itself contain ":". The first two parts are parsed as base-10
// unsigned 64-bit integers into SegPos and SegLen, and the third is
// unescaped with UnescapeName into Name. ErrInvalidToken is the error
// assigned when the split result is rejected.
// NOTE(review): the rejection condition and the handling of ParseUint
// errors are not visible in this excerpt.
130 func parseFileStreamSegment(tok string) (ft FileStreamSegment, err error) {
131 parts := strings.SplitN(tok, ":", 3)
133 err = ErrInvalidToken
136 ft.SegPos, err = strconv.ParseUint(parts[0], 10, 64)
140 ft.SegLen, err = strconv.ParseUint(parts[1], 10, 64)
144 ft.Name = UnescapeName(parts[2])
// FileSegmentIterByName returns a receive-only channel of the file
// segments that sendFileSegmentIterByName produces for filepath within
// this stream. The channel is buffered to 64 entries.
// NOTE(review): the lines that start the producer and close the channel
// are not visible in this excerpt — confirm the channel is always closed.
148 func (s *ManifestStream) FileSegmentIterByName(filepath string) <-chan *FileSegment {
149 ch := make(chan *FileSegment, 64)
151 s.sendFileSegmentIterByName(filepath, ch)
// firstBlock returns the index i of the block whose half-open byte range
// [offsets[i], offsets[i+1]) contains rangeStart, or -1 if no block does.
//
// offsets holds the starting stream offset of every block followed by one
// final entry for the end of the stream, so it describes len(offsets)-1
// contiguous blocks and must be in non-decreasing order.
func firstBlock(offsets []uint64, rangeStart uint64) int {
	// Fewer than two offsets describe no block at all. Return "not found"
	// instead of indexing offsets[i+1] out of range below.
	if len(offsets) < 2 {
		return -1
	}

	// rangeStart/blockStart is the inclusive lower bound
	// rangeEnd/blockEnd is the exclusive upper bound

	hi := len(offsets) - 1
	lo := 0
	i := (hi + lo) / 2
	blockStart := offsets[i]
	blockEnd := offsets[i+1]

	// perform a binary search for the first block
	// assumes that all of the blocks are contiguous, so rangeStart is guaranteed
	// to either fall into the range of a block or be outside the block range entirely
	for !(rangeStart >= blockStart && rangeStart < blockEnd) {
		if lo == i {
			// must be out of range, fail
			return -1
		}
		if rangeStart > blockStart {
			lo = i
		} else {
			hi = i
		}
		// lo < hi here, so the midpoint is always < hi and i+1 stays in range.
		i = (hi + lo) / 2
		blockStart = offsets[i]
		blockEnd = offsets[i+1]
	}
	return i
}
// sendFileSegmentIterByName sends on ch one FileSegment for every block
// range that backs the file named by filepath within this stream.
//
// filepath is normalized with fixStreamName and must begin with
// s.StreamName+"/"; the remainder is compared against the Name of each
// entry in s.FileStreamSegments. For each matching entry the first block
// is located with firstBlock and the following blocks are walked, with
// Offset and Len trimmed so each segment covers only the wanted byte range
// [SegPos, SegPos+SegLen).
//
// It panics if firstBlock cannot place the segment or if a block ends
// before the wanted position; the inline comments treat both as bugs.
// NOTE(review): many lines are not visible in this excerpt, including the
// early exits, the condition guarding the empty-block segment, and the
// send of each computed segment.
187 func (s *ManifestStream) sendFileSegmentIterByName(filepath string, ch chan<- *FileSegment) {
188 // This is what streamName+"/"+fileName will look like:
189 target := fixStreamName(filepath)
190 if !strings.HasPrefix(target, s.StreamName+"/") {
// Strip the stream name and the "/" separator to get the bare file name.
193 targetTokName := target[len(s.StreamName)+1:]
194 for _, fTok := range s.FileStreamSegments {
195 if fTok.Name != targetTokName {
198 wantPos := fTok.SegPos
199 wantLen := fTok.SegLen
// The locator below is the MD5 of zero bytes with size 0, i.e. an empty
// block. Presumably sent for zero-length segments — confirm.
201 ch <- &FileSegment{Locator: "d41d8cd98f00b204e9800998ecf8427e+0", Offset: 0, Len: 0}
205 // Binary search to determine first block in the stream
206 i := firstBlock(s.blockOffsets, wantPos)
208 // Shouldn't happen, file segments are checked in parseManifestStream
209 panic(fmt.Sprintf("File segment %v extends past end of stream", fTok))
211 for ; i < len(s.Blocks); i++ {
// blockOffsets has len(s.Blocks)+1 entries, so i+1 is in range here.
212 blockPos := s.blockOffsets[i]
213 blockEnd := s.blockOffsets[i+1]
214 if blockEnd <= wantPos {
215 // Shouldn't happen, firstBlock() should start
216 // us on the right block, so if this triggers
217 // that means there is a bug.
218 panic(fmt.Sprintf("Block end %v comes before start of file segment %v", blockEnd, wantPos))
220 if blockPos >= wantPos+wantLen {
221 // current block comes after current file span
226 Locator: s.Blocks[i],
// Start from the whole block, then trim the front and back below.
228 Len: int(blockEnd - blockPos),
// Wanted range starts inside this block: skip the leading bytes.
230 if blockPos < wantPos {
231 fseg.Offset = int(wantPos - blockPos)
232 fseg.Len -= fseg.Offset
// Wanted range ends inside this block: drop the trailing bytes.
234 if blockEnd > wantPos+wantLen {
235 fseg.Len = int(wantPos+wantLen-blockPos) - fseg.Offset
242 func parseManifestStream(s string) (m ManifestStream) {
243 tokens := strings.Split(s, " ")
245 m.StreamName = UnescapeName(tokens[0])
246 if m.StreamName != "." && !strings.HasPrefix(m.StreamName, "./") {
247 m.Err = fmt.Errorf("Invalid stream name: %s", m.StreamName)
253 for i = 0; i < len(tokens); i++ {
254 if !blockdigest.IsBlockLocator(tokens[i]) {
258 m.Blocks = tokens[:i]
259 fileTokens := tokens[i:]
261 if len(m.Blocks) == 0 {
262 m.Err = fmt.Errorf("No block locators found")
266 m.blockOffsets = make([]uint64, len(m.Blocks)+1)
267 var streamoffset uint64
268 for i, b := range m.Blocks {
269 bl, err := ParseBlockLocator(b)
274 m.blockOffsets[i] = streamoffset
275 streamoffset += uint64(bl.Size)
277 m.blockOffsets[len(m.Blocks)] = streamoffset
279 if len(fileTokens) == 0 {
280 m.Err = fmt.Errorf("No file tokens found")
284 for _, ft := range fileTokens {
285 pft, err := parseFileStreamSegment(ft)
287 m.Err = fmt.Errorf("Invalid file token: %s", ft)
290 if pft.SegPos+pft.SegLen > streamoffset {
291 m.Err = fmt.Errorf("File segment %s extends past end of stream %d", ft, streamoffset)
294 m.FileStreamSegments = append(m.FileStreamSegments, pft)
// fixStreamName returns a normalized form of the stream name or path sn.
// It distinguishes three cases: sn starting with "/", sn equal to ".",
// and everything else. Callers in this file compare its result against
// stream names of the form "." or "./...".
// NOTE(review): the statements inside each branch are not visible in
// this excerpt; presumably they prefix sn so it starts with "./" — confirm.
300 func fixStreamName(sn string) string {
302 if strings.HasPrefix(sn, "/") {
304 } else if sn != "." {
// splitPath splits srcpath at its last "/": streamname is everything
// before that slash and filename is everything after it.
// NOTE(review): the handling of a path containing no "/" is not visible
// in this excerpt.
310 func splitPath(srcpath string) (streamname, filename string) {
311 pathIdx := strings.LastIndex(srcpath, "/")
313 streamname = srcpath[0:pathIdx]
314 filename = srcpath[pathIdx+1:]
// segment walks every stream yielded by m.StreamIter and builds a
// segmentedManifest keyed first by stream name and then by file name.
//
// If a stream carries a non-nil Err, the walk stops and that error is
// returned with a nil manifest. Within one stream each distinct path is
// expanded only once: its segments, obtained from
// stream.FileSegmentIterByName, are appended to whatever was already
// recorded for that file by earlier streams.
// NOTE(review): StreamIter's channel is unbuffered, so returning early on
// an error may leave its goroutine blocked on a send — confirm.
322 func (m *Manifest) segment() (*segmentedManifest, error) {
323 files := make(segmentedManifest)
325 for stream := range m.StreamIter() {
326 if stream.Err != nil {
327 // Stream has an error
328 return nil, stream.Err
// Paths already expanded for this stream; reset for every stream.
330 currentStreamfiles := make(map[string]bool)
331 for _, f := range stream.FileStreamSegments {
332 sn := stream.StreamName
// Drop one trailing "/" so the join below yields exactly one separator.
333 if strings.HasSuffix(sn, "/") {
334 sn = sn[0 : len(sn)-1]
336 path := sn + "/" + f.Name
// f.Name may contain "/", so re-split the joined path at its last "/".
337 streamname, filename := splitPath(path)
338 if files[streamname] == nil {
339 files[streamname] = make(segmentedStream)
341 if !currentStreamfiles[path] {
342 segs := files[streamname][filename]
343 for seg := range stream.FileSegmentIterByName(path) {
345 segs = append(segs, *seg)
348 files[streamname][filename] = segs
349 currentStreamfiles[path] = true
// normalizedText renders the stream as a single manifest line, using name
// as the stream name.
//
// The line consists of the escaped stream name, then each distinct block
// (by digest) once, in order of first use when files are visited in
// sorted name order, then "pos:len:name" tokens for every file in sorted
// name order. Tokens are joined by single spaces and the line ends with a
// newline. If no block is referenced, the empty-block locator is emitted
// so the line still has a block token. Adjacent segments of a file are
// merged into a single span, and a file with no segments gets "0:0:name".
//
// Errors from ParseBlockLocator are ignored here.
// NOTE(review): the conditions around the span-collapsing branches are
// not fully visible in this excerpt.
357 func (stream segmentedStream) normalizedText(name string) string {
// Map iteration order is random; sort the file names for stable output.
358 var sortedfiles []string
359 for k := range stream {
360 sortedfiles = append(sortedfiles, k)
362 sort.Strings(sortedfiles)
364 streamTokens := []string{EscapeName(name)}
// blocks maps a block digest to the stream offset at which it was placed.
366 blocks := make(map[blockdigest.BlockDigest]int64)
367 var streamoffset int64
369 // Go through each file and add each referenced block exactly once.
370 for _, streamfile := range sortedfiles {
371 for _, segment := range stream[streamfile] {
372 b, _ := ParseBlockLocator(segment.Locator)
373 if _, ok := blocks[b.Digest]; !ok {
374 streamTokens = append(streamTokens, segment.Locator)
375 blocks[b.Digest] = streamoffset
376 streamoffset += int64(b.Size)
// Only the stream name so far: no block was added above.
381 if len(streamTokens) == 1 {
382 streamTokens = append(streamTokens, "d41d8cd98f00b204e9800998ecf8427e+0")
385 for _, streamfile := range sortedfiles {
386 // Add in file segments
// -1 marks "no span started yet" for this file.
387 spanStart := int64(-1)
389 fout := EscapeName(streamfile)
390 for _, segment := range stream[streamfile] {
391 // Collapse adjacent segments
392 b, _ := ParseBlockLocator(segment.Locator)
// Position of this segment in the normalized stream.
393 streamoffset = blocks[b.Digest] + int64(segment.Offset)
395 spanStart = streamoffset
396 spanEnd = streamoffset + int64(segment.Len)
// Segment begins exactly where the current span ends: extend the span.
398 if streamoffset == spanEnd {
399 spanEnd += int64(segment.Len)
// Otherwise emit the finished span and start a new one at this segment.
401 streamTokens = append(streamTokens, fmt.Sprintf("%d:%d:%s", spanStart, spanEnd-spanStart, fout))
402 spanStart = streamoffset
403 spanEnd = streamoffset + int64(segment.Len)
409 streamTokens = append(streamTokens, fmt.Sprintf("%d:%d:%s", spanStart, spanEnd-spanStart, fout))
412 if len(stream[streamfile]) == 0 {
413 streamTokens = append(streamTokens, fmt.Sprintf("0:0:%s", fout))
417 return strings.Join(streamTokens, " ") + "\n"
// manifestTextForPath returns normalized manifest text for the part of m
// selected by srcpath, placed at relocate.
//
// Both paths are normalized with fixStreamName; a trailing "/" on relocate
// is detected beforehand and re-appended through suffix. If srcpath names
// a file in one of the streams, the result is a one-file stream: the file
// keeps its own name when relocate has an empty file part, otherwise it
// takes the file part of relocate. Otherwise every stream equal to
// srcpath or beginning with srcpath+"/" is emitted in sorted order, with
// the srcpath prefix of its name replaced by relocate.
// NOTE(review): several conditions are not visible in this excerpt,
// including the test on okfile and how suffix is set.
420 func (m segmentedManifest) manifestTextForPath(srcpath, relocate string) string {
421 srcpath = fixStreamName(srcpath)
424 if strings.HasSuffix(relocate, "/") {
427 relocate = fixStreamName(relocate) + suffix
429 streamname, filename := splitPath(srcpath)
431 if stream, ok := m[streamname]; ok {
432 // check if it refers to a single file in a stream
433 filesegs, okfile := stream[filename]
435 newstream := make(segmentedStream)
436 relocateStream, relocateFilename := splitPath(relocate)
// No file part in relocate: keep the original file name.
437 if relocateFilename == "" {
438 relocateFilename = filename
440 newstream[relocateFilename] = filesegs
441 return newstream.normalizedText(relocateStream)
445 // Going to extract multiple streams
446 prefix := srcpath + "/"
// Strip the trailing "/" so the stream-name suffix can be appended directly.
448 if strings.HasSuffix(relocate, "/") {
449 relocate = relocate[0 : len(relocate)-1]
// Sort the stream names so the output order is deterministic.
452 var sortedstreams []string
454 sortedstreams = append(sortedstreams, k)
456 sort.Strings(sortedstreams)
458 var manifest bytes.Buffer
459 for _, k := range sortedstreams {
460 if strings.HasPrefix(k, prefix) || k == srcpath {
// k[len(srcpath):] is "" or "/sub/dir", i.e. the part of k below srcpath.
461 manifest.WriteString(m[k].normalizedText(relocate + k[len(srcpath):]))
464 return manifest.String()
467 // Extract extracts some or all of the manifest and returns the extracted
468 // portion as a normalized manifest. This is a swiss army knife function that
469 // can be used in several ways:
//
471 // If 'srcpath' and 'relocate' are '.' it simply returns an equivalent manifest
472 // in normalized form.
//
474 // Extract(".", ".") // return entire normalized manifest text
//
476 // If 'srcpath' points to a single file, it will return manifest text for just that file.
477 // The value of "relocate" can be used to rename the file or set the file stream.
//
479 // Extract("./foo", ".") // extract file "foo" and put it in stream "."
480 // Extract("./foo", "./bar") // extract file "foo", rename it to "bar" in stream "."
481 // Extract("./foo", "./bar/") // extract file "foo", rename it to "./bar/foo"
482 // Extract("./foo", "./bar/baz") // extract file "foo", rename it to "./bar/baz"
//
484 // Otherwise it will return the manifest text for all streams with the prefix in "srcpath" and place
485 // them under the path in "relocate".
//
487 // Extract("./stream", ".") // extract "./stream" to "." and "./stream/subdir" to "./subdir"
488 // Extract("./stream", "./bar") // extract "./stream" to "./bar" and "./stream/subdir" to "./bar/subdir"
//
// The manifest is first broken down with m.segment(); the text built by
// manifestTextForPath is stored in the Text field of the result.
// NOTE(review): the handling of the error returned by segment is not
// visible in this excerpt.
489 func (m Manifest) Extract(srcpath, relocate string) (ret Manifest) {
490 segmented, err := m.segment()
495 ret.Text = segmented.manifestTextForPath(srcpath, relocate)
// StreamIter returns an unbuffered channel on which a goroutine sends one
// ManifestStream, produced by parseManifestStream, for every non-blank
// line of its input string. Lines are split off one at a time, so a
// consumer that stops early does not pay for parsing the rest.
// NOTE(review): the argument passed to the goroutine, the loop condition
// and the channel close are not visible in this excerpt. Because the
// channel is unbuffered, a consumer that stops receiving may leave the
// goroutine blocked on a send — confirm.
499 func (m *Manifest) StreamIter() <-chan ManifestStream {
500 ch := make(chan ManifestStream)
501 go func(input string) {
502 // This slice holds the current line and the remainder of the
503 // manifest. We parse one line at a time, to save effort if we
504 // only need the first few lines.
505 lines := []string{"", input}
// Split off the next line; lines[1], when present, is the unparsed rest.
507 lines = strings.SplitN(lines[1], "\n", 2)
508 if len(lines[0]) > 0 {
509 // Only parse non-blank lines
510 ch <- parseManifestStream(lines[0])
// FileSegmentIterByName returns a receive-only channel, buffered to 64
// entries, of the file segments for filepath across the whole manifest.
// filepath is normalized with fixStreamName, and every stream from
// m.StreamIter whose name followed by "/" is a prefix of filepath
// contributes its segments through sendFileSegmentIterByName.
// NOTE(review): the lines that start the producer, skip non-matching
// streams and close the channel are not visible in this excerpt.
521 func (m *Manifest) FileSegmentIterByName(filepath string) <-chan *FileSegment {
522 ch := make(chan *FileSegment, 64)
523 filepath = fixStreamName(filepath)
525 for stream := range m.StreamIter() {
526 if !strings.HasPrefix(filepath, stream.StreamName+"/") {
529 stream.sendFileSegmentIterByName(filepath, ch)
536 // BlockIterWithDuplicates iterates over the block locators of a manifest.
538 // Blocks may appear multiple times within the same manifest if they
539 // are used by multiple files. In that case this Iterator will output
540 // the same block multiple times.
542 // In order to detect parse errors, caller must check m.Err after the returned channel closes.
543 func (m *Manifest) BlockIterWithDuplicates() <-chan blockdigest.BlockLocator {
544 blockChannel := make(chan blockdigest.BlockLocator)
545 go func(streamChannel <-chan ManifestStream) {
546 for ms := range streamChannel {
551 for _, block := range ms.Blocks {
552 if b, err := blockdigest.ParseBlockLocator(block); err == nil {