9397: Add manifest normalization and sub-manifest extraction by path.
[arvados.git] / sdk / go / manifest / manifest.go
1 /* Deals with parsing Manifest Text. */
2
3 // Inspired by the Manifest class in arvados/sdk/ruby/lib/arvados/keep.rb
4
5 package manifest
6
7 import (
8         "errors"
9         "fmt"
10         "git.curoverse.com/arvados.git/sdk/go/blockdigest"
11         "regexp"
12         "sort"
13         "strconv"
14         "strings"
15 )
16
17 var ErrInvalidToken = errors.New("Invalid token")
18
19 type Manifest struct {
20         Text string
21         Err  error
22 }
23
24 type BlockLocator struct {
25         Digest blockdigest.BlockDigest
26         Size   int
27         Hints  []string
28 }
29
30 // FileSegment is a portion of a file that is contained within a
31 // single block.
32 type FileSegment struct {
33         Locator string
34         // Offset (within this block) of this data segment
35         Offset int
36         Len    int
37 }
38
39 // FileStreamSegment is a portion of a file described as a segment of a stream.
40 type FileStreamSegment struct {
41         SegPos uint64
42         SegLen uint64
43         Name   string
44 }
45
46 // Represents a single line from a manifest.
47 type ManifestStream struct {
48         StreamName         string
49         Blocks             []string
50         BlockOffsets       []uint64
51         FileStreamSegments []FileStreamSegment
52         Err                error
53 }
54
55 // Array of segments referencing file content
56 type SegmentedFile []FileSegment
57
58 // Map of files to list of file segments referencing file content
59 type SegmentedStream map[string]SegmentedFile
60
61 // Map of streams
62 type SegmentedManifest map[string]SegmentedStream
63
64 var escapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`)
65
66 func unescapeSeq(seq string) string {
67         if seq == `\\` {
68                 return `\`
69         }
70         i, err := strconv.ParseUint(seq[1:], 8, 8)
71         if err != nil {
72                 // Invalid escape sequence: can't unescape.
73                 return seq
74         }
75         return string([]byte{byte(i)})
76 }
77
78 func EscapeName(s string) string {
79         return strings.Replace(s, " ", `\040`, -1)
80 }
81
82 func UnescapeName(s string) string {
83         return escapeSeq.ReplaceAllStringFunc(s, unescapeSeq)
84 }
85
86 func ParseBlockLocator(s string) (b BlockLocator, err error) {
87         if !blockdigest.LocatorPattern.MatchString(s) {
88                 err = fmt.Errorf("String \"%s\" does not match BlockLocator pattern "+
89                         "\"%s\".",
90                         s,
91                         blockdigest.LocatorPattern.String())
92         } else {
93                 tokens := strings.Split(s, "+")
94                 var blockSize int64
95                 var blockDigest blockdigest.BlockDigest
96                 // We expect both of the following to succeed since LocatorPattern
97                 // restricts the strings appropriately.
98                 blockDigest, err = blockdigest.FromString(tokens[0])
99                 if err != nil {
100                         return
101                 }
102                 blockSize, err = strconv.ParseInt(tokens[1], 10, 0)
103                 if err != nil {
104                         return
105                 }
106                 b.Digest = blockDigest
107                 b.Size = int(blockSize)
108                 b.Hints = tokens[2:]
109         }
110         return
111 }
112
113 func parseFileStreamSegment(tok string) (ft FileStreamSegment, err error) {
114         parts := strings.SplitN(tok, ":", 3)
115         if len(parts) != 3 {
116                 err = ErrInvalidToken
117                 return
118         }
119         ft.SegPos, err = strconv.ParseUint(parts[0], 10, 64)
120         if err != nil {
121                 return
122         }
123         ft.SegLen, err = strconv.ParseUint(parts[1], 10, 64)
124         if err != nil {
125                 return
126         }
127         ft.Name = UnescapeName(parts[2])
128         return
129 }
130
131 func (s *ManifestStream) FileSegmentIterByName(filepath string) <-chan *FileSegment {
132         ch := make(chan *FileSegment, 64)
133         go func() {
134                 s.sendFileSegmentIterByName(filepath, ch)
135                 close(ch)
136         }()
137         return ch
138 }
139
140 func FirstBlock(offsets []uint64, range_start uint64) int {
141         // range_start/block_start is the inclusive lower bound
142         // range_end/block_end is the exclusive upper bound
143
144         hi := len(offsets) - 1
145         var lo int
146         i := ((hi + lo) / 2)
147         block_start := offsets[i]
148         block_end := offsets[i+1]
149
150         // perform a binary search for the first block
151         // assumes that all of the blocks are contiguous, so range_start is guaranteed
152         // to either fall into the range of a block or be outside the block range entirely
153         for !(range_start >= block_start && range_start < block_end) {
154                 if lo == i {
155                         // must be out of range, fail
156                         return -1
157                 }
158                 if range_start > block_start {
159                         lo = i
160                 } else {
161                         hi = i
162                         i = ((hi + lo) / 2)
163                         block_start = offsets[i]
164                         block_end = offsets[i+1]
165                 }
166         }
167         return i
168 }
169
170 func (s *ManifestStream) sendFileSegmentIterByName(filepath string, ch chan<- *FileSegment) {
171         // This is what streamName+"/"+fileName will look like:
172         target := filepath
173         if !strings.HasPrefix(target, "./") {
174                 target = "./" + target
175         }
176         for _, fTok := range s.FileStreamSegments {
177                 wantPos := fTok.SegPos
178                 wantLen := fTok.SegLen
179                 name := fTok.Name
180
181                 if s.StreamName+"/"+name != target {
182                         continue
183                 }
184                 if wantLen == 0 {
185                         ch <- &FileSegment{Locator: "d41d8cd98f00b204e9800998ecf8427e+0", Offset: 0, Len: 0}
186                         continue
187                 }
188
189                 // Binary search to determine first block in the stream
190                 i := FirstBlock(s.BlockOffsets, wantPos)
191                 if i == -1 {
192                         // error
193                         break
194                 }
195                 for i < len(s.Blocks) {
196                         blockPos := s.BlockOffsets[i]
197                         blockEnd := s.BlockOffsets[i+1]
198                         if blockEnd <= wantPos {
199                                 // current block comes before current file span
200                                 // (shouldn't happen, FirstBlock() should start us
201                                 // on the right block)
202                                 break
203                         }
204                         if blockPos >= wantPos+wantLen {
205                                 // current block comes after current file span
206                                 break
207                         }
208
209                         fseg := FileSegment{
210                                 Locator: s.Blocks[i],
211                                 Offset:  0,
212                                 Len:     int(blockEnd - blockPos),
213                         }
214                         if blockPos < wantPos {
215                                 fseg.Offset = int(wantPos - blockPos)
216                                 fseg.Len -= fseg.Offset
217                         }
218                         if blockEnd > wantPos+wantLen {
219                                 fseg.Len = int(wantPos+wantLen-blockPos) - fseg.Offset
220                         }
221                         ch <- &fseg
222                         i += 1
223                 }
224         }
225 }
226
227 func parseManifestStream(s string) (m ManifestStream) {
228         tokens := strings.Split(s, " ")
229
230         m.StreamName = UnescapeName(tokens[0])
231         if m.StreamName != "." && !strings.HasPrefix(m.StreamName, "./") {
232                 m.Err = fmt.Errorf("Invalid stream name: %s", m.StreamName)
233                 return
234         }
235
236         tokens = tokens[1:]
237         var i int
238         for i = 0; i < len(tokens); i++ {
239                 if !blockdigest.IsBlockLocator(tokens[i]) {
240                         break
241                 }
242         }
243         m.Blocks = tokens[:i]
244         fileTokens := tokens[i:]
245
246         if len(m.Blocks) == 0 {
247                 m.Err = fmt.Errorf("No block locators found")
248                 return
249         }
250
251         m.BlockOffsets = make([]uint64, len(m.Blocks)+1)
252         var streamoffset uint64
253         for i, b := range m.Blocks {
254                 bl, err := ParseBlockLocator(b)
255                 if err != nil {
256                         m.Err = err
257                         return
258                 }
259                 m.BlockOffsets[i] = streamoffset
260                 streamoffset += uint64(bl.Size)
261         }
262         m.BlockOffsets[len(m.Blocks)] = streamoffset
263
264         if len(fileTokens) == 0 {
265                 m.Err = fmt.Errorf("No file tokens found")
266                 return
267         }
268
269         for _, ft := range fileTokens {
270                 pft, err := parseFileStreamSegment(ft)
271                 if err != nil {
272                         m.Err = fmt.Errorf("Invalid file token: %s", ft)
273                         break
274                 }
275                 m.FileStreamSegments = append(m.FileStreamSegments, pft)
276         }
277
278         return
279 }
280
281 func SplitPath(path string) (streamname, filename string) {
282         pathIdx := strings.LastIndex(path, "/")
283         if pathIdx >= 0 {
284                 streamname = path[0:pathIdx]
285                 filename = path[pathIdx+1:]
286         } else {
287                 streamname = path
288                 filename = ""
289         }
290         return
291 }
292
293 func (m *Manifest) SegmentManifest() *SegmentedManifest {
294         files := make(SegmentedManifest)
295
296         for stream := range m.StreamIter() {
297                 currentStreamfiles := make(map[string]bool)
298                 for _, f := range stream.FileStreamSegments {
299                         sn := stream.StreamName
300                         if sn != "." && !strings.HasPrefix(sn, "./") {
301                                 sn = "./" + sn
302                         }
303                         if strings.HasSuffix(sn, "/") {
304                                 sn = sn[0 : len(sn)-1]
305                         }
306                         path := sn + "/" + f.Name
307                         streamname, filename := SplitPath(path)
308                         if files[streamname] == nil {
309                                 files[streamname] = make(SegmentedStream)
310                         }
311                         if !currentStreamfiles[path] {
312                                 segs := files[streamname][filename]
313                                 for seg := range stream.FileSegmentIterByName(path) {
314                                         segs = append(segs, *seg)
315                                 }
316                                 files[streamname][filename] = segs
317                                 currentStreamfiles[path] = true
318                         }
319                 }
320         }
321
322         return &files
323 }
324
325 func (stream *SegmentedStream) NormalizeStream(name string) string {
326         var sortedfiles []string
327         for k, _ := range *stream {
328                 sortedfiles = append(sortedfiles, k)
329         }
330         sort.Strings(sortedfiles)
331
332         stream_tokens := []string{EscapeName(name)}
333
334         blocks := make(map[string]int64)
335         var streamoffset int64
336
337         // Go through each file and add each referenced block exactly once.
338         for _, streamfile := range sortedfiles {
339                 for _, segment := range (*stream)[streamfile] {
340                         if _, ok := blocks[segment.Locator]; !ok {
341                                 stream_tokens = append(stream_tokens, segment.Locator)
342                                 blocks[segment.Locator] = streamoffset
343                                 b, _ := ParseBlockLocator(segment.Locator)
344                                 streamoffset += int64(b.Size)
345                         }
346                 }
347         }
348
349         if len(stream_tokens) == 1 {
350                 stream_tokens = append(stream_tokens, "d41d8cd98f00b204e9800998ecf8427e+0")
351         }
352
353         for _, streamfile := range sortedfiles {
354                 // Add in file segments
355                 span_start := int64(-1)
356                 span_end := int64(0)
357                 fout := EscapeName(streamfile)
358                 for _, segment := range (*stream)[streamfile] {
359                         // Collapse adjacent segments
360                         streamoffset = blocks[segment.Locator] + int64(segment.Offset)
361                         if span_start == -1 {
362                                 span_start = streamoffset
363                                 span_end = streamoffset + int64(segment.Len)
364                         } else {
365                                 if streamoffset == span_end {
366                                         span_end += int64(segment.Len)
367                                 } else {
368                                         stream_tokens = append(stream_tokens, fmt.Sprintf("%d:%d:%s", span_start, span_end-span_start, fout))
369                                         span_start = streamoffset
370                                         span_end = streamoffset + int64(segment.Len)
371                                 }
372                         }
373                 }
374
375                 if span_start != -1 {
376                         stream_tokens = append(stream_tokens, fmt.Sprintf("%d:%d:%s", span_start, span_end-span_start, fout))
377                 }
378
379                 if len((*stream)[streamfile]) == 0 {
380                         stream_tokens = append(stream_tokens, fmt.Sprintf("0:0:%s", fout))
381                 }
382         }
383
384         return strings.Join(stream_tokens, " ") + "\n"
385 }
386
387 func (m *Manifest) NormalizeManifest() string {
388         segments := m.SegmentManifest()
389
390         var sortedstreams []string
391         for k, _ := range *segments {
392                 sortedstreams = append(sortedstreams, k)
393         }
394         sort.Strings(sortedstreams)
395
396         var manifest string
397         for _, k := range sortedstreams {
398                 stream := (*segments)[k]
399                 manifest += stream.NormalizeStream(k)
400         }
401         return manifest
402 }
403
404 func (m *SegmentedManifest) ManifestForPath(path, relocate string) string {
405         if path == "" {
406                 path = "."
407         }
408         if relocate == "" {
409                 relocate = "."
410         }
411
412         streamname, filename := SplitPath(path)
413         var relocate_stream, relocate_filename string
414         relocate_stream, relocate_filename = SplitPath(relocate)
415
416         if stream, ok := (*m)[path]; ok {
417                 // refers to a single stream
418                 return stream.NormalizeStream(relocate)
419         } else if stream, ok := (*m)[streamname]; ok {
420                 // refers to a single file in a stream
421                 newstream := make(SegmentedStream)
422                 if relocate_filename == "" {
423                         relocate_filename = filename
424                 }
425                 newstream[relocate_filename] = stream[filename]
426                 return newstream.NormalizeStream(relocate_stream)
427         } else {
428                 // refers to multiple streams
429                 manifest := ""
430                 prefix := path
431                 if !strings.HasSuffix(prefix, "/") {
432                         prefix += "/"
433                 }
434                 if !strings.HasSuffix(relocate, "/") {
435                         relocate += "/"
436                 }
437
438                 var sortedstreams []string
439                 for k, _ := range *m {
440                         sortedstreams = append(sortedstreams, k)
441                 }
442                 sort.Strings(sortedstreams)
443
444                 for _, k := range sortedstreams {
445                         if strings.HasPrefix(k, prefix) {
446                                 v := (*m)[k]
447                                 manifest += v.NormalizeStream(relocate + k[len(prefix):])
448                         }
449                 }
450                 return manifest
451         }
452 }
453
454 func (m *Manifest) ManifestForPath(path, relocate string) string {
455         return m.SegmentManifest().ManifestForPath(path, relocate)
456 }
457
458 func (m *Manifest) StreamIter() <-chan ManifestStream {
459         ch := make(chan ManifestStream)
460         go func(input string) {
461                 // This slice holds the current line and the remainder of the
462                 // manifest.  We parse one line at a time, to save effort if we
463                 // only need the first few lines.
464                 lines := []string{"", input}
465                 for {
466                         lines = strings.SplitN(lines[1], "\n", 2)
467                         if len(lines[0]) > 0 {
468                                 // Only parse non-blank lines
469                                 ch <- parseManifestStream(lines[0])
470                         }
471                         if len(lines) == 1 {
472                                 break
473                         }
474                 }
475                 close(ch)
476         }(m.Text)
477         return ch
478 }
479
480 func (m *Manifest) FileSegmentIterByName(filepath string) <-chan *FileSegment {
481         ch := make(chan *FileSegment, 64)
482         if !strings.HasPrefix(filepath, "./") {
483                 filepath = "./" + filepath
484         }
485         go func() {
486                 for stream := range m.StreamIter() {
487                         if !strings.HasPrefix(filepath, stream.StreamName+"/") {
488                                 continue
489                         }
490                         stream.sendFileSegmentIterByName(filepath, ch)
491                 }
492                 close(ch)
493         }()
494         return ch
495 }
496
497 // Blocks may appear multiple times within the same manifest if they
498 // are used by multiple files. In that case this Iterator will output
499 // the same block multiple times.
500 //
501 // In order to detect parse errors, caller must check m.Err after the returned channel closes.
502 func (m *Manifest) BlockIterWithDuplicates() <-chan blockdigest.BlockLocator {
503         blockChannel := make(chan blockdigest.BlockLocator)
504         go func(streamChannel <-chan ManifestStream) {
505                 for ms := range streamChannel {
506                         if ms.Err != nil {
507                                 m.Err = ms.Err
508                                 continue
509                         }
510                         for _, block := range ms.Blocks {
511                                 if b, err := blockdigest.ParseBlockLocator(block); err == nil {
512                                         blockChannel <- b
513                                 } else {
514                                         m.Err = err
515                                 }
516                         }
517                 }
518                 close(blockChannel)
519         }(m.StreamIter())
520         return blockChannel
521 }