Merge branch '20336-filter-bigint'
[arvados.git] / sdk / go / manifest / manifest.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 /* Deals with parsing Manifest Text. */
6
7 // Inspired by the Manifest class in arvados/sdk/ruby/lib/arvados/keep.rb
8
9 package manifest
10
11 import (
12         "errors"
13         "fmt"
14         "git.arvados.org/arvados.git/sdk/go/blockdigest"
15         "path"
16         "regexp"
17         "sort"
18         "strconv"
19         "strings"
20 )
21
22 var ErrInvalidToken = errors.New("Invalid token")
23
24 type Manifest struct {
25         Text string
26         Err  error
27 }
28
29 type BlockLocator struct {
30         Digest blockdigest.BlockDigest
31         Size   int
32         Hints  []string
33 }
34
35 // FileSegment is a portion of a file that is contained within a
36 // single block.
37 type FileSegment struct {
38         Locator string
39         // Offset (within this block) of this data segment
40         Offset int
41         Len    int
42 }
43
44 // FileStreamSegment is a portion of a file described as a segment of a stream.
45 type FileStreamSegment struct {
46         SegPos uint64
47         SegLen uint64
48         Name   string
49 }
50
51 // ManifestStream represents a single line from a manifest.
52 type ManifestStream struct {
53         StreamName         string
54         Blocks             []string
55         blockOffsets       []uint64
56         FileStreamSegments []FileStreamSegment
57         Err                error
58 }
59
60 // Array of segments referencing file content
61 type segmentedFile []FileSegment
62
63 // Map of files to list of file segments referencing file content
64 type segmentedStream map[string]segmentedFile
65
66 // Map of streams
67 type segmentedManifest map[string]segmentedStream
68
69 var escapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`)
70
71 func unescapeSeq(seq string) string {
72         if seq == `\\` {
73                 return `\`
74         }
75         i, err := strconv.ParseUint(seq[1:], 8, 8)
76         if err != nil {
77                 // Invalid escape sequence: can't unescape.
78                 return seq
79         }
80         return string([]byte{byte(i)})
81 }
82
83 func EscapeName(s string) string {
84         raw := []byte(s)
85         escaped := make([]byte, 0, len(s))
86         for _, c := range raw {
87                 if c <= 32 {
88                         oct := fmt.Sprintf("\\%03o", c)
89                         escaped = append(escaped, []byte(oct)...)
90                 } else {
91                         escaped = append(escaped, c)
92                 }
93         }
94         return string(escaped)
95 }
96
97 func UnescapeName(s string) string {
98         return escapeSeq.ReplaceAllStringFunc(s, unescapeSeq)
99 }
100
101 func ParseBlockLocator(s string) (b BlockLocator, err error) {
102         if !blockdigest.LocatorPattern.MatchString(s) {
103                 err = fmt.Errorf("String \"%s\" does not match BlockLocator pattern "+
104                         "\"%s\".",
105                         s,
106                         blockdigest.LocatorPattern.String())
107         } else {
108                 tokens := strings.Split(s, "+")
109                 var blockSize int64
110                 var blockDigest blockdigest.BlockDigest
111                 // We expect both of the following to succeed since LocatorPattern
112                 // restricts the strings appropriately.
113                 blockDigest, err = blockdigest.FromString(tokens[0])
114                 if err != nil {
115                         return
116                 }
117                 blockSize, err = strconv.ParseInt(tokens[1], 10, 0)
118                 if err != nil {
119                         return
120                 }
121                 b.Digest = blockDigest
122                 b.Size = int(blockSize)
123                 b.Hints = tokens[2:]
124         }
125         return
126 }
127
128 func parseFileStreamSegment(tok string) (ft FileStreamSegment, err error) {
129         parts := strings.SplitN(tok, ":", 3)
130         if len(parts) != 3 {
131                 err = ErrInvalidToken
132                 return
133         }
134         ft.SegPos, err = strconv.ParseUint(parts[0], 10, 64)
135         if err != nil {
136                 return
137         }
138         ft.SegLen, err = strconv.ParseUint(parts[1], 10, 64)
139         if err != nil {
140                 return
141         }
142         ft.Name = UnescapeName(parts[2])
143         return
144 }
145
146 func (s *ManifestStream) FileSegmentIterByName(filepath string) <-chan *FileSegment {
147         ch := make(chan *FileSegment, 64)
148         go func() {
149                 s.sendFileSegmentIterByName(filepath, ch)
150                 close(ch)
151         }()
152         return ch
153 }
154
155 func firstBlock(offsets []uint64, rangeStart uint64) int {
156         // rangeStart/blockStart is the inclusive lower bound
157         // rangeEnd/blockEnd is the exclusive upper bound
158
159         hi := len(offsets) - 1
160         var lo int
161         i := ((hi + lo) / 2)
162         blockStart := offsets[i]
163         blockEnd := offsets[i+1]
164
165         // perform a binary search for the first block
166         // assumes that all of the blocks are contiguous, so rangeStart is guaranteed
167         // to either fall into the range of a block or be outside the block range entirely
168         for !(rangeStart >= blockStart && rangeStart < blockEnd) {
169                 if lo == i {
170                         // must be out of range, fail
171                         return -1
172                 }
173                 if rangeStart > blockStart {
174                         lo = i
175                 } else {
176                         hi = i
177                 }
178                 i = ((hi + lo) / 2)
179                 blockStart = offsets[i]
180                 blockEnd = offsets[i+1]
181         }
182         return i
183 }
184
185 func (s *ManifestStream) sendFileSegmentIterByName(filepath string, ch chan<- *FileSegment) {
186         // This is what streamName+"/"+fileName will look like:
187         target := fixStreamName(filepath)
188         for _, fTok := range s.FileStreamSegments {
189                 wantPos := fTok.SegPos
190                 wantLen := fTok.SegLen
191                 name := fTok.Name
192
193                 if s.StreamName+"/"+name != target {
194                         continue
195                 }
196                 if wantLen == 0 {
197                         ch <- &FileSegment{Locator: "d41d8cd98f00b204e9800998ecf8427e+0", Offset: 0, Len: 0}
198                         continue
199                 }
200
201                 // Binary search to determine first block in the stream
202                 i := firstBlock(s.blockOffsets, wantPos)
203                 if i == -1 {
204                         // Shouldn't happen, file segments are checked in parseManifestStream
205                         panic(fmt.Sprintf("File segment %v extends past end of stream", fTok))
206                 }
207                 for ; i < len(s.Blocks); i++ {
208                         blockPos := s.blockOffsets[i]
209                         blockEnd := s.blockOffsets[i+1]
210                         if blockEnd <= wantPos {
211                                 // Shouldn't happen, FirstBlock() should start
212                                 // us on the right block, so if this triggers
213                                 // that means there is a bug.
214                                 panic(fmt.Sprintf("Block end %v comes before start of file segment %v", blockEnd, wantPos))
215                         }
216                         if blockPos >= wantPos+wantLen {
217                                 // current block comes after current file span
218                                 break
219                         }
220
221                         fseg := FileSegment{
222                                 Locator: s.Blocks[i],
223                                 Offset:  0,
224                                 Len:     int(blockEnd - blockPos),
225                         }
226                         if blockPos < wantPos {
227                                 fseg.Offset = int(wantPos - blockPos)
228                                 fseg.Len -= fseg.Offset
229                         }
230                         if blockEnd > wantPos+wantLen {
231                                 fseg.Len = int(wantPos+wantLen-blockPos) - fseg.Offset
232                         }
233                         ch <- &fseg
234                 }
235         }
236 }
237
238 func parseManifestStream(s string) (m ManifestStream) {
239         tokens := strings.Split(s, " ")
240
241         m.StreamName = UnescapeName(tokens[0])
242         if m.StreamName != "." && !strings.HasPrefix(m.StreamName, "./") {
243                 m.Err = fmt.Errorf("Invalid stream name: %s", m.StreamName)
244                 return
245         }
246
247         tokens = tokens[1:]
248         var i int
249         for i = 0; i < len(tokens); i++ {
250                 if !blockdigest.IsBlockLocator(tokens[i]) {
251                         break
252                 }
253         }
254         m.Blocks = tokens[:i]
255         fileTokens := tokens[i:]
256
257         if len(m.Blocks) == 0 {
258                 m.Err = fmt.Errorf("No block locators found")
259                 return
260         }
261
262         m.blockOffsets = make([]uint64, len(m.Blocks)+1)
263         var streamoffset uint64
264         for i, b := range m.Blocks {
265                 bl, err := ParseBlockLocator(b)
266                 if err != nil {
267                         m.Err = err
268                         return
269                 }
270                 m.blockOffsets[i] = streamoffset
271                 streamoffset += uint64(bl.Size)
272         }
273         m.blockOffsets[len(m.Blocks)] = streamoffset
274
275         if len(fileTokens) == 0 {
276                 m.Err = fmt.Errorf("No file tokens found")
277                 return
278         }
279
280         for _, ft := range fileTokens {
281                 pft, err := parseFileStreamSegment(ft)
282                 if err != nil {
283                         m.Err = fmt.Errorf("Invalid file token: %s", ft)
284                         break
285                 }
286                 if pft.SegPos+pft.SegLen > streamoffset {
287                         m.Err = fmt.Errorf("File segment %s extends past end of stream %d", ft, streamoffset)
288                         break
289                 }
290                 m.FileStreamSegments = append(m.FileStreamSegments, pft)
291         }
292
293         return
294 }
295
296 func fixStreamName(sn string) string {
297         sn = path.Clean(sn)
298         if strings.HasPrefix(sn, "/") {
299                 sn = "." + sn
300         } else if sn != "." {
301                 sn = "./" + sn
302         }
303         return sn
304 }
305
306 func splitPath(srcpath string) (streamname, filename string) {
307         pathIdx := strings.LastIndex(srcpath, "/")
308         if pathIdx >= 0 {
309                 streamname = srcpath[0:pathIdx]
310                 filename = srcpath[pathIdx+1:]
311         } else {
312                 streamname = srcpath
313                 filename = ""
314         }
315         return
316 }
317
318 func (m *Manifest) segment() (*segmentedManifest, error) {
319         files := make(segmentedManifest)
320
321         for stream := range m.StreamIter() {
322                 if stream.Err != nil {
323                         // Stream has an error
324                         return nil, stream.Err
325                 }
326                 currentStreamfiles := make(map[string]bool)
327                 for _, f := range stream.FileStreamSegments {
328                         sn := stream.StreamName
329                         if strings.HasSuffix(sn, "/") {
330                                 sn = sn[0 : len(sn)-1]
331                         }
332                         path := sn + "/" + f.Name
333                         streamname, filename := splitPath(path)
334                         if files[streamname] == nil {
335                                 files[streamname] = make(segmentedStream)
336                         }
337                         if !currentStreamfiles[path] {
338                                 segs := files[streamname][filename]
339                                 for seg := range stream.FileSegmentIterByName(path) {
340                                         if seg.Len > 0 {
341                                                 segs = append(segs, *seg)
342                                         }
343                                 }
344                                 files[streamname][filename] = segs
345                                 currentStreamfiles[path] = true
346                         }
347                 }
348         }
349
350         return &files, nil
351 }
352
353 func (stream segmentedStream) normalizedText(name string) string {
354         var sortedfiles []string
355         for k := range stream {
356                 sortedfiles = append(sortedfiles, k)
357         }
358         sort.Strings(sortedfiles)
359
360         streamTokens := []string{EscapeName(name)}
361
362         blocks := make(map[blockdigest.BlockDigest]int64)
363         var streamoffset int64
364
365         // Go through each file and add each referenced block exactly once.
366         for _, streamfile := range sortedfiles {
367                 for _, segment := range stream[streamfile] {
368                         b, _ := ParseBlockLocator(segment.Locator)
369                         if _, ok := blocks[b.Digest]; !ok {
370                                 streamTokens = append(streamTokens, segment.Locator)
371                                 blocks[b.Digest] = streamoffset
372                                 streamoffset += int64(b.Size)
373                         }
374                 }
375         }
376
377         if len(streamTokens) == 1 {
378                 streamTokens = append(streamTokens, "d41d8cd98f00b204e9800998ecf8427e+0")
379         }
380
381         for _, streamfile := range sortedfiles {
382                 // Add in file segments
383                 spanStart := int64(-1)
384                 spanEnd := int64(0)
385                 fout := EscapeName(streamfile)
386                 for _, segment := range stream[streamfile] {
387                         // Collapse adjacent segments
388                         b, _ := ParseBlockLocator(segment.Locator)
389                         streamoffset = blocks[b.Digest] + int64(segment.Offset)
390                         if spanStart == -1 {
391                                 spanStart = streamoffset
392                                 spanEnd = streamoffset + int64(segment.Len)
393                         } else {
394                                 if streamoffset == spanEnd {
395                                         spanEnd += int64(segment.Len)
396                                 } else {
397                                         streamTokens = append(streamTokens, fmt.Sprintf("%d:%d:%s", spanStart, spanEnd-spanStart, fout))
398                                         spanStart = streamoffset
399                                         spanEnd = streamoffset + int64(segment.Len)
400                                 }
401                         }
402                 }
403
404                 if spanStart != -1 {
405                         streamTokens = append(streamTokens, fmt.Sprintf("%d:%d:%s", spanStart, spanEnd-spanStart, fout))
406                 }
407
408                 if len(stream[streamfile]) == 0 {
409                         streamTokens = append(streamTokens, fmt.Sprintf("0:0:%s", fout))
410                 }
411         }
412
413         return strings.Join(streamTokens, " ") + "\n"
414 }
415
416 func (m segmentedManifest) manifestTextForPath(srcpath, relocate string) string {
417         srcpath = fixStreamName(srcpath)
418
419         var suffix string
420         if strings.HasSuffix(relocate, "/") {
421                 suffix = "/"
422         }
423         relocate = fixStreamName(relocate) + suffix
424
425         streamname, filename := splitPath(srcpath)
426
427         if stream, ok := m[streamname]; ok {
428                 // check if it refers to a single file in a stream
429                 filesegs, okfile := stream[filename]
430                 if okfile {
431                         newstream := make(segmentedStream)
432                         relocateStream, relocateFilename := splitPath(relocate)
433                         if relocateFilename == "" {
434                                 relocateFilename = filename
435                         }
436                         newstream[relocateFilename] = filesegs
437                         return newstream.normalizedText(relocateStream)
438                 }
439         }
440
441         // Going to extract multiple streams
442         prefix := srcpath + "/"
443
444         if strings.HasSuffix(relocate, "/") {
445                 relocate = relocate[0 : len(relocate)-1]
446         }
447
448         var sortedstreams []string
449         for k := range m {
450                 sortedstreams = append(sortedstreams, k)
451         }
452         sort.Strings(sortedstreams)
453
454         manifest := ""
455         for _, k := range sortedstreams {
456                 if strings.HasPrefix(k, prefix) || k == srcpath {
457                         manifest += m[k].normalizedText(relocate + k[len(srcpath):])
458                 }
459         }
460         return manifest
461 }
462
463 // Extract extracts some or all of the manifest and returns the extracted
464 // portion as a normalized manifest.  This is a swiss army knife function that
465 // can be several ways:
466 //
467 // If 'srcpath' and 'relocate' are '.' it simply returns an equivalent manifest
468 // in normalized form.
469 //
470 //   Extract(".", ".")  // return entire normalized manfest text
471 //
472 // If 'srcpath' points to a single file, it will return manifest text for just that file.
473 // The value of "relocate" is can be used to rename the file or set the file stream.
474 //
475 //   Extract("./foo", ".")          // extract file "foo" and put it in stream "."
476 //   Extract("./foo", "./bar")      // extract file "foo", rename it to "bar" in stream "."
477 //   Extract("./foo", "./bar/")     // extract file "foo", rename it to "./bar/foo"
478 //   Extract("./foo", "./bar/baz")  // extract file "foo", rename it to "./bar/baz")
479 //
480 // Otherwise it will return the manifest text for all streams with the prefix in "srcpath" and place
481 // them under the path in "relocate".
482 //
483 //   Extract("./stream", ".")      // extract "./stream" to "." and "./stream/subdir" to "./subdir")
484 //   Extract("./stream", "./bar")  // extract "./stream" to "./bar" and "./stream/subdir" to "./bar/subdir")
485 func (m Manifest) Extract(srcpath, relocate string) (ret Manifest) {
486         segmented, err := m.segment()
487         if err != nil {
488                 ret.Err = err
489                 return
490         }
491         ret.Text = segmented.manifestTextForPath(srcpath, relocate)
492         return
493 }
494
495 func (m *Manifest) StreamIter() <-chan ManifestStream {
496         ch := make(chan ManifestStream)
497         go func(input string) {
498                 // This slice holds the current line and the remainder of the
499                 // manifest.  We parse one line at a time, to save effort if we
500                 // only need the first few lines.
501                 lines := []string{"", input}
502                 for {
503                         lines = strings.SplitN(lines[1], "\n", 2)
504                         if len(lines[0]) > 0 {
505                                 // Only parse non-blank lines
506                                 ch <- parseManifestStream(lines[0])
507                         }
508                         if len(lines) == 1 {
509                                 break
510                         }
511                 }
512                 close(ch)
513         }(m.Text)
514         return ch
515 }
516
517 func (m *Manifest) FileSegmentIterByName(filepath string) <-chan *FileSegment {
518         ch := make(chan *FileSegment, 64)
519         filepath = fixStreamName(filepath)
520         go func() {
521                 for stream := range m.StreamIter() {
522                         if !strings.HasPrefix(filepath, stream.StreamName+"/") {
523                                 continue
524                         }
525                         stream.sendFileSegmentIterByName(filepath, ch)
526                 }
527                 close(ch)
528         }()
529         return ch
530 }
531
532 // BlockIterWithDuplicates iterates over the block locators of a manifest.
533 //
534 // Blocks may appear multiple times within the same manifest if they
535 // are used by multiple files. In that case this Iterator will output
536 // the same block multiple times.
537 //
538 // In order to detect parse errors, caller must check m.Err after the returned channel closes.
539 func (m *Manifest) BlockIterWithDuplicates() <-chan blockdigest.BlockLocator {
540         blockChannel := make(chan blockdigest.BlockLocator)
541         go func(streamChannel <-chan ManifestStream) {
542                 for ms := range streamChannel {
543                         if ms.Err != nil {
544                                 m.Err = ms.Err
545                                 continue
546                         }
547                         for _, block := range ms.Blocks {
548                                 if b, err := blockdigest.ParseBlockLocator(block); err == nil {
549                                         blockChannel <- b
550                                 } else {
551                                         m.Err = err
552                                 }
553                         }
554                 }
555                 close(blockChannel)
556         }(m.StreamIter())
557         return blockChannel
558 }