21891: Use bytes.Buffer to reduce string copies.
[arvados.git] / sdk / go / manifest / manifest.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 /* Deals with parsing Manifest Text. */
6
7 // Inspired by the Manifest class in arvados/sdk/ruby/lib/arvados/keep.rb
8
9 package manifest
10
11 import (
12         "bytes"
13         "errors"
14         "fmt"
15         "path"
16         "regexp"
17         "sort"
18         "strconv"
19         "strings"
20
21         "git.arvados.org/arvados.git/sdk/go/blockdigest"
22 )
23
24 var ErrInvalidToken = errors.New("Invalid token")
25
26 type Manifest struct {
27         Text string
28         Err  error
29 }
30
31 type BlockLocator struct {
32         Digest blockdigest.BlockDigest
33         Size   int
34         Hints  []string
35 }
36
37 // FileSegment is a portion of a file that is contained within a
38 // single block.
39 type FileSegment struct {
40         Locator string
41         // Offset (within this block) of this data segment
42         Offset int
43         Len    int
44 }
45
46 // FileStreamSegment is a portion of a file described as a segment of a stream.
47 type FileStreamSegment struct {
48         SegPos uint64
49         SegLen uint64
50         Name   string
51 }
52
53 // ManifestStream represents a single line from a manifest.
54 type ManifestStream struct {
55         StreamName         string
56         Blocks             []string
57         blockOffsets       []uint64
58         FileStreamSegments []FileStreamSegment
59         Err                error
60 }
61
62 // Array of segments referencing file content
63 type segmentedFile []FileSegment
64
65 // Map of files to list of file segments referencing file content
66 type segmentedStream map[string]segmentedFile
67
68 // Map of streams
69 type segmentedManifest map[string]segmentedStream
70
71 var escapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`)
72
73 func unescapeSeq(seq string) string {
74         if seq == `\\` {
75                 return `\`
76         }
77         i, err := strconv.ParseUint(seq[1:], 8, 8)
78         if err != nil {
79                 // Invalid escape sequence: can't unescape.
80                 return seq
81         }
82         return string([]byte{byte(i)})
83 }
84
85 func EscapeName(s string) string {
86         raw := []byte(s)
87         escaped := make([]byte, 0, len(s))
88         for _, c := range raw {
89                 if c <= 32 {
90                         oct := fmt.Sprintf("\\%03o", c)
91                         escaped = append(escaped, []byte(oct)...)
92                 } else {
93                         escaped = append(escaped, c)
94                 }
95         }
96         return string(escaped)
97 }
98
99 func UnescapeName(s string) string {
100         return escapeSeq.ReplaceAllStringFunc(s, unescapeSeq)
101 }
102
103 func ParseBlockLocator(s string) (b BlockLocator, err error) {
104         if !blockdigest.LocatorPattern.MatchString(s) {
105                 err = fmt.Errorf("String \"%s\" does not match BlockLocator pattern "+
106                         "\"%s\".",
107                         s,
108                         blockdigest.LocatorPattern.String())
109         } else {
110                 tokens := strings.Split(s, "+")
111                 var blockSize int64
112                 var blockDigest blockdigest.BlockDigest
113                 // We expect both of the following to succeed since LocatorPattern
114                 // restricts the strings appropriately.
115                 blockDigest, err = blockdigest.FromString(tokens[0])
116                 if err != nil {
117                         return
118                 }
119                 blockSize, err = strconv.ParseInt(tokens[1], 10, 0)
120                 if err != nil {
121                         return
122                 }
123                 b.Digest = blockDigest
124                 b.Size = int(blockSize)
125                 b.Hints = tokens[2:]
126         }
127         return
128 }
129
130 func parseFileStreamSegment(tok string) (ft FileStreamSegment, err error) {
131         parts := strings.SplitN(tok, ":", 3)
132         if len(parts) != 3 {
133                 err = ErrInvalidToken
134                 return
135         }
136         ft.SegPos, err = strconv.ParseUint(parts[0], 10, 64)
137         if err != nil {
138                 return
139         }
140         ft.SegLen, err = strconv.ParseUint(parts[1], 10, 64)
141         if err != nil {
142                 return
143         }
144         ft.Name = UnescapeName(parts[2])
145         return
146 }
147
148 func (s *ManifestStream) FileSegmentIterByName(filepath string) <-chan *FileSegment {
149         ch := make(chan *FileSegment, 64)
150         go func() {
151                 s.sendFileSegmentIterByName(filepath, ch)
152                 close(ch)
153         }()
154         return ch
155 }
156
157 func firstBlock(offsets []uint64, rangeStart uint64) int {
158         // rangeStart/blockStart is the inclusive lower bound
159         // rangeEnd/blockEnd is the exclusive upper bound
160
161         hi := len(offsets) - 1
162         var lo int
163         i := ((hi + lo) / 2)
164         blockStart := offsets[i]
165         blockEnd := offsets[i+1]
166
167         // perform a binary search for the first block
168         // assumes that all of the blocks are contiguous, so rangeStart is guaranteed
169         // to either fall into the range of a block or be outside the block range entirely
170         for !(rangeStart >= blockStart && rangeStart < blockEnd) {
171                 if lo == i {
172                         // must be out of range, fail
173                         return -1
174                 }
175                 if rangeStart > blockStart {
176                         lo = i
177                 } else {
178                         hi = i
179                 }
180                 i = ((hi + lo) / 2)
181                 blockStart = offsets[i]
182                 blockEnd = offsets[i+1]
183         }
184         return i
185 }
186
187 func (s *ManifestStream) sendFileSegmentIterByName(filepath string, ch chan<- *FileSegment) {
188         // This is what streamName+"/"+fileName will look like:
189         target := fixStreamName(filepath)
190         if !strings.HasPrefix(target, s.StreamName+"/") {
191                 return
192         }
193         targetTokName := target[len(s.StreamName)+1:]
194         for _, fTok := range s.FileStreamSegments {
195                 if fTok.Name != targetTokName {
196                         continue
197                 }
198                 wantPos := fTok.SegPos
199                 wantLen := fTok.SegLen
200                 if wantLen == 0 {
201                         ch <- &FileSegment{Locator: "d41d8cd98f00b204e9800998ecf8427e+0", Offset: 0, Len: 0}
202                         continue
203                 }
204
205                 // Binary search to determine first block in the stream
206                 i := firstBlock(s.blockOffsets, wantPos)
207                 if i == -1 {
208                         // Shouldn't happen, file segments are checked in parseManifestStream
209                         panic(fmt.Sprintf("File segment %v extends past end of stream", fTok))
210                 }
211                 for ; i < len(s.Blocks); i++ {
212                         blockPos := s.blockOffsets[i]
213                         blockEnd := s.blockOffsets[i+1]
214                         if blockEnd <= wantPos {
215                                 // Shouldn't happen, FirstBlock() should start
216                                 // us on the right block, so if this triggers
217                                 // that means there is a bug.
218                                 panic(fmt.Sprintf("Block end %v comes before start of file segment %v", blockEnd, wantPos))
219                         }
220                         if blockPos >= wantPos+wantLen {
221                                 // current block comes after current file span
222                                 break
223                         }
224
225                         fseg := FileSegment{
226                                 Locator: s.Blocks[i],
227                                 Offset:  0,
228                                 Len:     int(blockEnd - blockPos),
229                         }
230                         if blockPos < wantPos {
231                                 fseg.Offset = int(wantPos - blockPos)
232                                 fseg.Len -= fseg.Offset
233                         }
234                         if blockEnd > wantPos+wantLen {
235                                 fseg.Len = int(wantPos+wantLen-blockPos) - fseg.Offset
236                         }
237                         ch <- &fseg
238                 }
239         }
240 }
241
242 func parseManifestStream(s string) (m ManifestStream) {
243         tokens := strings.Split(s, " ")
244
245         m.StreamName = UnescapeName(tokens[0])
246         if m.StreamName != "." && !strings.HasPrefix(m.StreamName, "./") {
247                 m.Err = fmt.Errorf("Invalid stream name: %s", m.StreamName)
248                 return
249         }
250
251         tokens = tokens[1:]
252         var i int
253         for i = 0; i < len(tokens); i++ {
254                 if !blockdigest.IsBlockLocator(tokens[i]) {
255                         break
256                 }
257         }
258         m.Blocks = tokens[:i]
259         fileTokens := tokens[i:]
260
261         if len(m.Blocks) == 0 {
262                 m.Err = fmt.Errorf("No block locators found")
263                 return
264         }
265
266         m.blockOffsets = make([]uint64, len(m.Blocks)+1)
267         var streamoffset uint64
268         for i, b := range m.Blocks {
269                 bl, err := ParseBlockLocator(b)
270                 if err != nil {
271                         m.Err = err
272                         return
273                 }
274                 m.blockOffsets[i] = streamoffset
275                 streamoffset += uint64(bl.Size)
276         }
277         m.blockOffsets[len(m.Blocks)] = streamoffset
278
279         if len(fileTokens) == 0 {
280                 m.Err = fmt.Errorf("No file tokens found")
281                 return
282         }
283
284         for _, ft := range fileTokens {
285                 pft, err := parseFileStreamSegment(ft)
286                 if err != nil {
287                         m.Err = fmt.Errorf("Invalid file token: %s", ft)
288                         break
289                 }
290                 if pft.SegPos+pft.SegLen > streamoffset {
291                         m.Err = fmt.Errorf("File segment %s extends past end of stream %d", ft, streamoffset)
292                         break
293                 }
294                 m.FileStreamSegments = append(m.FileStreamSegments, pft)
295         }
296
297         return
298 }
299
300 func fixStreamName(sn string) string {
301         sn = path.Clean(sn)
302         if strings.HasPrefix(sn, "/") {
303                 sn = "." + sn
304         } else if sn != "." {
305                 sn = "./" + sn
306         }
307         return sn
308 }
309
310 func splitPath(srcpath string) (streamname, filename string) {
311         pathIdx := strings.LastIndex(srcpath, "/")
312         if pathIdx >= 0 {
313                 streamname = srcpath[0:pathIdx]
314                 filename = srcpath[pathIdx+1:]
315         } else {
316                 streamname = srcpath
317                 filename = ""
318         }
319         return
320 }
321
322 func (m *Manifest) segment() (*segmentedManifest, error) {
323         files := make(segmentedManifest)
324
325         for stream := range m.StreamIter() {
326                 if stream.Err != nil {
327                         // Stream has an error
328                         return nil, stream.Err
329                 }
330                 currentStreamfiles := make(map[string]bool)
331                 for _, f := range stream.FileStreamSegments {
332                         sn := stream.StreamName
333                         if strings.HasSuffix(sn, "/") {
334                                 sn = sn[0 : len(sn)-1]
335                         }
336                         path := sn + "/" + f.Name
337                         streamname, filename := splitPath(path)
338                         if files[streamname] == nil {
339                                 files[streamname] = make(segmentedStream)
340                         }
341                         if !currentStreamfiles[path] {
342                                 segs := files[streamname][filename]
343                                 for seg := range stream.FileSegmentIterByName(path) {
344                                         if seg.Len > 0 {
345                                                 segs = append(segs, *seg)
346                                         }
347                                 }
348                                 files[streamname][filename] = segs
349                                 currentStreamfiles[path] = true
350                         }
351                 }
352         }
353
354         return &files, nil
355 }
356
357 func (stream segmentedStream) normalizedText(name string) string {
358         var sortedfiles []string
359         for k := range stream {
360                 sortedfiles = append(sortedfiles, k)
361         }
362         sort.Strings(sortedfiles)
363
364         streamTokens := []string{EscapeName(name)}
365
366         blocks := make(map[blockdigest.BlockDigest]int64)
367         var streamoffset int64
368
369         // Go through each file and add each referenced block exactly once.
370         for _, streamfile := range sortedfiles {
371                 for _, segment := range stream[streamfile] {
372                         b, _ := ParseBlockLocator(segment.Locator)
373                         if _, ok := blocks[b.Digest]; !ok {
374                                 streamTokens = append(streamTokens, segment.Locator)
375                                 blocks[b.Digest] = streamoffset
376                                 streamoffset += int64(b.Size)
377                         }
378                 }
379         }
380
381         if len(streamTokens) == 1 {
382                 streamTokens = append(streamTokens, "d41d8cd98f00b204e9800998ecf8427e+0")
383         }
384
385         for _, streamfile := range sortedfiles {
386                 // Add in file segments
387                 spanStart := int64(-1)
388                 spanEnd := int64(0)
389                 fout := EscapeName(streamfile)
390                 for _, segment := range stream[streamfile] {
391                         // Collapse adjacent segments
392                         b, _ := ParseBlockLocator(segment.Locator)
393                         streamoffset = blocks[b.Digest] + int64(segment.Offset)
394                         if spanStart == -1 {
395                                 spanStart = streamoffset
396                                 spanEnd = streamoffset + int64(segment.Len)
397                         } else {
398                                 if streamoffset == spanEnd {
399                                         spanEnd += int64(segment.Len)
400                                 } else {
401                                         streamTokens = append(streamTokens, fmt.Sprintf("%d:%d:%s", spanStart, spanEnd-spanStart, fout))
402                                         spanStart = streamoffset
403                                         spanEnd = streamoffset + int64(segment.Len)
404                                 }
405                         }
406                 }
407
408                 if spanStart != -1 {
409                         streamTokens = append(streamTokens, fmt.Sprintf("%d:%d:%s", spanStart, spanEnd-spanStart, fout))
410                 }
411
412                 if len(stream[streamfile]) == 0 {
413                         streamTokens = append(streamTokens, fmt.Sprintf("0:0:%s", fout))
414                 }
415         }
416
417         return strings.Join(streamTokens, " ") + "\n"
418 }
419
420 func (m segmentedManifest) manifestTextForPath(srcpath, relocate string) string {
421         srcpath = fixStreamName(srcpath)
422
423         var suffix string
424         if strings.HasSuffix(relocate, "/") {
425                 suffix = "/"
426         }
427         relocate = fixStreamName(relocate) + suffix
428
429         streamname, filename := splitPath(srcpath)
430
431         if stream, ok := m[streamname]; ok {
432                 // check if it refers to a single file in a stream
433                 filesegs, okfile := stream[filename]
434                 if okfile {
435                         newstream := make(segmentedStream)
436                         relocateStream, relocateFilename := splitPath(relocate)
437                         if relocateFilename == "" {
438                                 relocateFilename = filename
439                         }
440                         newstream[relocateFilename] = filesegs
441                         return newstream.normalizedText(relocateStream)
442                 }
443         }
444
445         // Going to extract multiple streams
446         prefix := srcpath + "/"
447
448         if strings.HasSuffix(relocate, "/") {
449                 relocate = relocate[0 : len(relocate)-1]
450         }
451
452         var sortedstreams []string
453         for k := range m {
454                 sortedstreams = append(sortedstreams, k)
455         }
456         sort.Strings(sortedstreams)
457
458         var manifest bytes.Buffer
459         for _, k := range sortedstreams {
460                 if strings.HasPrefix(k, prefix) || k == srcpath {
461                         manifest.WriteString(m[k].normalizedText(relocate + k[len(srcpath):]))
462                 }
463         }
464         return manifest.String()
465 }
466
467 // Extract extracts some or all of the manifest and returns the extracted
468 // portion as a normalized manifest.  This is a swiss army knife function that
469 // can be several ways:
470 //
471 // If 'srcpath' and 'relocate' are '.' it simply returns an equivalent manifest
472 // in normalized form.
473 //
474 //      Extract(".", ".")  // return entire normalized manfest text
475 //
476 // If 'srcpath' points to a single file, it will return manifest text for just that file.
477 // The value of "relocate" is can be used to rename the file or set the file stream.
478 //
479 //      Extract("./foo", ".")          // extract file "foo" and put it in stream "."
480 //      Extract("./foo", "./bar")      // extract file "foo", rename it to "bar" in stream "."
481 //      Extract("./foo", "./bar/")     // extract file "foo", rename it to "./bar/foo"
482 //      Extract("./foo", "./bar/baz")  // extract file "foo", rename it to "./bar/baz")
483 //
484 // Otherwise it will return the manifest text for all streams with the prefix in "srcpath" and place
485 // them under the path in "relocate".
486 //
487 //      Extract("./stream", ".")      // extract "./stream" to "." and "./stream/subdir" to "./subdir")
488 //      Extract("./stream", "./bar")  // extract "./stream" to "./bar" and "./stream/subdir" to "./bar/subdir")
489 func (m Manifest) Extract(srcpath, relocate string) (ret Manifest) {
490         segmented, err := m.segment()
491         if err != nil {
492                 ret.Err = err
493                 return
494         }
495         ret.Text = segmented.manifestTextForPath(srcpath, relocate)
496         return
497 }
498
499 func (m *Manifest) StreamIter() <-chan ManifestStream {
500         ch := make(chan ManifestStream)
501         go func(input string) {
502                 // This slice holds the current line and the remainder of the
503                 // manifest.  We parse one line at a time, to save effort if we
504                 // only need the first few lines.
505                 lines := []string{"", input}
506                 for {
507                         lines = strings.SplitN(lines[1], "\n", 2)
508                         if len(lines[0]) > 0 {
509                                 // Only parse non-blank lines
510                                 ch <- parseManifestStream(lines[0])
511                         }
512                         if len(lines) == 1 {
513                                 break
514                         }
515                 }
516                 close(ch)
517         }(m.Text)
518         return ch
519 }
520
521 func (m *Manifest) FileSegmentIterByName(filepath string) <-chan *FileSegment {
522         ch := make(chan *FileSegment, 64)
523         filepath = fixStreamName(filepath)
524         go func() {
525                 for stream := range m.StreamIter() {
526                         if !strings.HasPrefix(filepath, stream.StreamName+"/") {
527                                 continue
528                         }
529                         stream.sendFileSegmentIterByName(filepath, ch)
530                 }
531                 close(ch)
532         }()
533         return ch
534 }
535
536 // BlockIterWithDuplicates iterates over the block locators of a manifest.
537 //
538 // Blocks may appear multiple times within the same manifest if they
539 // are used by multiple files. In that case this Iterator will output
540 // the same block multiple times.
541 //
542 // In order to detect parse errors, caller must check m.Err after the returned channel closes.
543 func (m *Manifest) BlockIterWithDuplicates() <-chan blockdigest.BlockLocator {
544         blockChannel := make(chan blockdigest.BlockLocator)
545         go func(streamChannel <-chan ManifestStream) {
546                 for ms := range streamChannel {
547                         if ms.Err != nil {
548                                 m.Err = ms.Err
549                                 continue
550                         }
551                         for _, block := range ms.Blocks {
552                                 if b, err := blockdigest.ParseBlockLocator(block); err == nil {
553                                         blockChannel <- b
554                                 } else {
555                                         m.Err = err
556                                 }
557                         }
558                 }
559                 close(blockChannel)
560         }(m.StreamIter())
561         return blockChannel
562 }