8784: Fix test for latest firefox.
[arvados.git] / sdk / go / manifest / manifest.go
1 /* Deals with parsing Manifest Text. */
2
3 // Inspired by the Manifest class in arvados/sdk/ruby/lib/arvados/keep.rb
4
5 package manifest
6
7 import (
8         "errors"
9         "fmt"
10         "git.curoverse.com/arvados.git/sdk/go/blockdigest"
11         "path"
12         "regexp"
13         "sort"
14         "strconv"
15         "strings"
16 )
17
18 var ErrInvalidToken = errors.New("Invalid token")
19
20 type Manifest struct {
21         Text string
22         Err  error
23 }
24
25 type BlockLocator struct {
26         Digest blockdigest.BlockDigest
27         Size   int
28         Hints  []string
29 }
30
31 // FileSegment is a portion of a file that is contained within a
32 // single block.
33 type FileSegment struct {
34         Locator string
35         // Offset (within this block) of this data segment
36         Offset int
37         Len    int
38 }
39
40 // FileStreamSegment is a portion of a file described as a segment of a stream.
41 type FileStreamSegment struct {
42         SegPos uint64
43         SegLen uint64
44         Name   string
45 }
46
47 // Represents a single line from a manifest.
48 type ManifestStream struct {
49         StreamName         string
50         Blocks             []string
51         blockOffsets       []uint64
52         FileStreamSegments []FileStreamSegment
53         Err                error
54 }
55
56 // Array of segments referencing file content
57 type segmentedFile []FileSegment
58
59 // Map of files to list of file segments referencing file content
60 type segmentedStream map[string]segmentedFile
61
62 // Map of streams
63 type segmentedManifest map[string]segmentedStream
64
65 var escapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`)
66
67 func unescapeSeq(seq string) string {
68         if seq == `\\` {
69                 return `\`
70         }
71         i, err := strconv.ParseUint(seq[1:], 8, 8)
72         if err != nil {
73                 // Invalid escape sequence: can't unescape.
74                 return seq
75         }
76         return string([]byte{byte(i)})
77 }
78
79 func EscapeName(s string) string {
80         raw := []byte(s)
81         escaped := make([]byte, 0, len(s))
82         for _, c := range raw {
83                 if c <= 32 {
84                         oct := fmt.Sprintf("\\%03o", c)
85                         escaped = append(escaped, []byte(oct)...)
86                 } else {
87                         escaped = append(escaped, c)
88                 }
89         }
90         return string(escaped)
91 }
92
93 func UnescapeName(s string) string {
94         return escapeSeq.ReplaceAllStringFunc(s, unescapeSeq)
95 }
96
97 func ParseBlockLocator(s string) (b BlockLocator, err error) {
98         if !blockdigest.LocatorPattern.MatchString(s) {
99                 err = fmt.Errorf("String \"%s\" does not match BlockLocator pattern "+
100                         "\"%s\".",
101                         s,
102                         blockdigest.LocatorPattern.String())
103         } else {
104                 tokens := strings.Split(s, "+")
105                 var blockSize int64
106                 var blockDigest blockdigest.BlockDigest
107                 // We expect both of the following to succeed since LocatorPattern
108                 // restricts the strings appropriately.
109                 blockDigest, err = blockdigest.FromString(tokens[0])
110                 if err != nil {
111                         return
112                 }
113                 blockSize, err = strconv.ParseInt(tokens[1], 10, 0)
114                 if err != nil {
115                         return
116                 }
117                 b.Digest = blockDigest
118                 b.Size = int(blockSize)
119                 b.Hints = tokens[2:]
120         }
121         return
122 }
123
124 func parseFileStreamSegment(tok string) (ft FileStreamSegment, err error) {
125         parts := strings.SplitN(tok, ":", 3)
126         if len(parts) != 3 {
127                 err = ErrInvalidToken
128                 return
129         }
130         ft.SegPos, err = strconv.ParseUint(parts[0], 10, 64)
131         if err != nil {
132                 return
133         }
134         ft.SegLen, err = strconv.ParseUint(parts[1], 10, 64)
135         if err != nil {
136                 return
137         }
138         ft.Name = UnescapeName(parts[2])
139         return
140 }
141
142 func (s *ManifestStream) FileSegmentIterByName(filepath string) <-chan *FileSegment {
143         ch := make(chan *FileSegment, 64)
144         go func() {
145                 s.sendFileSegmentIterByName(filepath, ch)
146                 close(ch)
147         }()
148         return ch
149 }
150
151 func firstBlock(offsets []uint64, range_start uint64) int {
152         // range_start/block_start is the inclusive lower bound
153         // range_end/block_end is the exclusive upper bound
154
155         hi := len(offsets) - 1
156         var lo int
157         i := ((hi + lo) / 2)
158         block_start := offsets[i]
159         block_end := offsets[i+1]
160
161         // perform a binary search for the first block
162         // assumes that all of the blocks are contiguous, so range_start is guaranteed
163         // to either fall into the range of a block or be outside the block range entirely
164         for !(range_start >= block_start && range_start < block_end) {
165                 if lo == i {
166                         // must be out of range, fail
167                         return -1
168                 }
169                 if range_start > block_start {
170                         lo = i
171                 } else {
172                         hi = i
173                 }
174                 i = ((hi + lo) / 2)
175                 block_start = offsets[i]
176                 block_end = offsets[i+1]
177         }
178         return i
179 }
180
181 func (s *ManifestStream) sendFileSegmentIterByName(filepath string, ch chan<- *FileSegment) {
182         // This is what streamName+"/"+fileName will look like:
183         target := fixStreamName(filepath)
184         for _, fTok := range s.FileStreamSegments {
185                 wantPos := fTok.SegPos
186                 wantLen := fTok.SegLen
187                 name := fTok.Name
188
189                 if s.StreamName+"/"+name != target {
190                         continue
191                 }
192                 if wantLen == 0 {
193                         ch <- &FileSegment{Locator: "d41d8cd98f00b204e9800998ecf8427e+0", Offset: 0, Len: 0}
194                         continue
195                 }
196
197                 // Binary search to determine first block in the stream
198                 i := firstBlock(s.blockOffsets, wantPos)
199                 if i == -1 {
200                         // Shouldn't happen, file segments are checked in parseManifestStream
201                         panic(fmt.Sprintf("File segment %v extends past end of stream", fTok))
202                 }
203                 for ; i < len(s.Blocks); i++ {
204                         blockPos := s.blockOffsets[i]
205                         blockEnd := s.blockOffsets[i+1]
206                         if blockEnd <= wantPos {
207                                 // Shouldn't happen, FirstBlock() should start
208                                 // us on the right block, so if this triggers
209                                 // that means there is a bug.
210                                 panic(fmt.Sprintf("Block end %v comes before start of file segment %v", blockEnd, wantPos))
211                         }
212                         if blockPos >= wantPos+wantLen {
213                                 // current block comes after current file span
214                                 break
215                         }
216
217                         fseg := FileSegment{
218                                 Locator: s.Blocks[i],
219                                 Offset:  0,
220                                 Len:     int(blockEnd - blockPos),
221                         }
222                         if blockPos < wantPos {
223                                 fseg.Offset = int(wantPos - blockPos)
224                                 fseg.Len -= fseg.Offset
225                         }
226                         if blockEnd > wantPos+wantLen {
227                                 fseg.Len = int(wantPos+wantLen-blockPos) - fseg.Offset
228                         }
229                         ch <- &fseg
230                 }
231         }
232 }
233
234 func parseManifestStream(s string) (m ManifestStream) {
235         tokens := strings.Split(s, " ")
236
237         m.StreamName = UnescapeName(tokens[0])
238         if m.StreamName != "." && !strings.HasPrefix(m.StreamName, "./") {
239                 m.Err = fmt.Errorf("Invalid stream name: %s", m.StreamName)
240                 return
241         }
242
243         tokens = tokens[1:]
244         var i int
245         for i = 0; i < len(tokens); i++ {
246                 if !blockdigest.IsBlockLocator(tokens[i]) {
247                         break
248                 }
249         }
250         m.Blocks = tokens[:i]
251         fileTokens := tokens[i:]
252
253         if len(m.Blocks) == 0 {
254                 m.Err = fmt.Errorf("No block locators found")
255                 return
256         }
257
258         m.blockOffsets = make([]uint64, len(m.Blocks)+1)
259         var streamoffset uint64
260         for i, b := range m.Blocks {
261                 bl, err := ParseBlockLocator(b)
262                 if err != nil {
263                         m.Err = err
264                         return
265                 }
266                 m.blockOffsets[i] = streamoffset
267                 streamoffset += uint64(bl.Size)
268         }
269         m.blockOffsets[len(m.Blocks)] = streamoffset
270
271         if len(fileTokens) == 0 {
272                 m.Err = fmt.Errorf("No file tokens found")
273                 return
274         }
275
276         for _, ft := range fileTokens {
277                 pft, err := parseFileStreamSegment(ft)
278                 if err != nil {
279                         m.Err = fmt.Errorf("Invalid file token: %s", ft)
280                         break
281                 }
282                 if pft.SegPos+pft.SegLen > streamoffset {
283                         m.Err = fmt.Errorf("File segment %s extends past end of stream %d", ft, streamoffset)
284                         break
285                 }
286                 m.FileStreamSegments = append(m.FileStreamSegments, pft)
287         }
288
289         return
290 }
291
292 func fixStreamName(sn string) string {
293         sn = path.Clean(sn)
294         if strings.HasPrefix(sn, "/") {
295                 sn = "." + sn
296         } else if sn != "." {
297                 sn = "./" + sn
298         }
299         return sn
300 }
301
302 func splitPath(srcpath string) (streamname, filename string) {
303         pathIdx := strings.LastIndex(srcpath, "/")
304         if pathIdx >= 0 {
305                 streamname = srcpath[0:pathIdx]
306                 filename = srcpath[pathIdx+1:]
307         } else {
308                 streamname = srcpath
309                 filename = ""
310         }
311         return
312 }
313
314 func (m *Manifest) segment() (*segmentedManifest, error) {
315         files := make(segmentedManifest)
316
317         for stream := range m.StreamIter() {
318                 if stream.Err != nil {
319                         // Stream has an error
320                         return nil, stream.Err
321                 }
322                 currentStreamfiles := make(map[string]bool)
323                 for _, f := range stream.FileStreamSegments {
324                         sn := stream.StreamName
325                         if strings.HasSuffix(sn, "/") {
326                                 sn = sn[0 : len(sn)-1]
327                         }
328                         path := sn + "/" + f.Name
329                         streamname, filename := splitPath(path)
330                         if files[streamname] == nil {
331                                 files[streamname] = make(segmentedStream)
332                         }
333                         if !currentStreamfiles[path] {
334                                 segs := files[streamname][filename]
335                                 for seg := range stream.FileSegmentIterByName(path) {
336                                         if seg.Len > 0 {
337                                                 segs = append(segs, *seg)
338                                         }
339                                 }
340                                 files[streamname][filename] = segs
341                                 currentStreamfiles[path] = true
342                         }
343                 }
344         }
345
346         return &files, nil
347 }
348
349 func (stream segmentedStream) normalizedText(name string) string {
350         var sortedfiles []string
351         for k, _ := range stream {
352                 sortedfiles = append(sortedfiles, k)
353         }
354         sort.Strings(sortedfiles)
355
356         stream_tokens := []string{EscapeName(name)}
357
358         blocks := make(map[blockdigest.BlockDigest]int64)
359         var streamoffset int64
360
361         // Go through each file and add each referenced block exactly once.
362         for _, streamfile := range sortedfiles {
363                 for _, segment := range stream[streamfile] {
364                         b, _ := ParseBlockLocator(segment.Locator)
365                         if _, ok := blocks[b.Digest]; !ok {
366                                 stream_tokens = append(stream_tokens, segment.Locator)
367                                 blocks[b.Digest] = streamoffset
368                                 streamoffset += int64(b.Size)
369                         }
370                 }
371         }
372
373         if len(stream_tokens) == 1 {
374                 stream_tokens = append(stream_tokens, "d41d8cd98f00b204e9800998ecf8427e+0")
375         }
376
377         for _, streamfile := range sortedfiles {
378                 // Add in file segments
379                 span_start := int64(-1)
380                 span_end := int64(0)
381                 fout := EscapeName(streamfile)
382                 for _, segment := range stream[streamfile] {
383                         // Collapse adjacent segments
384                         b, _ := ParseBlockLocator(segment.Locator)
385                         streamoffset = blocks[b.Digest] + int64(segment.Offset)
386                         if span_start == -1 {
387                                 span_start = streamoffset
388                                 span_end = streamoffset + int64(segment.Len)
389                         } else {
390                                 if streamoffset == span_end {
391                                         span_end += int64(segment.Len)
392                                 } else {
393                                         stream_tokens = append(stream_tokens, fmt.Sprintf("%d:%d:%s", span_start, span_end-span_start, fout))
394                                         span_start = streamoffset
395                                         span_end = streamoffset + int64(segment.Len)
396                                 }
397                         }
398                 }
399
400                 if span_start != -1 {
401                         stream_tokens = append(stream_tokens, fmt.Sprintf("%d:%d:%s", span_start, span_end-span_start, fout))
402                 }
403
404                 if len(stream[streamfile]) == 0 {
405                         stream_tokens = append(stream_tokens, fmt.Sprintf("0:0:%s", fout))
406                 }
407         }
408
409         return strings.Join(stream_tokens, " ") + "\n"
410 }
411
412 func (m segmentedManifest) manifestTextForPath(srcpath, relocate string) string {
413         srcpath = fixStreamName(srcpath)
414
415         var suffix string
416         if strings.HasSuffix(relocate, "/") {
417                 suffix = "/"
418         }
419         relocate = fixStreamName(relocate) + suffix
420
421         streamname, filename := splitPath(srcpath)
422
423         if stream, ok := m[streamname]; ok {
424                 // check if it refers to a single file in a stream
425                 filesegs, okfile := stream[filename]
426                 if okfile {
427                         newstream := make(segmentedStream)
428                         relocate_stream, relocate_filename := splitPath(relocate)
429                         if relocate_filename == "" {
430                                 relocate_filename = filename
431                         }
432                         newstream[relocate_filename] = filesegs
433                         return newstream.normalizedText(relocate_stream)
434                 }
435         }
436
437         // Going to extract multiple streams
438         prefix := srcpath + "/"
439
440         if strings.HasSuffix(relocate, "/") {
441                 relocate = relocate[0 : len(relocate)-1]
442         }
443
444         var sortedstreams []string
445         for k, _ := range m {
446                 sortedstreams = append(sortedstreams, k)
447         }
448         sort.Strings(sortedstreams)
449
450         manifest := ""
451         for _, k := range sortedstreams {
452                 if strings.HasPrefix(k, prefix) || k == srcpath {
453                         manifest += m[k].normalizedText(relocate + k[len(srcpath):])
454                 }
455         }
456         return manifest
457 }
458
459 // Extract extracts some or all of the manifest and returns the extracted
460 // portion as a normalized manifest.  This is a swiss army knife function that
461 // can be several ways:
462 //
463 // If 'srcpath' and 'relocate' are '.' it simply returns an equivalent manifest
464 // in normalized form.
465 //
466 //   Extract(".", ".")  // return entire normalized manfest text
467 //
468 // If 'srcpath' points to a single file, it will return manifest text for just that file.
469 // The value of "relocate" is can be used to rename the file or set the file stream.
470 //
471 //   Extract("./foo", ".")          // extract file "foo" and put it in stream "."
472 //   Extract("./foo", "./bar")      // extract file "foo", rename it to "bar" in stream "."
473 //   Extract("./foo", "./bar/")     // extract file "foo", rename it to "./bar/foo"
474 //   Extract("./foo", "./bar/baz")  // extract file "foo", rename it to "./bar/baz")
475 //
476 // Otherwise it will return the manifest text for all streams with the prefix in "srcpath" and place
477 // them under the path in "relocate".
478 //
479 //   Extract("./stream", ".")      // extract "./stream" to "." and "./stream/subdir" to "./subdir")
480 //   Extract("./stream", "./bar")  // extract "./stream" to "./bar" and "./stream/subdir" to "./bar/subdir")
481 func (m Manifest) Extract(srcpath, relocate string) (ret Manifest) {
482         segmented, err := m.segment()
483         if err != nil {
484                 ret.Err = err
485                 return
486         }
487         ret.Text = segmented.manifestTextForPath(srcpath, relocate)
488         return
489 }
490
491 func (m *Manifest) StreamIter() <-chan ManifestStream {
492         ch := make(chan ManifestStream)
493         go func(input string) {
494                 // This slice holds the current line and the remainder of the
495                 // manifest.  We parse one line at a time, to save effort if we
496                 // only need the first few lines.
497                 lines := []string{"", input}
498                 for {
499                         lines = strings.SplitN(lines[1], "\n", 2)
500                         if len(lines[0]) > 0 {
501                                 // Only parse non-blank lines
502                                 ch <- parseManifestStream(lines[0])
503                         }
504                         if len(lines) == 1 {
505                                 break
506                         }
507                 }
508                 close(ch)
509         }(m.Text)
510         return ch
511 }
512
513 func (m *Manifest) FileSegmentIterByName(filepath string) <-chan *FileSegment {
514         ch := make(chan *FileSegment, 64)
515         filepath = fixStreamName(filepath)
516         go func() {
517                 for stream := range m.StreamIter() {
518                         if !strings.HasPrefix(filepath, stream.StreamName+"/") {
519                                 continue
520                         }
521                         stream.sendFileSegmentIterByName(filepath, ch)
522                 }
523                 close(ch)
524         }()
525         return ch
526 }
527
528 // Blocks may appear multiple times within the same manifest if they
529 // are used by multiple files. In that case this Iterator will output
530 // the same block multiple times.
531 //
532 // In order to detect parse errors, caller must check m.Err after the returned channel closes.
533 func (m *Manifest) BlockIterWithDuplicates() <-chan blockdigest.BlockLocator {
534         blockChannel := make(chan blockdigest.BlockLocator)
535         go func(streamChannel <-chan ManifestStream) {
536                 for ms := range streamChannel {
537                         if ms.Err != nil {
538                                 m.Err = ms.Err
539                                 continue
540                         }
541                         for _, block := range ms.Blocks {
542                                 if b, err := blockdigest.ParseBlockLocator(block); err == nil {
543                                         blockChannel <- b
544                                 } else {
545                                         m.Err = err
546                                 }
547                         }
548                 }
549                 close(blockChannel)
550         }(m.StreamIter())
551         return blockChannel
552 }