a9745ae7cd7fb650909516f5e36451214654d792
[arvados.git] / sdk / go / manifest / manifest.go
// Package manifest parses and manipulates Arvados manifest text.
2
3 // Inspired by the Manifest class in arvados/sdk/ruby/lib/arvados/keep.rb
4
5 package manifest
6
7 import (
8         "errors"
9         "fmt"
10         "git.curoverse.com/arvados.git/sdk/go/blockdigest"
11         "regexp"
12         "sort"
13         "strconv"
14         "strings"
15 )
16
var (
	// ErrInvalidToken is returned by parseFileStreamSegment when a file
	// token does not contain at least two ':' separators (pos:len:name).
	ErrInvalidToken = errors.New("Invalid token")

	// LocatorPattern matches a complete block locator string: 32 hex
	// digits, "+", a decimal size, then zero or more hints, each of which
	// is "+" followed by an uppercase letter and at least one more
	// character from [A-Za-z0-9@_-].
	LocatorPattern = regexp.MustCompile(
		`^[0-9a-fA-F]{32}\+[0-9]+(\+[A-Z][A-Za-z0-9@_-]+)*$`)
)
21
// Manifest wraps the text of a manifest. Its methods parse Text lazily,
// one line (stream) at a time.
type Manifest struct {
	// Text is the raw manifest text; StreamIter splits it on "\n".
	Text string
	// Err is set by BlockIterWithDuplicates when a stream or block locator
	// fails to parse. Each new error overwrites the previous one, so after
	// iteration it holds the last error seen.
	Err  error
}
26
// BlockLocator is the parsed form of a block locator string, as returned by
// ParseBlockLocator.
type BlockLocator struct {
	// Digest is parsed from the leading 32-hex-digit field.
	Digest blockdigest.BlockDigest
	// Size is the decimal field that follows the digest.
	Size   int
	// Hints holds any remaining "+"-separated fields, without the "+".
	Hints  []string
}
32
// DataSegment embeds a BlockLocator together with a Locator string and a
// StreamOffset.
// NOTE(review): this type is not used elsewhere in this file. Presumably
// Locator is the unparsed locator text and StreamOffset is the position of
// the block's first byte within its stream — confirm against callers.
type DataSegment struct {
	BlockLocator
	Locator      string
	StreamOffset uint64
}
38
// FileSegment is a portion of a file that is contained within a
// single block.
type FileSegment struct {
	// Locator is the block locator string, exactly as it appears in the
	// manifest.
	Locator string
	// Offset (within this block) of this data segment
	Offset int
	// Len is the number of bytes of this block, starting at Offset, that
	// belong to the file.
	Len    int
}
47
// FileStreamSegment is a portion of a file described as a segment of a stream.
// It is parsed from a "pos:len:name" file token by parseFileStreamSegment.
type FileStreamSegment struct {
	// SegPos is the byte position where the segment starts, counted from the
	// beginning of the stream's blocks taken in order.
	SegPos uint64
	// SegLen is the segment length in bytes.
	SegLen uint64
	// Name is the file name, with manifest escape sequences decoded.
	Name   string
}
54
// ManifestStream represents a single line from a manifest, as parsed by
// parseManifestStream.
type ManifestStream struct {
	// StreamName is the first token of the line, unescaped ("." or "./...").
	StreamName         string
	// Blocks are the block locator tokens that follow the stream name.
	Blocks             []string
	// FileStreamSegments are the parsed file tokens that follow the blocks.
	FileStreamSegments []FileStreamSegment
	// Err is non-nil if the line could not be fully parsed; the other fields
	// may then be incomplete.
	Err                error
}
62
// escapeSeq matches the escape sequences that UnescapeName decodes: a
// backslash followed either by three decimal digits or by a second
// backslash. Digits 8 and 9 are matched here but are left undecoded by
// unescapeSeq, which parses the digits as octal.
var escapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`)
64
// unescapeSeq decodes one sequence matched by escapeSeq. `\\` becomes a
// single backslash, and `\NNN` becomes the byte whose octal value is NNN.
// A sequence that is not valid octal, or that does not fit in a byte, is
// returned unchanged.
func unescapeSeq(seq string) string {
	body := seq[1:] // drop the leading backslash
	if body == `\` {
		return body
	}
	code, err := strconv.ParseUint(body, 8, 8)
	if err != nil {
		// Not decodable as an octal byte: leave the sequence as written.
		return seq
	}
	// Build from a byte slice so values >= 0x80 stay single raw bytes
	// rather than being UTF-8 encoded as runes.
	return string([]byte{byte(code)})
}
76
77 func UnescapeName(s string) string {
78         return escapeSeq.ReplaceAllStringFunc(s, unescapeSeq)
79 }
80
81 func ParseBlockLocator(s string) (b BlockLocator, err error) {
82         if !LocatorPattern.MatchString(s) {
83                 err = fmt.Errorf("String \"%s\" does not match BlockLocator pattern "+
84                         "\"%s\".",
85                         s,
86                         LocatorPattern.String())
87         } else {
88                 tokens := strings.Split(s, "+")
89                 var blockSize int64
90                 var blockDigest blockdigest.BlockDigest
91                 // We expect both of the following to succeed since LocatorPattern
92                 // restricts the strings appropriately.
93                 blockDigest, err = blockdigest.FromString(tokens[0])
94                 if err != nil {
95                         return
96                 }
97                 blockSize, err = strconv.ParseInt(tokens[1], 10, 0)
98                 if err != nil {
99                         return
100                 }
101                 b.Digest = blockDigest
102                 b.Size = int(blockSize)
103                 b.Hints = tokens[2:]
104         }
105         return
106 }
107
108 func parseFileStreamSegment(tok string) (ft FileStreamSegment, err error) {
109         parts := strings.SplitN(tok, ":", 3)
110         if len(parts) != 3 {
111                 err = ErrInvalidToken
112                 return
113         }
114         ft.SegPos, err = strconv.ParseUint(parts[0], 10, 64)
115         if err != nil {
116                 return
117         }
118         ft.SegLen, err = strconv.ParseUint(parts[1], 10, 64)
119         if err != nil {
120                 return
121         }
122         ft.Name = UnescapeName(parts[2])
123         return
124 }
125
126 func (s *ManifestStream) FileSegmentIterByName(filepath string) <-chan *FileSegment {
127         ch := make(chan *FileSegment)
128         go func() {
129                 s.sendFileSegmentIterByName(filepath, ch)
130                 close(ch)
131         }()
132         return ch
133 }
134
// sendFileSegmentIterByName sends on ch one *FileSegment for each block
// range that holds data of the file whose full path is "./"+filepath, in the
// order the file's segments appear in the stream. It does not close ch.
//
// A zero-length file segment produces a single FileSegment for the size-0
// locator d41d8cd98f00b204e9800998ecf8427e+0 (the MD5 of empty data). If a
// block locator needed for the search cannot be parsed, a nil *FileSegment
// is sent and the method returns without sending anything further.
func (s *ManifestStream) sendFileSegmentIterByName(filepath string, ch chan<- *FileSegment) {
	// blockLens caches parsed block sizes by block index. It is extended
	// lazily (within the capacity reserved here) only as far as the searches
	// below actually reach, so trailing blocks may never be parsed.
	blockLens := make([]int, 0, len(s.Blocks))
	// This is what streamName+"/"+fileName will look like:
	target := "./" + filepath
	for _, fTok := range s.FileStreamSegments {
		wantPos := fTok.SegPos
		wantLen := fTok.SegLen
		name := fTok.Name

		if s.StreamName+"/"+name != target {
			continue
		}
		if wantLen == 0 {
			ch <- &FileSegment{Locator: "d41d8cd98f00b204e9800998ecf8427e+0", Offset: 0, Len: 0}
			continue
		}
		// Linear search for blocks containing data for this
		// file
		var blockPos uint64 = 0 // position of block in stream
		for i, loc := range s.Blocks {
			// Blocks starting at or after the segment's end hold none of it.
			if blockPos >= wantPos+wantLen {
				break
			}
			if len(blockLens) <= i {
				blockLens = blockLens[:i+1]
				b, err := ParseBlockLocator(loc)
				if err != nil {
					// Unparseable locator -> unusable
					// stream.
					ch <- nil
					return
				}
				blockLens[i] = b.Size
			}
			blockLen := uint64(blockLens[i])
			// Block ends at or before the segment starts: skip it.
			if blockPos+blockLen <= wantPos {
				blockPos += blockLen
				continue
			}
			// Start with the whole block, then trim the front and/or back
			// to the part that overlaps [wantPos, wantPos+wantLen).
			fseg := FileSegment{
				Locator: loc,
				Offset:  0,
				Len:     blockLens[i],
			}
			if blockPos < wantPos {
				fseg.Offset = int(wantPos - blockPos)
				fseg.Len -= fseg.Offset
			}
			if blockPos+blockLen > wantPos+wantLen {
				fseg.Len = int(wantPos+wantLen-blockPos) - fseg.Offset
			}
			ch <- &fseg
			blockPos += blockLen
		}
	}
}
191
192 func parseManifestStream(s string) (m ManifestStream) {
193         tokens := strings.Split(s, " ")
194
195         m.StreamName = UnescapeName(tokens[0])
196         if m.StreamName != "." && !strings.HasPrefix(m.StreamName, "./") {
197                 m.Err = fmt.Errorf("Invalid stream name: %s", m.StreamName)
198                 return
199         }
200
201         tokens = tokens[1:]
202         var i int
203         for i = 0; i < len(tokens); i++ {
204                 if !blockdigest.IsBlockLocator(tokens[i]) {
205                         break
206                 }
207         }
208         m.Blocks = tokens[:i]
209         fileTokens := tokens[i:]
210
211         if len(m.Blocks) == 0 {
212                 m.Err = fmt.Errorf("No block locators found")
213                 return
214         }
215
216         if len(fileTokens) == 0 {
217                 m.Err = fmt.Errorf("No file tokens found")
218                 return
219         }
220
221         for _, ft := range fileTokens {
222                 pft, err := parseFileStreamSegment(ft)
223                 if err != nil {
224                         m.Err = fmt.Errorf("Invalid file token: %s", ft)
225                         break
226                 }
227                 m.FileStreamSegments = append(m.FileStreamSegments, pft)
228         }
229
230         return
231 }
232
233 func (m *Manifest) NormalizeManifest() map[string]ManifestStream {
234         streams := make(map[string]ManifestStream)
235
236         for stream := range m.StreamIter() {
237                 ms := streams[stream.StreamName]
238
239                 if ms.StreamName == "" { // new stream
240                         streams[stream.StreamName] = stream
241                 } else {
242                         ms.Blocks = append(ms.Blocks, stream.Blocks...)
243                         ms.FileStreamSegments = append(ms.FileStreamSegments, stream.FileStreamSegments...)
244                 }
245         }
246
247         return streams
248 }
249
250 func (m *Manifest) NormalizedManifestForPath(path string) string {
251         normalized := m.NormalizeManifest()
252
253         var streams []string
254         for _, stream := range normalized {
255                 streams = append(streams, stream.StreamName)
256         }
257         sort.Strings(streams)
258
259         path = strings.Trim(path, "/")
260         var subdir, filename string
261
262         if path != "" {
263                 if strings.Index(path, "/") == -1 {
264                         isStream := false
265                         for _, v := range streams {
266                                 if v == "./"+path {
267                                         isStream = true
268                                 }
269                         }
270                         if isStream {
271                                 subdir = path
272                         } else {
273                                 filename = path
274                         }
275                 } else {
276                         pathIdx := strings.LastIndex(path, "/")
277                         if pathIdx >= 0 {
278                                 subdir = path[0:pathIdx]
279                                 filename = path[pathIdx+1:]
280                         }
281                 }
282         }
283
284         manifestForPath := ""
285
286         for _, streamName := range streams {
287                 stream := normalized[streamName]
288
289                 if subdir != "" && stream.StreamName != "./"+subdir {
290                         continue
291                 }
292
293                 manifestForPath += stream.StreamName + " " + strings.Join(stream.Blocks, " ") + " "
294
295                 currentName := ""
296                 currentSpan := []uint64{0, 0}
297                 for _, fss := range stream.FileStreamSegments {
298                         if filename != "" && fss.Name != filename {
299                                 continue
300                         }
301
302                         if fss.Name != currentName && currentName != "" {
303                                 manifestForPath += fmt.Sprintf("%v", currentSpan[0]) + ":" + fmt.Sprintf("%v", currentSpan[1]) + ":" + currentName + " "
304                         }
305
306                         if fss.Name != currentName {
307                                 currentName = fss.Name
308                                 currentSpan = []uint64{0, 0}
309                         }
310
311                         if currentSpan[1] == 0 {
312                                 currentSpan = []uint64{fss.SegPos, fss.SegLen}
313                         } else {
314                                 if currentSpan[1] == fss.SegPos {
315                                         currentSpan[1] += fss.SegLen
316                                 } else if currentSpan[0]+currentSpan[1] == fss.SegPos {
317                                         currentSpan[1] = fss.SegPos + fss.SegLen
318                                 } else {
319                                         manifestForPath += fmt.Sprintf("%v", currentSpan[0]) + ":" + fmt.Sprintf("%v", currentSpan[1]+fss.SegLen) + ":" + fss.Name + " "
320                                         currentSpan = []uint64{fss.SegPos, fss.SegPos + fss.SegLen}
321                                 }
322                         }
323                 }
324                 manifestForPath += fmt.Sprintf("%v", currentSpan[0]) + ":" + fmt.Sprintf("%v", currentSpan[1]) + ":" + currentName + "\n"
325         }
326
327         return manifestForPath
328 }
329
330 func (m *Manifest) StreamIter() <-chan ManifestStream {
331         ch := make(chan ManifestStream)
332         go func(input string) {
333                 // This slice holds the current line and the remainder of the
334                 // manifest.  We parse one line at a time, to save effort if we
335                 // only need the first few lines.
336                 lines := []string{"", input}
337                 for {
338                         lines = strings.SplitN(lines[1], "\n", 2)
339                         if len(lines[0]) > 0 {
340                                 // Only parse non-blank lines
341                                 ch <- parseManifestStream(lines[0])
342                         }
343                         if len(lines) == 1 {
344                                 break
345                         }
346                 }
347                 close(ch)
348         }(m.Text)
349         return ch
350 }
351
352 func (m *Manifest) FileSegmentIterByName(filepath string) <-chan *FileSegment {
353         ch := make(chan *FileSegment)
354         go func() {
355                 for stream := range m.StreamIter() {
356                         if !strings.HasPrefix("./"+filepath, stream.StreamName+"/") {
357                                 continue
358                         }
359                         stream.sendFileSegmentIterByName(filepath, ch)
360                 }
361                 close(ch)
362         }()
363         return ch
364 }
365
366 // Blocks may appear multiple times within the same manifest if they
367 // are used by multiple files. In that case this Iterator will output
368 // the same block multiple times.
369 //
370 // In order to detect parse errors, caller must check m.Err after the returned channel closes.
371 func (m *Manifest) BlockIterWithDuplicates() <-chan blockdigest.BlockLocator {
372         blockChannel := make(chan blockdigest.BlockLocator)
373         go func(streamChannel <-chan ManifestStream) {
374                 for ms := range streamChannel {
375                         if ms.Err != nil {
376                                 m.Err = ms.Err
377                                 continue
378                         }
379                         for _, block := range ms.Blocks {
380                                 if b, err := blockdigest.ParseBlockLocator(block); err == nil {
381                                         blockChannel <- b
382                                 } else {
383                                         m.Err = err
384                                 }
385                         }
386                 }
387                 close(blockChannel)
388         }(m.StreamIter())
389         return blockChannel
390 }