5824: Merge branch 'master' into 5824-keep-web
[arvados.git] / sdk / go / manifest / manifest.go
1 /* Deals with parsing Manifest Text. */
2
3 // Inspired by the Manifest class in arvados/sdk/ruby/lib/arvados/keep.rb
4
5 package manifest
6
7 import (
8         "errors"
9         "fmt"
10         "git.curoverse.com/arvados.git/sdk/go/blockdigest"
11         "log"
12         "regexp"
13         "strconv"
14         "strings"
15 )
16
// ErrInvalidToken is returned by parseFileToken when a file token does
// not consist of three ":"-separated parts.
var ErrInvalidToken = errors.New("Invalid token")
18
// LocatorPattern matches a complete block locator string: 32 hex
// digits, "+", a decimal size, and then zero or more "+"-prefixed
// hints. Each hint starts with an uppercase letter followed by one or
// more characters from [A-Za-z0-9@_-].
var LocatorPattern = regexp.MustCompile(
	"^[0-9a-fA-F]{32}\\+[0-9]+(\\+[A-Z][A-Za-z0-9@_-]+)*$")
21
// Manifest wraps the text of a manifest. The iterator methods treat
// each non-blank, newline-separated line of Text as one stream.
type Manifest struct {
	// Text is the raw manifest text.
	Text string
}
25
// BlockLocator is the parsed form of a block locator string, as
// produced by ParseBlockLocator.
type BlockLocator struct {
	// Digest is parsed from the part of the locator before the first "+".
	Digest blockdigest.BlockDigest
	// Size is the decimal number following the first "+".
	Size int
	// Hints holds any remaining "+"-separated parts, without the "+".
	Hints []string
}
31
// DataSegment pairs a parsed BlockLocator with a locator string and a
// stream offset.
// NOTE(review): nothing in this part of the file uses DataSegment;
// the field meanings below are assumptions to confirm against callers.
type DataSegment struct {
	BlockLocator
	// Locator is presumably the unparsed locator string.
	Locator string
	// StreamOffset is presumably the block's position within its stream.
	StreamOffset uint64
}
37
// FileSegment is a portion of a file that is contained within a
// single block.
type FileSegment struct {
	// Locator is the locator string of the block holding this segment.
	Locator string
	// Offset (within this block) of this data segment
	Offset int
	// Len is the length of this data segment.
	Len int
}
46
// ManifestStream represents a single line from a manifest.
type ManifestStream struct {
	// StreamName is the first token of the line, with escape
	// sequences decoded.
	StreamName string
	// Blocks holds the block locator tokens that follow the stream
	// name, in manifest order.
	Blocks []string
	// FileTokens holds the tokens that follow the block locators.
	// They are parsed as "position:length:name".
	FileTokens []string
}
53
// escapeSeq matches one escape sequence in a manifest name: a
// backslash followed by either three decimal digits or a second
// backslash.
var escapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`)
55
// unescapeSeq decodes a single escape sequence. A doubled backslash
// becomes one backslash; otherwise everything after the leading
// character is read as an octal byte value and returned as a
// one-byte string. A sequence that is not valid 8-bit octal (for
// example `\999` or `\777`) is returned unchanged.
func unescapeSeq(seq string) string {
	if seq == `\\` {
		return `\`
	}
	if code, err := strconv.ParseUint(seq[1:], 8, 8); err == nil {
		return string([]byte{byte(code)})
	}
	// Invalid escape sequence: leave it as it was.
	return seq
}
67
68 func UnescapeName(s string) string {
69         return escapeSeq.ReplaceAllStringFunc(s, unescapeSeq)
70 }
71
72 func ParseBlockLocator(s string) (b BlockLocator, err error) {
73         if !LocatorPattern.MatchString(s) {
74                 err = fmt.Errorf("String \"%s\" does not match BlockLocator pattern "+
75                         "\"%s\".",
76                         s,
77                         LocatorPattern.String())
78         } else {
79                 tokens := strings.Split(s, "+")
80                 var blockSize int64
81                 var blockDigest blockdigest.BlockDigest
82                 // We expect both of the following to succeed since LocatorPattern
83                 // restricts the strings appropriately.
84                 blockDigest, err = blockdigest.FromString(tokens[0])
85                 if err != nil {
86                         return
87                 }
88                 blockSize, err = strconv.ParseInt(tokens[1], 10, 0)
89                 if err != nil {
90                         return
91                 }
92                 b.Digest = blockDigest
93                 b.Size = int(blockSize)
94                 b.Hints = tokens[2:]
95         }
96         return
97 }
98
99 func parseFileToken(tok string) (segPos, segLen uint64, name string, err error) {
100         parts := strings.SplitN(tok, ":", 3)
101         if len(parts) != 3 {
102                 err = ErrInvalidToken
103                 return
104         }
105         segPos, err = strconv.ParseUint(parts[0], 10, 64)
106         if err != nil {
107                 return
108         }
109         segLen, err = strconv.ParseUint(parts[1], 10, 64)
110         if err != nil {
111                 return
112         }
113         name = UnescapeName(parts[2])
114         return
115 }
116
117 func (s *ManifestStream) FileSegmentIterByName(filepath string) <-chan *FileSegment {
118         ch := make(chan *FileSegment)
119         go func() {
120                 s.sendFileSegmentIterByName(filepath, ch)
121                 close(ch)
122         }()
123         return ch
124 }
125
126 func (s *ManifestStream) sendFileSegmentIterByName(filepath string, ch chan<- *FileSegment) {
127         blockLens := make([]int, 0, len(s.Blocks))
128         // This is what streamName+"/"+fileName will look like:
129         target := "./" + filepath
130         for _, fTok := range s.FileTokens {
131                 wantPos, wantLen, name, err := parseFileToken(fTok)
132                 if err != nil {
133                         // Skip (!) invalid file tokens.
134                         continue
135                 }
136                 if s.StreamName+"/"+name != target {
137                         continue
138                 }
139                 if wantLen == 0 {
140                         ch <- &FileSegment{Locator: "d41d8cd98f00b204e9800998ecf8427e+0", Offset: 0, Len: 0}
141                         continue
142                 }
143                 // Linear search for blocks containing data for this
144                 // file
145                 var blockPos uint64 = 0 // position of block in stream
146                 for i, loc := range s.Blocks {
147                         if blockPos >= wantPos+wantLen {
148                                 break
149                         }
150                         if len(blockLens) <= i {
151                                 blockLens = blockLens[:i+1]
152                                 b, err := ParseBlockLocator(loc)
153                                 if err != nil {
154                                         // Unparseable locator -> unusable
155                                         // stream.
156                                         ch <- nil
157                                         return
158                                 }
159                                 blockLens[i] = b.Size
160                         }
161                         blockLen := uint64(blockLens[i])
162                         if blockPos+blockLen <= wantPos {
163                                 blockPos += blockLen
164                                 continue
165                         }
166                         fseg := FileSegment{
167                                 Locator: loc,
168                                 Offset:  0,
169                                 Len:     blockLens[i],
170                         }
171                         if blockPos < wantPos {
172                                 fseg.Offset = int(wantPos - blockPos)
173                                 fseg.Len -= fseg.Offset
174                         }
175                         if blockPos+blockLen > wantPos+wantLen {
176                                 fseg.Len = int(wantPos+wantLen-blockPos) - fseg.Offset
177                         }
178                         ch <- &fseg
179                         blockPos += blockLen
180                 }
181         }
182 }
183
184 func parseManifestStream(s string) (m ManifestStream) {
185         tokens := strings.Split(s, " ")
186         m.StreamName = UnescapeName(tokens[0])
187         tokens = tokens[1:]
188         var i int
189         for i = range tokens {
190                 if !blockdigest.IsBlockLocator(tokens[i]) {
191                         break
192                 }
193         }
194         m.Blocks = tokens[:i]
195         m.FileTokens = tokens[i:]
196         return
197 }
198
199 func (m *Manifest) StreamIter() <-chan ManifestStream {
200         ch := make(chan ManifestStream)
201         go func(input string) {
202                 // This slice holds the current line and the remainder of the
203                 // manifest.  We parse one line at a time, to save effort if we
204                 // only need the first few lines.
205                 lines := []string{"", input}
206                 for {
207                         lines = strings.SplitN(lines[1], "\n", 2)
208                         if len(lines[0]) > 0 {
209                                 // Only parse non-blank lines
210                                 ch <- parseManifestStream(lines[0])
211                         }
212                         if len(lines) == 1 {
213                                 break
214                         }
215                 }
216                 close(ch)
217         }(m.Text)
218         return ch
219 }
220
221 func (m *Manifest) FileSegmentIterByName(filepath string) <-chan *FileSegment {
222         ch := make(chan *FileSegment)
223         go func() {
224                 for stream := range m.StreamIter() {
225                         if !strings.HasPrefix("./"+filepath, stream.StreamName+"/") {
226                                 continue
227                         }
228                         stream.sendFileSegmentIterByName(filepath, ch)
229                 }
230                 close(ch)
231         }()
232         return ch
233 }
234
235 // Blocks may appear mulitple times within the same manifest if they
236 // are used by multiple files. In that case this Iterator will output
237 // the same block multiple times.
238 func (m *Manifest) BlockIterWithDuplicates() <-chan blockdigest.BlockLocator {
239         blockChannel := make(chan blockdigest.BlockLocator)
240         go func(streamChannel <-chan ManifestStream) {
241                 for m := range streamChannel {
242                         for _, block := range m.Blocks {
243                                 if b, err := blockdigest.ParseBlockLocator(block); err == nil {
244                                         blockChannel <- b
245                                 } else {
246                                         log.Printf("ERROR: Failed to parse block: %v", err)
247                                 }
248                         }
249                 }
250                 close(blockChannel)
251         }(m.StreamIter())
252         return blockChannel
253 }