10945: "Used in jobs" panel placement
[arvados.git] / sdk / go / manifest / manifest.go
1 /* Deals with parsing Manifest Text. */
2
3 // Inspired by the Manifest class in arvados/sdk/ruby/lib/arvados/keep.rb
4
5 package manifest
6
7 import (
8         "errors"
9         "fmt"
10         "git.curoverse.com/arvados.git/sdk/go/blockdigest"
11         "regexp"
12         "strconv"
13         "strings"
14 )
15
16 var ErrInvalidToken = errors.New("Invalid token")
17
18 var LocatorPattern = regexp.MustCompile(
19         "^[0-9a-fA-F]{32}\\+[0-9]+(\\+[A-Z][A-Za-z0-9@_-]+)*$")
20
21 type Manifest struct {
22         Text string
23         Err  error
24 }
25
26 type BlockLocator struct {
27         Digest blockdigest.BlockDigest
28         Size   int
29         Hints  []string
30 }
31
32 type DataSegment struct {
33         BlockLocator
34         Locator      string
35         StreamOffset uint64
36 }
37
38 // FileSegment is a portion of a file that is contained within a
39 // single block.
40 type FileSegment struct {
41         Locator string
42         // Offset (within this block) of this data segment
43         Offset int
44         Len    int
45 }
46
47 // FileStreamSegment is a portion of a file described as a segment of a stream.
48 type FileStreamSegment struct {
49         SegPos uint64
50         SegLen uint64
51         Name   string
52 }
53
54 // Represents a single line from a manifest.
55 type ManifestStream struct {
56         StreamName         string
57         Blocks             []string
58         FileStreamSegments []FileStreamSegment
59         Err                error
60 }
61
62 var escapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`)
63
64 func unescapeSeq(seq string) string {
65         if seq == `\\` {
66                 return `\`
67         }
68         i, err := strconv.ParseUint(seq[1:], 8, 8)
69         if err != nil {
70                 // Invalid escape sequence: can't unescape.
71                 return seq
72         }
73         return string([]byte{byte(i)})
74 }
75
76 func UnescapeName(s string) string {
77         return escapeSeq.ReplaceAllStringFunc(s, unescapeSeq)
78 }
79
80 func ParseBlockLocator(s string) (b BlockLocator, err error) {
81         if !LocatorPattern.MatchString(s) {
82                 err = fmt.Errorf("String \"%s\" does not match BlockLocator pattern "+
83                         "\"%s\".",
84                         s,
85                         LocatorPattern.String())
86         } else {
87                 tokens := strings.Split(s, "+")
88                 var blockSize int64
89                 var blockDigest blockdigest.BlockDigest
90                 // We expect both of the following to succeed since LocatorPattern
91                 // restricts the strings appropriately.
92                 blockDigest, err = blockdigest.FromString(tokens[0])
93                 if err != nil {
94                         return
95                 }
96                 blockSize, err = strconv.ParseInt(tokens[1], 10, 0)
97                 if err != nil {
98                         return
99                 }
100                 b.Digest = blockDigest
101                 b.Size = int(blockSize)
102                 b.Hints = tokens[2:]
103         }
104         return
105 }
106
107 func parseFileStreamSegment(tok string) (ft FileStreamSegment, err error) {
108         parts := strings.SplitN(tok, ":", 3)
109         if len(parts) != 3 {
110                 err = ErrInvalidToken
111                 return
112         }
113         ft.SegPos, err = strconv.ParseUint(parts[0], 10, 64)
114         if err != nil {
115                 return
116         }
117         ft.SegLen, err = strconv.ParseUint(parts[1], 10, 64)
118         if err != nil {
119                 return
120         }
121         ft.Name = UnescapeName(parts[2])
122         return
123 }
124
125 func (s *ManifestStream) FileSegmentIterByName(filepath string) <-chan *FileSegment {
126         ch := make(chan *FileSegment)
127         go func() {
128                 s.sendFileSegmentIterByName(filepath, ch)
129                 close(ch)
130         }()
131         return ch
132 }
133
134 func (s *ManifestStream) sendFileSegmentIterByName(filepath string, ch chan<- *FileSegment) {
135         blockLens := make([]int, 0, len(s.Blocks))
136         // This is what streamName+"/"+fileName will look like:
137         target := "./" + filepath
138         for _, fTok := range s.FileStreamSegments {
139                 wantPos := fTok.SegPos
140                 wantLen := fTok.SegLen
141                 name := fTok.Name
142
143                 if s.StreamName+"/"+name != target {
144                         continue
145                 }
146                 if wantLen == 0 {
147                         ch <- &FileSegment{Locator: "d41d8cd98f00b204e9800998ecf8427e+0", Offset: 0, Len: 0}
148                         continue
149                 }
150                 // Linear search for blocks containing data for this
151                 // file
152                 var blockPos uint64 = 0 // position of block in stream
153                 for i, loc := range s.Blocks {
154                         if blockPos >= wantPos+wantLen {
155                                 break
156                         }
157                         if len(blockLens) <= i {
158                                 blockLens = blockLens[:i+1]
159                                 b, err := ParseBlockLocator(loc)
160                                 if err != nil {
161                                         // Unparseable locator -> unusable
162                                         // stream.
163                                         ch <- nil
164                                         return
165                                 }
166                                 blockLens[i] = b.Size
167                         }
168                         blockLen := uint64(blockLens[i])
169                         if blockPos+blockLen <= wantPos {
170                                 blockPos += blockLen
171                                 continue
172                         }
173                         fseg := FileSegment{
174                                 Locator: loc,
175                                 Offset:  0,
176                                 Len:     blockLens[i],
177                         }
178                         if blockPos < wantPos {
179                                 fseg.Offset = int(wantPos - blockPos)
180                                 fseg.Len -= fseg.Offset
181                         }
182                         if blockPos+blockLen > wantPos+wantLen {
183                                 fseg.Len = int(wantPos+wantLen-blockPos) - fseg.Offset
184                         }
185                         ch <- &fseg
186                         blockPos += blockLen
187                 }
188         }
189 }
190
191 func parseManifestStream(s string) (m ManifestStream) {
192         tokens := strings.Split(s, " ")
193
194         m.StreamName = UnescapeName(tokens[0])
195         if m.StreamName != "." && !strings.HasPrefix(m.StreamName, "./") {
196                 m.Err = fmt.Errorf("Invalid stream name: %s", m.StreamName)
197                 return
198         }
199
200         tokens = tokens[1:]
201         var i int
202         for i = 0; i < len(tokens); i++ {
203                 if !blockdigest.IsBlockLocator(tokens[i]) {
204                         break
205                 }
206         }
207         m.Blocks = tokens[:i]
208         fileTokens := tokens[i:]
209
210         if len(m.Blocks) == 0 {
211                 m.Err = fmt.Errorf("No block locators found")
212                 return
213         }
214
215         if len(fileTokens) == 0 {
216                 m.Err = fmt.Errorf("No file tokens found")
217                 return
218         }
219
220         for _, ft := range fileTokens {
221                 pft, err := parseFileStreamSegment(ft)
222                 if err != nil {
223                         m.Err = fmt.Errorf("Invalid file token: %s", ft)
224                         break
225                 }
226                 m.FileStreamSegments = append(m.FileStreamSegments, pft)
227         }
228
229         return
230 }
231
232 func (m *Manifest) StreamIter() <-chan ManifestStream {
233         ch := make(chan ManifestStream)
234         go func(input string) {
235                 // This slice holds the current line and the remainder of the
236                 // manifest.  We parse one line at a time, to save effort if we
237                 // only need the first few lines.
238                 lines := []string{"", input}
239                 for {
240                         lines = strings.SplitN(lines[1], "\n", 2)
241                         if len(lines[0]) > 0 {
242                                 // Only parse non-blank lines
243                                 ch <- parseManifestStream(lines[0])
244                         }
245                         if len(lines) == 1 {
246                                 break
247                         }
248                 }
249                 close(ch)
250         }(m.Text)
251         return ch
252 }
253
254 func (m *Manifest) FileSegmentIterByName(filepath string) <-chan *FileSegment {
255         ch := make(chan *FileSegment)
256         go func() {
257                 for stream := range m.StreamIter() {
258                         if !strings.HasPrefix("./"+filepath, stream.StreamName+"/") {
259                                 continue
260                         }
261                         stream.sendFileSegmentIterByName(filepath, ch)
262                 }
263                 close(ch)
264         }()
265         return ch
266 }
267
268 // Blocks may appear multiple times within the same manifest if they
269 // are used by multiple files. In that case this Iterator will output
270 // the same block multiple times.
271 //
272 // In order to detect parse errors, caller must check m.Err after the returned channel closes.
273 func (m *Manifest) BlockIterWithDuplicates() <-chan blockdigest.BlockLocator {
274         blockChannel := make(chan blockdigest.BlockLocator)
275         go func(streamChannel <-chan ManifestStream) {
276                 for ms := range streamChannel {
277                         if ms.Err != nil {
278                                 m.Err = ms.Err
279                                 continue
280                         }
281                         for _, block := range ms.Blocks {
282                                 if b, err := blockdigest.ParseBlockLocator(block); err == nil {
283                                         blockChannel <- b
284                                 } else {
285                                         m.Err = err
286                                 }
287                         }
288                 }
289                 close(blockChannel)
290         }(m.StreamIter())
291         return blockChannel
292 }