1 /* Deals with parsing Manifest Text. */
3 // Inspired by the Manifest class in arvados/sdk/ruby/lib/arvados/keep.rb
10 "git.curoverse.com/arvados.git/sdk/go/blockdigest"
// ErrInvalidToken is a sentinel error value. In this file
// parseFileStreamSegment assigns it to its error result.
17 var ErrInvalidToken = errors.New("Invalid token")
// LocatorPattern matches a whole block locator string: exactly 32 hex digits
// (either case), a "+", a decimal size of one or more digits, and then zero
// or more "+"-prefixed suffixes. Each suffix starts with an uppercase ASCII
// letter followed by one or more characters from [A-Za-z0-9@_-].
19 var LocatorPattern = regexp.MustCompile(
20 "^[0-9a-fA-F]{32}\\+[0-9]+(\\+[A-Z][A-Za-z0-9@_-]+)*$")
// Manifest is the receiver type for the manifest-wide operations in this
// file: StreamIter, NormalizeManifest, NormalizedManifestForPath,
// FileSegmentIterByName and BlockIterWithDuplicates.
22 type Manifest struct {
// BlockLocator holds the parsed components of a block locator string.
// ParseBlockLocator fills in its Digest and Size fields.
27 type BlockLocator struct {
// Digest is parsed from the part of the locator before the first "+".
28 Digest blockdigest.BlockDigest
33 type DataSegment struct {
39 // FileSegment is a portion of a file that is contained within a
// block. Values are built in this file with three fields: Locator (the
// block's locator string), Offset and Len.
41 type FileSegment struct {
43 // Offset (within this block) of this data segment
48 // FileStreamSegment is a portion of a file described as a segment of a stream.
// parseFileStreamSegment builds one from a "position:length:name" token,
// filling in SegPos, SegLen and Name respectively.
49 type FileStreamSegment struct {
55 // ManifestStream represents a single line from a manifest.
// parseManifestStream fills it in and reports any parse problem through
// its Err field.
56 type ManifestStream struct {
// FileStreamSegments holds the parsed file tokens of the line, in the
// order in which they were appended by parseManifestStream.
59 FileStreamSegments []FileStreamSegment
// escapeSeq matches one escape sequence inside an escaped name: a backslash
// followed by either exactly three decimal digits or a second backslash.
63 var escapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`)
// unescapeSeq converts a single match of escapeSeq into the text it stands
// for. The characters after the leading backslash are parsed as a base-8
// number that must fit in 8 bits, and that value is returned as a one-byte
// string.
65 func unescapeSeq(seq string) string {
// Base 8 with bitSize 8: non-octal digits (8, 9) or a value above 0377
// make ParseUint return an error.
69 i, err := strconv.ParseUint(seq[1:], 8, 8)
71 // Invalid escape sequence: can't unescape.
// Convert via a byte slice so the result is exactly one raw byte rather
// than the UTF-8 encoding of a code point.
74 return string([]byte{byte(i)})
// UnescapeName returns s with every substring matched by escapeSeq replaced
// by the result of calling unescapeSeq on it. Text outside those matches is
// copied unchanged.
77 func UnescapeName(s string) string {
78 return escapeSeq.ReplaceAllStringFunc(s, unescapeSeq)
// ParseBlockLocator parses a block locator string into a BlockLocator.
//
// If s does not match LocatorPattern, err is set to a message that quotes s
// and the pattern. For a matching s the string is split on "+": the first
// field is converted with blockdigest.FromString and stored in b.Digest, and
// the second field is parsed as a base-10 integer and stored in b.Size.
81 func ParseBlockLocator(s string) (b BlockLocator, err error) {
82 if !LocatorPattern.MatchString(s) {
83 err = fmt.Errorf("String \"%s\" does not match BlockLocator pattern "+
86 LocatorPattern.String())
// tokens[0] is the digest, tokens[1] the size; any further fields are the
// optional "+"-prefixed suffixes allowed by LocatorPattern.
88 tokens := strings.Split(s, "+")
90 var blockDigest blockdigest.BlockDigest
91 // We expect both of the following to succeed since LocatorPattern
92 // restricts the strings appropriately.
93 blockDigest, err = blockdigest.FromString(tokens[0])
// bitSize 0 asks ParseInt for a value that fits in Go's int, which is the
// type it is converted to below.
97 blockSize, err = strconv.ParseInt(tokens[1], 10, 0)
101 b.Digest = blockDigest
102 b.Size = int(blockSize)
// parseFileStreamSegment parses one file token of the form
// "position:length:name" into a FileStreamSegment.
//
// The position and length are parsed as base-10 unsigned 64-bit integers
// into ft.SegPos and ft.SegLen. The name is passed through UnescapeName and
// stored in ft.Name. ErrInvalidToken is the error assigned for a rejected
// token.
108 func parseFileStreamSegment(tok string) (ft FileStreamSegment, err error) {
// Limit of 3 fields: any ":" after the second one stays part of the name.
109 parts := strings.SplitN(tok, ":", 3)
111 err = ErrInvalidToken
114 ft.SegPos, err = strconv.ParseUint(parts[0], 10, 64)
118 ft.SegLen, err = strconv.ParseUint(parts[1], 10, 64)
122 ft.Name = UnescapeName(parts[2])
// FileSegmentIterByName returns a receive-only channel of *FileSegment for
// the file named filepath within this stream. The values are produced by
// sendFileSegmentIterByName writing to an unbuffered channel.
// NOTE(review): an unbuffered send needs a concurrent receiver, so the
// producer presumably runs in its own goroutine and closes the channel when
// it finishes - confirm.
126 func (s *ManifestStream) FileSegmentIterByName(filepath string) <-chan *FileSegment {
127 ch := make(chan *FileSegment)
129 s.sendFileSegmentIterByName(filepath, ch)
// sendFileSegmentIterByName walks the stream's file segments, compares each
// segment's stream-qualified name against "./"+filepath, and maps the wanted
// range of the stream (starting at SegPos, SegLen long) onto the stream's
// blocks. For each block it works out the Offset and Len of the part that
// falls inside the wanted range. ch is the send-only channel on which
// FileSegment values are delivered.
135 func (s *ManifestStream) sendFileSegmentIterByName(filepath string, ch chan<- *FileSegment) {
// blockLens caches the size of each block once its locator has been parsed.
// It starts empty but with capacity for every block, so it can be grown by
// reslicing (see below) without reallocating.
136 blockLens := make([]int, 0, len(s.Blocks))
137 // This is what streamName+"/"+fileName will look like:
138 target := "./" + filepath
139 for _, fTok := range s.FileStreamSegments {
// wantPos/wantLen: start and length of this segment within the stream.
140 wantPos := fTok.SegPos
141 wantLen := fTok.SegLen
// Test for segments that belong to a different file than the target.
144 if s.StreamName+"/"+name != target {
// A zero-length segment whose locator has size 0;
// d41d8cd98f00b204e9800998ecf8427e is the MD5 digest of empty input.
148 ch <- &FileSegment{Locator: "d41d8cd98f00b204e9800998ecf8427e+0", Offset: 0, Len: 0}
151 // Linear search for blocks containing data for this
153 var blockPos uint64 = 0 // position of block in stream
154 for i, loc := range s.Blocks {
// This block starts at or beyond the end of the wanted range.
155 if blockPos >= wantPos+wantLen {
// Size of block i not cached yet: extend the cache by one slot (within
// the capacity reserved above) and parse the locator to fill it.
158 if len(blockLens) <= i {
159 blockLens = blockLens[:i+1]
160 b, err := ParseBlockLocator(loc)
162 // Unparseable locator -> unusable
167 blockLens[i] = b.Size
169 blockLen := uint64(blockLens[i])
// This block ends at or before the start of the wanted range.
170 if blockPos+blockLen <= wantPos {
// The wanted range begins partway into this block: skip the leading part
// and shorten the length by the same amount.
179 if blockPos < wantPos {
180 fseg.Offset = int(wantPos - blockPos)
181 fseg.Len -= fseg.Offset
// The wanted range ends before this block does: cut the length off at the
// end of the wanted range.
183 if blockPos+blockLen > wantPos+wantLen {
184 fseg.Len = int(wantPos+wantLen-blockPos) - fseg.Offset
// parseManifestStream parses one manifest line into a ManifestStream.
//
// The line is split on single spaces. The first token, after UnescapeName,
// is the stream name and must be "." or start with "./". The leading run of
// tokens accepted by blockdigest.IsBlockLocator becomes m.Blocks, and the
// remaining tokens are parsed with parseFileStreamSegment into
// m.FileStreamSegments. There is no separate error return: problems are
// recorded in m.Err.
192 func parseManifestStream(s string) (m ManifestStream) {
193 tokens := strings.Split(s, " ")
195 m.StreamName = UnescapeName(tokens[0])
196 if m.StreamName != "." && !strings.HasPrefix(m.StreamName, "./") {
197 m.Err = fmt.Errorf("Invalid stream name: %s", m.StreamName)
// Find the index of the first token that is not a block locator.
203 for i = 0; i < len(tokens); i++ {
204 if !blockdigest.IsBlockLocator(tokens[i]) {
// Everything before index i is a block locator; everything from i on is
// treated as a file token.
208 m.Blocks = tokens[:i]
209 fileTokens := tokens[i:]
211 if len(m.Blocks) == 0 {
212 m.Err = fmt.Errorf("No block locators found")
216 if len(fileTokens) == 0 {
217 m.Err = fmt.Errorf("No file tokens found")
221 for _, ft := range fileTokens {
222 pft, err := parseFileStreamSegment(ft)
// The message carries the offending token text, not the underlying err.
224 m.Err = fmt.Errorf("Invalid file token: %s", ft)
227 m.FileStreamSegments = append(m.FileStreamSegments, pft)
// NormalizeManifest groups the streams yielded by StreamIter by stream name
// and returns them as a map keyed by StreamName. The first stream seen under
// a name is stored as is. For later streams with the same name, their Blocks
// and FileStreamSegments are appended to those of the stored entry.
233 func (m *Manifest) NormalizeManifest() map[string]ManifestStream {
234 streams := make(map[string]ManifestStream)
236 for stream := range m.StreamIter() {
// A missing key yields the zero ManifestStream, whose StreamName is "".
237 ms := streams[stream.StreamName]
239 if ms.StreamName == "" { // new stream
240 streams[stream.StreamName] = stream
// NOTE(review): ms is a copy of the map value (the map holds structs, not
// pointers), so these appends only take effect if ms is stored back into
// streams afterwards - confirm that happens.
242 ms.Blocks = append(ms.Blocks, stream.Blocks...)
243 ms.FileStreamSegments = append(ms.FileStreamSegments, stream.FileStreamSegments...)
// NormalizedManifestForPath builds manifest text from the result of
// NormalizeManifest, restricted by path, and returns it as a string.
//
// Leading and trailing "/" are trimmed from path. A path that still contains
// "/" is split at the last one into a subdirectory and a file name. Streams
// are visited in sorted name order. Each output line consists of the stream
// name, its block locators and then "position:length:name" file tokens, with
// the segments of one file accumulated into spans before being written.
250 func (m *Manifest) NormalizedManifestForPath(path string) string {
251 normalized := m.NormalizeManifest()
// Collect the stream names and sort them: map iteration order is not
// deterministic, and the output below follows this order.
254 for _, stream := range normalized {
255 streams = append(streams, stream.StreamName)
257 sort.Strings(streams)
259 path = strings.Trim(path, "/")
260 var subdir, filename string
// No "/" is left in path after trimming.
263 if strings.Index(path, "/") == -1 {
265 for _, v := range streams {
// Split at the last "/": the part before it is the subdirectory, the part
// after it is the file name.
276 pathIdx := strings.LastIndex(path, "/")
278 subdir = path[0:pathIdx]
279 filename = path[pathIdx+1:]
284 manifestForPath := ""
286 for _, streamName := range streams {
287 stream := normalized[streamName]
// When a subdirectory was requested, test for streams other than
// "./"+subdir.
289 if subdir != "" && stream.StreamName != "./"+subdir {
// Start the output line: stream name, then its block locators, all
// separated by single spaces.
293 manifestForPath += stream.StreamName + " " + strings.Join(stream.Blocks, " ") + " "
// currentSpan accumulates the span being built for the current file name.
// NOTE(review): its second element is initialised as a length (SegLen) and
// added to the first element in one comparison, but it is compared directly
// with SegPos and assigned SegPos+SegLen elsewhere - confirm whether it is
// meant to be a length or an end offset.
296 currentSpan := []uint64{0, 0}
297 for _, fss := range stream.FileStreamSegments {
// When a file name was requested, test for segments of other files.
298 if filename != "" && fss.Name != filename {
// The file name changed and a previous file is pending: write out its
// accumulated span as a file token.
302 if fss.Name != currentName && currentName != "" {
303 manifestForPath += fmt.Sprintf("%v", currentSpan[0]) + ":" + fmt.Sprintf("%v", currentSpan[1]) + ":" + currentName + " "
// Begin tracking the new file name with an empty span.
306 if fss.Name != currentName {
307 currentName = fss.Name
308 currentSpan = []uint64{0, 0}
// Nothing accumulated yet for this file: start the span at this segment.
311 if currentSpan[1] == 0 {
312 currentSpan = []uint64{fss.SegPos, fss.SegLen}
// Otherwise try to extend the current span with this segment.
314 if currentSpan[1] == fss.SegPos {
315 currentSpan[1] += fss.SegLen
316 } else if currentSpan[0]+currentSpan[1] == fss.SegPos {
317 currentSpan[1] = fss.SegPos + fss.SegLen
// The segment could not be merged: emit a token and restart the span from
// this segment.
319 manifestForPath += fmt.Sprintf("%v", currentSpan[0]) + ":" + fmt.Sprintf("%v", currentSpan[1]+fss.SegLen) + ":" + fss.Name + " "
320 currentSpan = []uint64{fss.SegPos, fss.SegPos + fss.SegLen}
// Write the last pending span and terminate the stream's line.
324 manifestForPath += fmt.Sprintf("%v", currentSpan[0]) + ":" + fmt.Sprintf("%v", currentSpan[1]) + ":" + currentName + "\n"
327 return manifestForPath
// StreamIter returns a receive-only channel of ManifestStream values. A
// goroutine walks its input text one line at a time and sends the result of
// parseManifestStream for every non-empty line.
// NOTE(review): ch is unbuffered, so the goroutine blocks on each send until
// the value is received. A caller that stops receiving early would leave it
// blocked unless there is a cancellation path elsewhere - confirm.
330 func (m *Manifest) StreamIter() <-chan ManifestStream {
331 ch := make(chan ManifestStream)
332 go func(input string) {
333 // This slice holds the current line and the remainder of the
334 // manifest. We parse one line at a time, to save effort if we
335 // only need the first few lines.
336 lines := []string{"", input}
// Split off the next line. With a limit of 2, lines[0] is that line and
// lines[1], when present, is the still-unparsed remainder.
338 lines = strings.SplitN(lines[1], "\n", 2)
339 if len(lines[0]) > 0 {
340 // Only parse non-blank lines
341 ch <- parseManifestStream(lines[0])
// FileSegmentIterByName returns a receive-only channel of *FileSegment for
// the file named filepath. It iterates over the streams from StreamIter and
// has the relevant streams write their segments for filepath to the channel
// via sendFileSegmentIterByName.
// NOTE(review): ch is unbuffered, so the producing loop presumably runs in
// its own goroutine and closes ch when done - confirm.
352 func (m *Manifest) FileSegmentIterByName(filepath string) <-chan *FileSegment {
353 ch := make(chan *FileSegment)
355 for stream := range m.StreamIter() {
// True when "./"+filepath does not start with the stream name followed by
// "/", i.e. the target file cannot be located under this stream.
356 if !strings.HasPrefix("./"+filepath, stream.StreamName+"/") {
359 stream.sendFileSegmentIterByName(filepath, ch)
366 // Blocks may appear multiple times within the same manifest if they
367 // are used by multiple files. In that case this Iterator will output
368 // the same block multiple times.
370 // In order to detect parse errors, caller must check m.Err after the returned channel closes.
371 func (m *Manifest) BlockIterWithDuplicates() <-chan blockdigest.BlockLocator {
372 blockChannel := make(chan blockdigest.BlockLocator)
373 go func(streamChannel <-chan ManifestStream) {
374 for ms := range streamChannel {
379 for _, block := range ms.Blocks {
380 if b, err := blockdigest.ParseBlockLocator(block); err == nil {