Merge branch 'main' from workbench2.git
[arvados.git] / sdk / go / manifest / manifest.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 /* Deals with parsing Manifest Text. */
6
7 // Inspired by the Manifest class in arvados/sdk/ruby/lib/arvados/keep.rb
8
9 package manifest
10
11 import (
12         "errors"
13         "fmt"
14         "path"
15         "regexp"
16         "sort"
17         "strconv"
18         "strings"
19
20         "git.arvados.org/arvados.git/sdk/go/blockdigest"
21 )
22
23 var ErrInvalidToken = errors.New("Invalid token")
24
25 type Manifest struct {
26         Text string
27         Err  error
28 }
29
30 type BlockLocator struct {
31         Digest blockdigest.BlockDigest
32         Size   int
33         Hints  []string
34 }
35
36 // FileSegment is a portion of a file that is contained within a
37 // single block.
38 type FileSegment struct {
39         Locator string
40         // Offset (within this block) of this data segment
41         Offset int
42         Len    int
43 }
44
45 // FileStreamSegment is a portion of a file described as a segment of a stream.
46 type FileStreamSegment struct {
47         SegPos uint64
48         SegLen uint64
49         Name   string
50 }
51
52 // ManifestStream represents a single line from a manifest.
53 type ManifestStream struct {
54         StreamName         string
55         Blocks             []string
56         blockOffsets       []uint64
57         FileStreamSegments []FileStreamSegment
58         Err                error
59 }
60
61 // Array of segments referencing file content
62 type segmentedFile []FileSegment
63
64 // Map of files to list of file segments referencing file content
65 type segmentedStream map[string]segmentedFile
66
67 // Map of streams
68 type segmentedManifest map[string]segmentedStream
69
70 var escapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`)
71
72 func unescapeSeq(seq string) string {
73         if seq == `\\` {
74                 return `\`
75         }
76         i, err := strconv.ParseUint(seq[1:], 8, 8)
77         if err != nil {
78                 // Invalid escape sequence: can't unescape.
79                 return seq
80         }
81         return string([]byte{byte(i)})
82 }
83
84 func EscapeName(s string) string {
85         raw := []byte(s)
86         escaped := make([]byte, 0, len(s))
87         for _, c := range raw {
88                 if c <= 32 {
89                         oct := fmt.Sprintf("\\%03o", c)
90                         escaped = append(escaped, []byte(oct)...)
91                 } else {
92                         escaped = append(escaped, c)
93                 }
94         }
95         return string(escaped)
96 }
97
98 func UnescapeName(s string) string {
99         return escapeSeq.ReplaceAllStringFunc(s, unescapeSeq)
100 }
101
102 func ParseBlockLocator(s string) (b BlockLocator, err error) {
103         if !blockdigest.LocatorPattern.MatchString(s) {
104                 err = fmt.Errorf("String \"%s\" does not match BlockLocator pattern "+
105                         "\"%s\".",
106                         s,
107                         blockdigest.LocatorPattern.String())
108         } else {
109                 tokens := strings.Split(s, "+")
110                 var blockSize int64
111                 var blockDigest blockdigest.BlockDigest
112                 // We expect both of the following to succeed since LocatorPattern
113                 // restricts the strings appropriately.
114                 blockDigest, err = blockdigest.FromString(tokens[0])
115                 if err != nil {
116                         return
117                 }
118                 blockSize, err = strconv.ParseInt(tokens[1], 10, 0)
119                 if err != nil {
120                         return
121                 }
122                 b.Digest = blockDigest
123                 b.Size = int(blockSize)
124                 b.Hints = tokens[2:]
125         }
126         return
127 }
128
129 func parseFileStreamSegment(tok string) (ft FileStreamSegment, err error) {
130         parts := strings.SplitN(tok, ":", 3)
131         if len(parts) != 3 {
132                 err = ErrInvalidToken
133                 return
134         }
135         ft.SegPos, err = strconv.ParseUint(parts[0], 10, 64)
136         if err != nil {
137                 return
138         }
139         ft.SegLen, err = strconv.ParseUint(parts[1], 10, 64)
140         if err != nil {
141                 return
142         }
143         ft.Name = UnescapeName(parts[2])
144         return
145 }
146
147 func (s *ManifestStream) FileSegmentIterByName(filepath string) <-chan *FileSegment {
148         ch := make(chan *FileSegment, 64)
149         go func() {
150                 s.sendFileSegmentIterByName(filepath, ch)
151                 close(ch)
152         }()
153         return ch
154 }
155
156 func firstBlock(offsets []uint64, rangeStart uint64) int {
157         // rangeStart/blockStart is the inclusive lower bound
158         // rangeEnd/blockEnd is the exclusive upper bound
159
160         hi := len(offsets) - 1
161         var lo int
162         i := ((hi + lo) / 2)
163         blockStart := offsets[i]
164         blockEnd := offsets[i+1]
165
166         // perform a binary search for the first block
167         // assumes that all of the blocks are contiguous, so rangeStart is guaranteed
168         // to either fall into the range of a block or be outside the block range entirely
169         for !(rangeStart >= blockStart && rangeStart < blockEnd) {
170                 if lo == i {
171                         // must be out of range, fail
172                         return -1
173                 }
174                 if rangeStart > blockStart {
175                         lo = i
176                 } else {
177                         hi = i
178                 }
179                 i = ((hi + lo) / 2)
180                 blockStart = offsets[i]
181                 blockEnd = offsets[i+1]
182         }
183         return i
184 }
185
186 func (s *ManifestStream) sendFileSegmentIterByName(filepath string, ch chan<- *FileSegment) {
187         // This is what streamName+"/"+fileName will look like:
188         target := fixStreamName(filepath)
189         for _, fTok := range s.FileStreamSegments {
190                 wantPos := fTok.SegPos
191                 wantLen := fTok.SegLen
192                 name := fTok.Name
193
194                 if s.StreamName+"/"+name != target {
195                         continue
196                 }
197                 if wantLen == 0 {
198                         ch <- &FileSegment{Locator: "d41d8cd98f00b204e9800998ecf8427e+0", Offset: 0, Len: 0}
199                         continue
200                 }
201
202                 // Binary search to determine first block in the stream
203                 i := firstBlock(s.blockOffsets, wantPos)
204                 if i == -1 {
205                         // Shouldn't happen, file segments are checked in parseManifestStream
206                         panic(fmt.Sprintf("File segment %v extends past end of stream", fTok))
207                 }
208                 for ; i < len(s.Blocks); i++ {
209                         blockPos := s.blockOffsets[i]
210                         blockEnd := s.blockOffsets[i+1]
211                         if blockEnd <= wantPos {
212                                 // Shouldn't happen, FirstBlock() should start
213                                 // us on the right block, so if this triggers
214                                 // that means there is a bug.
215                                 panic(fmt.Sprintf("Block end %v comes before start of file segment %v", blockEnd, wantPos))
216                         }
217                         if blockPos >= wantPos+wantLen {
218                                 // current block comes after current file span
219                                 break
220                         }
221
222                         fseg := FileSegment{
223                                 Locator: s.Blocks[i],
224                                 Offset:  0,
225                                 Len:     int(blockEnd - blockPos),
226                         }
227                         if blockPos < wantPos {
228                                 fseg.Offset = int(wantPos - blockPos)
229                                 fseg.Len -= fseg.Offset
230                         }
231                         if blockEnd > wantPos+wantLen {
232                                 fseg.Len = int(wantPos+wantLen-blockPos) - fseg.Offset
233                         }
234                         ch <- &fseg
235                 }
236         }
237 }
238
239 func parseManifestStream(s string) (m ManifestStream) {
240         tokens := strings.Split(s, " ")
241
242         m.StreamName = UnescapeName(tokens[0])
243         if m.StreamName != "." && !strings.HasPrefix(m.StreamName, "./") {
244                 m.Err = fmt.Errorf("Invalid stream name: %s", m.StreamName)
245                 return
246         }
247
248         tokens = tokens[1:]
249         var i int
250         for i = 0; i < len(tokens); i++ {
251                 if !blockdigest.IsBlockLocator(tokens[i]) {
252                         break
253                 }
254         }
255         m.Blocks = tokens[:i]
256         fileTokens := tokens[i:]
257
258         if len(m.Blocks) == 0 {
259                 m.Err = fmt.Errorf("No block locators found")
260                 return
261         }
262
263         m.blockOffsets = make([]uint64, len(m.Blocks)+1)
264         var streamoffset uint64
265         for i, b := range m.Blocks {
266                 bl, err := ParseBlockLocator(b)
267                 if err != nil {
268                         m.Err = err
269                         return
270                 }
271                 m.blockOffsets[i] = streamoffset
272                 streamoffset += uint64(bl.Size)
273         }
274         m.blockOffsets[len(m.Blocks)] = streamoffset
275
276         if len(fileTokens) == 0 {
277                 m.Err = fmt.Errorf("No file tokens found")
278                 return
279         }
280
281         for _, ft := range fileTokens {
282                 pft, err := parseFileStreamSegment(ft)
283                 if err != nil {
284                         m.Err = fmt.Errorf("Invalid file token: %s", ft)
285                         break
286                 }
287                 if pft.SegPos+pft.SegLen > streamoffset {
288                         m.Err = fmt.Errorf("File segment %s extends past end of stream %d", ft, streamoffset)
289                         break
290                 }
291                 m.FileStreamSegments = append(m.FileStreamSegments, pft)
292         }
293
294         return
295 }
296
297 func fixStreamName(sn string) string {
298         sn = path.Clean(sn)
299         if strings.HasPrefix(sn, "/") {
300                 sn = "." + sn
301         } else if sn != "." {
302                 sn = "./" + sn
303         }
304         return sn
305 }
306
307 func splitPath(srcpath string) (streamname, filename string) {
308         pathIdx := strings.LastIndex(srcpath, "/")
309         if pathIdx >= 0 {
310                 streamname = srcpath[0:pathIdx]
311                 filename = srcpath[pathIdx+1:]
312         } else {
313                 streamname = srcpath
314                 filename = ""
315         }
316         return
317 }
318
319 func (m *Manifest) segment() (*segmentedManifest, error) {
320         files := make(segmentedManifest)
321
322         for stream := range m.StreamIter() {
323                 if stream.Err != nil {
324                         // Stream has an error
325                         return nil, stream.Err
326                 }
327                 currentStreamfiles := make(map[string]bool)
328                 for _, f := range stream.FileStreamSegments {
329                         sn := stream.StreamName
330                         if strings.HasSuffix(sn, "/") {
331                                 sn = sn[0 : len(sn)-1]
332                         }
333                         path := sn + "/" + f.Name
334                         streamname, filename := splitPath(path)
335                         if files[streamname] == nil {
336                                 files[streamname] = make(segmentedStream)
337                         }
338                         if !currentStreamfiles[path] {
339                                 segs := files[streamname][filename]
340                                 for seg := range stream.FileSegmentIterByName(path) {
341                                         if seg.Len > 0 {
342                                                 segs = append(segs, *seg)
343                                         }
344                                 }
345                                 files[streamname][filename] = segs
346                                 currentStreamfiles[path] = true
347                         }
348                 }
349         }
350
351         return &files, nil
352 }
353
354 func (stream segmentedStream) normalizedText(name string) string {
355         var sortedfiles []string
356         for k := range stream {
357                 sortedfiles = append(sortedfiles, k)
358         }
359         sort.Strings(sortedfiles)
360
361         streamTokens := []string{EscapeName(name)}
362
363         blocks := make(map[blockdigest.BlockDigest]int64)
364         var streamoffset int64
365
366         // Go through each file and add each referenced block exactly once.
367         for _, streamfile := range sortedfiles {
368                 for _, segment := range stream[streamfile] {
369                         b, _ := ParseBlockLocator(segment.Locator)
370                         if _, ok := blocks[b.Digest]; !ok {
371                                 streamTokens = append(streamTokens, segment.Locator)
372                                 blocks[b.Digest] = streamoffset
373                                 streamoffset += int64(b.Size)
374                         }
375                 }
376         }
377
378         if len(streamTokens) == 1 {
379                 streamTokens = append(streamTokens, "d41d8cd98f00b204e9800998ecf8427e+0")
380         }
381
382         for _, streamfile := range sortedfiles {
383                 // Add in file segments
384                 spanStart := int64(-1)
385                 spanEnd := int64(0)
386                 fout := EscapeName(streamfile)
387                 for _, segment := range stream[streamfile] {
388                         // Collapse adjacent segments
389                         b, _ := ParseBlockLocator(segment.Locator)
390                         streamoffset = blocks[b.Digest] + int64(segment.Offset)
391                         if spanStart == -1 {
392                                 spanStart = streamoffset
393                                 spanEnd = streamoffset + int64(segment.Len)
394                         } else {
395                                 if streamoffset == spanEnd {
396                                         spanEnd += int64(segment.Len)
397                                 } else {
398                                         streamTokens = append(streamTokens, fmt.Sprintf("%d:%d:%s", spanStart, spanEnd-spanStart, fout))
399                                         spanStart = streamoffset
400                                         spanEnd = streamoffset + int64(segment.Len)
401                                 }
402                         }
403                 }
404
405                 if spanStart != -1 {
406                         streamTokens = append(streamTokens, fmt.Sprintf("%d:%d:%s", spanStart, spanEnd-spanStart, fout))
407                 }
408
409                 if len(stream[streamfile]) == 0 {
410                         streamTokens = append(streamTokens, fmt.Sprintf("0:0:%s", fout))
411                 }
412         }
413
414         return strings.Join(streamTokens, " ") + "\n"
415 }
416
417 func (m segmentedManifest) manifestTextForPath(srcpath, relocate string) string {
418         srcpath = fixStreamName(srcpath)
419
420         var suffix string
421         if strings.HasSuffix(relocate, "/") {
422                 suffix = "/"
423         }
424         relocate = fixStreamName(relocate) + suffix
425
426         streamname, filename := splitPath(srcpath)
427
428         if stream, ok := m[streamname]; ok {
429                 // check if it refers to a single file in a stream
430                 filesegs, okfile := stream[filename]
431                 if okfile {
432                         newstream := make(segmentedStream)
433                         relocateStream, relocateFilename := splitPath(relocate)
434                         if relocateFilename == "" {
435                                 relocateFilename = filename
436                         }
437                         newstream[relocateFilename] = filesegs
438                         return newstream.normalizedText(relocateStream)
439                 }
440         }
441
442         // Going to extract multiple streams
443         prefix := srcpath + "/"
444
445         if strings.HasSuffix(relocate, "/") {
446                 relocate = relocate[0 : len(relocate)-1]
447         }
448
449         var sortedstreams []string
450         for k := range m {
451                 sortedstreams = append(sortedstreams, k)
452         }
453         sort.Strings(sortedstreams)
454
455         manifest := ""
456         for _, k := range sortedstreams {
457                 if strings.HasPrefix(k, prefix) || k == srcpath {
458                         manifest += m[k].normalizedText(relocate + k[len(srcpath):])
459                 }
460         }
461         return manifest
462 }
463
464 // Extract extracts some or all of the manifest and returns the extracted
465 // portion as a normalized manifest.  This is a swiss army knife function that
466 // can be several ways:
467 //
468 // If 'srcpath' and 'relocate' are '.' it simply returns an equivalent manifest
469 // in normalized form.
470 //
471 //      Extract(".", ".")  // return entire normalized manfest text
472 //
473 // If 'srcpath' points to a single file, it will return manifest text for just that file.
474 // The value of "relocate" is can be used to rename the file or set the file stream.
475 //
476 //      Extract("./foo", ".")          // extract file "foo" and put it in stream "."
477 //      Extract("./foo", "./bar")      // extract file "foo", rename it to "bar" in stream "."
478 //      Extract("./foo", "./bar/")     // extract file "foo", rename it to "./bar/foo"
479 //      Extract("./foo", "./bar/baz")  // extract file "foo", rename it to "./bar/baz")
480 //
481 // Otherwise it will return the manifest text for all streams with the prefix in "srcpath" and place
482 // them under the path in "relocate".
483 //
484 //      Extract("./stream", ".")      // extract "./stream" to "." and "./stream/subdir" to "./subdir")
485 //      Extract("./stream", "./bar")  // extract "./stream" to "./bar" and "./stream/subdir" to "./bar/subdir")
486 func (m Manifest) Extract(srcpath, relocate string) (ret Manifest) {
487         segmented, err := m.segment()
488         if err != nil {
489                 ret.Err = err
490                 return
491         }
492         ret.Text = segmented.manifestTextForPath(srcpath, relocate)
493         return
494 }
495
496 func (m *Manifest) StreamIter() <-chan ManifestStream {
497         ch := make(chan ManifestStream)
498         go func(input string) {
499                 // This slice holds the current line and the remainder of the
500                 // manifest.  We parse one line at a time, to save effort if we
501                 // only need the first few lines.
502                 lines := []string{"", input}
503                 for {
504                         lines = strings.SplitN(lines[1], "\n", 2)
505                         if len(lines[0]) > 0 {
506                                 // Only parse non-blank lines
507                                 ch <- parseManifestStream(lines[0])
508                         }
509                         if len(lines) == 1 {
510                                 break
511                         }
512                 }
513                 close(ch)
514         }(m.Text)
515         return ch
516 }
517
518 func (m *Manifest) FileSegmentIterByName(filepath string) <-chan *FileSegment {
519         ch := make(chan *FileSegment, 64)
520         filepath = fixStreamName(filepath)
521         go func() {
522                 for stream := range m.StreamIter() {
523                         if !strings.HasPrefix(filepath, stream.StreamName+"/") {
524                                 continue
525                         }
526                         stream.sendFileSegmentIterByName(filepath, ch)
527                 }
528                 close(ch)
529         }()
530         return ch
531 }
532
533 // BlockIterWithDuplicates iterates over the block locators of a manifest.
534 //
535 // Blocks may appear multiple times within the same manifest if they
536 // are used by multiple files. In that case this Iterator will output
537 // the same block multiple times.
538 //
539 // In order to detect parse errors, caller must check m.Err after the returned channel closes.
540 func (m *Manifest) BlockIterWithDuplicates() <-chan blockdigest.BlockLocator {
541         blockChannel := make(chan blockdigest.BlockLocator)
542         go func(streamChannel <-chan ManifestStream) {
543                 for ms := range streamChannel {
544                         if ms.Err != nil {
545                                 m.Err = ms.Err
546                                 continue
547                         }
548                         for _, block := range ms.Blocks {
549                                 if b, err := blockdigest.ParseBlockLocator(block); err == nil {
550                                         blockChannel <- b
551                                 } else {
552                                         m.Err = err
553                                 }
554                         }
555                 }
556                 close(blockChannel)
557         }(m.StreamIter())
558         return blockChannel
559 }