Merge branch 'master' into 7252-go-sdk-errors
[arvados.git] / sdk / go / manifest / manifest.go
1 /* Deals with parsing Manifest Text. */
2
3 // Inspired by the Manifest class in arvados/sdk/ruby/lib/arvados/keep.rb
4
5 package manifest
6
7 import (
8         "errors"
9         "fmt"
10         "git.curoverse.com/arvados.git/sdk/go/blockdigest"
11         "regexp"
12         "strconv"
13         "strings"
14 )
15
16 var ErrInvalidToken = errors.New("Invalid token")
17
18 var LocatorPattern = regexp.MustCompile(
19         "^[0-9a-fA-F]{32}\\+[0-9]+(\\+[A-Z][A-Za-z0-9@_-]+)*$")
20
21 type Manifest struct {
22         Text string
23         Err  error
24 }
25
26 type BlockLocator struct {
27         Digest blockdigest.BlockDigest
28         Size   int
29         Hints  []string
30 }
31
32 type DataSegment struct {
33         BlockLocator
34         Locator      string
35         StreamOffset uint64
36 }
37
38 // FileSegment is a portion of a file that is contained within a
39 // single block.
40 type FileSegment struct {
41         Locator string
42         // Offset (within this block) of this data segment
43         Offset int
44         Len    int
45 }
46
47 // Represents a single line from a manifest.
48 type ManifestStream struct {
49         StreamName string
50         Blocks     []string
51         FileTokens []string
52         Err        error
53 }
54
55 var escapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`)
56
57 func unescapeSeq(seq string) string {
58         if seq == `\\` {
59                 return `\`
60         }
61         i, err := strconv.ParseUint(seq[1:], 8, 8)
62         if err != nil {
63                 // Invalid escape sequence: can't unescape.
64                 return seq
65         }
66         return string([]byte{byte(i)})
67 }
68
69 func UnescapeName(s string) string {
70         return escapeSeq.ReplaceAllStringFunc(s, unescapeSeq)
71 }
72
73 func ParseBlockLocator(s string) (b BlockLocator, err error) {
74         if !LocatorPattern.MatchString(s) {
75                 err = fmt.Errorf("String \"%s\" does not match BlockLocator pattern "+
76                         "\"%s\".",
77                         s,
78                         LocatorPattern.String())
79         } else {
80                 tokens := strings.Split(s, "+")
81                 var blockSize int64
82                 var blockDigest blockdigest.BlockDigest
83                 // We expect both of the following to succeed since LocatorPattern
84                 // restricts the strings appropriately.
85                 blockDigest, err = blockdigest.FromString(tokens[0])
86                 if err != nil {
87                         return
88                 }
89                 blockSize, err = strconv.ParseInt(tokens[1], 10, 0)
90                 if err != nil {
91                         return
92                 }
93                 b.Digest = blockDigest
94                 b.Size = int(blockSize)
95                 b.Hints = tokens[2:]
96         }
97         return
98 }
99
100 func parseFileToken(tok string) (segPos, segLen uint64, name string, err error) {
101         parts := strings.SplitN(tok, ":", 3)
102         if len(parts) != 3 {
103                 err = ErrInvalidToken
104                 return
105         }
106         segPos, err = strconv.ParseUint(parts[0], 10, 64)
107         if err != nil {
108                 return
109         }
110         segLen, err = strconv.ParseUint(parts[1], 10, 64)
111         if err != nil {
112                 return
113         }
114         name = UnescapeName(parts[2])
115         return
116 }
117
118 func (s *ManifestStream) FileSegmentIterByName(filepath string) <-chan *FileSegment {
119         ch := make(chan *FileSegment)
120         go func() {
121                 s.sendFileSegmentIterByName(filepath, ch)
122                 close(ch)
123         }()
124         return ch
125 }
126
127 func (s *ManifestStream) sendFileSegmentIterByName(filepath string, ch chan<- *FileSegment) {
128         blockLens := make([]int, 0, len(s.Blocks))
129         // This is what streamName+"/"+fileName will look like:
130         target := "./" + filepath
131         for _, fTok := range s.FileTokens {
132                 wantPos, wantLen, name, err := parseFileToken(fTok)
133                 if err != nil {
134                         // Skip (!) invalid file tokens.
135                         continue
136                 }
137                 if s.StreamName+"/"+name != target {
138                         continue
139                 }
140                 if wantLen == 0 {
141                         ch <- &FileSegment{Locator: "d41d8cd98f00b204e9800998ecf8427e+0", Offset: 0, Len: 0}
142                         continue
143                 }
144                 // Linear search for blocks containing data for this
145                 // file
146                 var blockPos uint64 = 0 // position of block in stream
147                 for i, loc := range s.Blocks {
148                         if blockPos >= wantPos+wantLen {
149                                 break
150                         }
151                         if len(blockLens) <= i {
152                                 blockLens = blockLens[:i+1]
153                                 b, err := ParseBlockLocator(loc)
154                                 if err != nil {
155                                         // Unparseable locator -> unusable
156                                         // stream.
157                                         ch <- nil
158                                         return
159                                 }
160                                 blockLens[i] = b.Size
161                         }
162                         blockLen := uint64(blockLens[i])
163                         if blockPos+blockLen <= wantPos {
164                                 blockPos += blockLen
165                                 continue
166                         }
167                         fseg := FileSegment{
168                                 Locator: loc,
169                                 Offset:  0,
170                                 Len:     blockLens[i],
171                         }
172                         if blockPos < wantPos {
173                                 fseg.Offset = int(wantPos - blockPos)
174                                 fseg.Len -= fseg.Offset
175                         }
176                         if blockPos+blockLen > wantPos+wantLen {
177                                 fseg.Len = int(wantPos+wantLen-blockPos) - fseg.Offset
178                         }
179                         ch <- &fseg
180                         blockPos += blockLen
181                 }
182         }
183 }
184
185 func parseManifestStream(s string) (m ManifestStream) {
186         tokens := strings.Split(s, " ")
187
188         m.StreamName = UnescapeName(tokens[0])
189         if m.StreamName != "." && !strings.HasPrefix(m.StreamName, "./") {
190                 m.Err = fmt.Errorf("Invalid stream name: %s", m.StreamName)
191                 return
192         }
193
194         tokens = tokens[1:]
195         var i int
196         for i = 0; i < len(tokens); i++ {
197                 if !blockdigest.IsBlockLocator(tokens[i]) {
198                         break
199                 }
200         }
201         m.Blocks = tokens[:i]
202         m.FileTokens = tokens[i:]
203
204         if len(m.Blocks) == 0 {
205                 m.Err = fmt.Errorf("No block locators found")
206                 return
207         }
208
209         if len(m.FileTokens) == 0 {
210                 m.Err = fmt.Errorf("No file tokens found")
211                 return
212         }
213
214         for _, ft := range m.FileTokens {
215                 _, _, _, err := parseFileToken(ft)
216                 if err != nil {
217                         m.Err = fmt.Errorf("Invalid file token: %s", ft)
218                         break
219                 }
220         }
221
222         return
223 }
224
225 func (m *Manifest) StreamIter() <-chan ManifestStream {
226         ch := make(chan ManifestStream)
227         go func(input string) {
228                 // This slice holds the current line and the remainder of the
229                 // manifest.  We parse one line at a time, to save effort if we
230                 // only need the first few lines.
231                 lines := []string{"", input}
232                 for {
233                         lines = strings.SplitN(lines[1], "\n", 2)
234                         if len(lines[0]) > 0 {
235                                 // Only parse non-blank lines
236                                 ch <- parseManifestStream(lines[0])
237                         }
238                         if len(lines) == 1 {
239                                 break
240                         }
241                 }
242                 close(ch)
243         }(m.Text)
244         return ch
245 }
246
247 func (m *Manifest) FileSegmentIterByName(filepath string) <-chan *FileSegment {
248         ch := make(chan *FileSegment)
249         go func() {
250                 for stream := range m.StreamIter() {
251                         if !strings.HasPrefix("./"+filepath, stream.StreamName+"/") {
252                                 continue
253                         }
254                         stream.sendFileSegmentIterByName(filepath, ch)
255                 }
256                 close(ch)
257         }()
258         return ch
259 }
260
261 // Blocks may appear mulitple times within the same manifest if they
262 // are used by multiple files. In that case this Iterator will output
263 // the same block multiple times.
264 //
265 // In order to detect parse errors, caller must check m.Err after the returned channel closes.
266 func (m *Manifest) BlockIterWithDuplicates() <-chan blockdigest.BlockLocator {
267         blockChannel := make(chan blockdigest.BlockLocator)
268         go func(streamChannel <-chan ManifestStream) {
269                 for ms := range streamChannel {
270                         if ms.Err != nil {
271                                 m.Err = ms.Err
272                                 continue
273                         }
274                         for _, block := range ms.Blocks {
275                                 if b, err := blockdigest.ParseBlockLocator(block); err == nil {
276                                         blockChannel <- b
277                                 } else {
278                                         m.Err = err
279                                 }
280                         }
281                 }
282                 close(blockChannel)
283         }(m.StreamIter())
284         return blockChannel
285 }