1 /* Deals with parsing Manifest Text. */
3 // Inspired by the Manifest class in arvados/sdk/ruby/lib/arvados/keep.rb
10 "git.curoverse.com/arvados.git/sdk/go/blockdigest"
// ErrInvalidToken is the sentinel error (message "Invalid token") that
// parseFileStreamSegment assigns to its err result.
// NOTE(review): presumably used when a file token does not split into three
// ":"-separated parts; the guarding condition should be confirmed.
16 var ErrInvalidToken = errors.New("Invalid token")
// LocatorPattern matches a complete block locator string: exactly 32
// hexadecimal digits, a "+", one or more decimal digits, and then zero or
// more "+"-prefixed suffixes, each starting with an uppercase letter that is
// followed by one or more characters from [A-Za-z0-9@_-].
// ParseBlockLocator reports an error for any string that does not match it.
18 var LocatorPattern = regexp.MustCompile(
19 "^[0-9a-fA-F]{32}\\+[0-9]+(\\+[A-Z][A-Za-z0-9@_-]+)*$")
// Manifest is the receiver type of StreamIter, FileSegmentIterByName and
// BlockIterWithDuplicates. The comment on BlockIterWithDuplicates tells
// callers to check m.Err for parse errors once the returned channel closes.
21 type Manifest struct {
// BlockLocator is the parsed form of a block locator string, as filled in by
// ParseBlockLocator (which sets Digest and Size).
26 type BlockLocator struct {
// Digest is built by blockdigest.FromString from the text that precedes the
// first "+" of the locator.
27 Digest blockdigest.BlockDigest
// DataSegment is a struct type declared alongside BlockLocator and
// FileSegment.
// NOTE(review): presumably describes a span of data inside a block; confirm
// against its fields and callers before relying on this description.
32 type DataSegment struct {
38 // FileSegment is a portion of a file that is contained within a
// block. sendFileSegmentIterByName sends *FileSegment values with the
// Locator, Offset and Len fields set.
40 type FileSegment struct {
42 // Offset (within this block) of this data segment
47 // FileStreamSegment is a portion of a file described as a segment of a stream.
// parseFileStreamSegment builds one from a "pos:len:name" token, storing the
// three parts in SegPos, SegLen and Name respectively.
48 type FileStreamSegment struct {
54 // Represents a single line from a manifest.
// parseManifestStream fills in StreamName, Blocks and FileStreamSegments,
// and records a parse failure in Err.
55 type ManifestStream struct {
// FileStreamSegments holds the line's parsed file tokens, appended in the
// order they appear on the line.
58 FileStreamSegments []FileStreamSegment
// escapeSeq matches one escape sequence inside a manifest name: a backslash
// followed by either exactly three decimal digits or a second backslash.
// UnescapeName hands every match to unescapeSeq.
62 var escapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`)
// unescapeSeq converts one match of escapeSeq into the text it stands for.
//
// Everything after the leading backslash is parsed as an octal number that
// must fit in 8 bits; on success the result is returned as a one-byte string.
// Because escapeSeq accepts the digits 8 and 9, a match such as `\189` fails
// the octal parse.
// NOTE(review): presumably a failed parse returns seq unchanged and the
// double-backslash match is handled before the parse; confirm both.
64 func unescapeSeq(seq string) string {
// seq[1:] drops the leading backslash; base 8, bitSize 8.
68 i, err := strconv.ParseUint(seq[1:], 8, 8)
70 // Invalid escape sequence: can't unescape.
// Wrap the value in a byte slice so the result is exactly one raw byte
// rather than the UTF-8 encoding of a code point.
73 return string([]byte{byte(i)})
// UnescapeName returns s with every substring matched by escapeSeq replaced
// by the result of calling unescapeSeq on that substring. Text outside the
// matches is left unchanged.
76 func UnescapeName(s string) string {
77 return escapeSeq.ReplaceAllStringFunc(s, unescapeSeq)
// ParseBlockLocator parses the locator string s into a BlockLocator.
//
// s is first checked against LocatorPattern; a string that does not match
// produces an error whose message includes the pattern. A matching string is
// split on "+": the first token is converted with blockdigest.FromString and
// stored in b.Digest, and the second token is parsed as a base-10 integer and
// stored in b.Size. Any further "+"-separated tokens are not used by the
// statements shown here.
80 func ParseBlockLocator(s string) (b BlockLocator, err error) {
81 if !LocatorPattern.MatchString(s) {
82 err = fmt.Errorf("String \"%s\" does not match BlockLocator pattern "+
85 LocatorPattern.String())
// tokens[0] is the hash, tokens[1] the size.
87 tokens := strings.Split(s, "+")
89 var blockDigest blockdigest.BlockDigest
90 // We expect both of the following to succeed since LocatorPattern
91 // restricts the strings appropriately.
92 blockDigest, err = blockdigest.FromString(tokens[0])
// bitSize 0 asks strconv to range-check the value against the int type,
// which is what the conversion to int below relies on.
96 blockSize, err = strconv.ParseInt(tokens[1], 10, 0)
100 b.Digest = blockDigest
101 b.Size = int(blockSize)
// parseFileStreamSegment parses one file token of a manifest line into a
// FileStreamSegment.
//
// tok is split on ":" into at most three parts, so any additional colons
// remain in the third part. The first two parts are parsed as unsigned
// base-10 64-bit integers into SegPos and SegLen, and the third part is
// passed through UnescapeName to become Name.
// NOTE(review): ErrInvalidToken is presumably returned when fewer than three
// parts are found; confirm the guarding condition.
107 func parseFileStreamSegment(tok string) (ft FileStreamSegment, err error) {
108 parts := strings.SplitN(tok, ":", 3)
110 err = ErrInvalidToken
113 ft.SegPos, err = strconv.ParseUint(parts[0], 10, 64)
117 ft.SegLen, err = strconv.ParseUint(parts[1], 10, 64)
121 ft.Name = UnescapeName(parts[2])
// FileSegmentIterByName provides, as a receive-only channel, the *FileSegment
// values for the file named filepath within this stream. The channel is
// created unbuffered and is fed by sendFileSegmentIterByName.
// NOTE(review): presumably the send runs in its own goroutine, which closes
// ch when it finishes; confirm.
125 func (s *ManifestStream) FileSegmentIterByName(filepath string) <-chan *FileSegment {
126 ch := make(chan *FileSegment)
128 s.sendFileSegmentIterByName(filepath, ch)
// sendFileSegmentIterByName sends on ch the FileSegments that make up the
// file named filepath in stream s.
//
// filepath, with "./" prepended, is compared against s.StreamName+"/"+name
// for each entry of s.FileStreamSegments. For an entry of interest, s.Blocks
// is scanned linearly while blockPos tracks where the current block starts
// within the stream. The wanted byte range is [SegPos, SegPos+SegLen), and
// the segment's Offset and Len are trimmed so that it covers only the part of
// the block inside that range.
//
// Block sizes obtained from ParseBlockLocator are cached in blockLens,
// indexed like s.Blocks, so a size already recorded is reused for later file
// tokens in the same call.
134 func (s *ManifestStream) sendFileSegmentIterByName(filepath string, ch chan<- *FileSegment) {
// Length 0 with capacity len(s.Blocks): the slice is extended by reslicing
// as block sizes become known.
135 blockLens := make([]int, 0, len(s.Blocks))
136 // This is what streamName+"/"+fileName will look like:
137 target := "./" + filepath
138 for _, fTok := range s.FileStreamSegments {
139 wantPos := fTok.SegPos
140 wantLen := fTok.SegLen
// NOTE(review): name is presumably the token's Name field; confirm.
143 if s.StreamName+"/"+name != target {
// The locator below has size 0 (d41d8cd98f00b204e9800998ecf8427e is the MD5
// digest of empty input).
// NOTE(review): presumably sent only for zero-length file tokens; confirm.
147 ch <- &FileSegment{Locator: "d41d8cd98f00b204e9800998ecf8427e+0", Offset: 0, Len: 0}
150 // Linear search for blocks containing data for this
152 var blockPos uint64 = 0 // position of block in stream
153 for i, loc := range s.Blocks {
// This block starts at or beyond the end of the wanted range.
154 if blockPos >= wantPos+wantLen {
// The size of block i has not been recorded yet: grow the cache by one slot
// and parse the locator to learn it.
157 if len(blockLens) <= i {
158 blockLens = blockLens[:i+1]
159 b, err := ParseBlockLocator(loc)
161 // Unparseable locator -> unusable
166 blockLens[i] = b.Size
168 blockLen := uint64(blockLens[i])
// This block ends at or before the start of the wanted range.
169 if blockPos+blockLen <= wantPos {
// The wanted range begins part-way into this block: skip the leading bytes
// and shorten the segment by the same amount.
178 if blockPos < wantPos {
179 fseg.Offset = int(wantPos - blockPos)
180 fseg.Len -= fseg.Offset
// The wanted range ends before this block does: cut the segment off at the
// end of the range, measured from the block start and less the Offset
// already skipped.
182 if blockPos+blockLen > wantPos+wantLen {
183 fseg.Len = int(wantPos+wantLen-blockPos) - fseg.Offset
// parseManifestStream parses one manifest line s into a ManifestStream.
//
// The line is split on single spaces. The first token, after UnescapeName,
// becomes m.StreamName, which must be "." or begin with "./". A scan over the
// tokens then finds the first one that blockdigest.IsBlockLocator rejects:
// the tokens before that index become m.Blocks and the remainder are file
// tokens, each parsed with parseFileStreamSegment and appended to
// m.FileStreamSegments.
//
// Failures are reported by setting m.Err on the returned value; there is no
// separate error result. The visible failure cases are an invalid stream
// name, no block locators, no file tokens, and an unparseable file token.
191 func parseManifestStream(s string) (m ManifestStream) {
192 tokens := strings.Split(s, " ")
194 m.StreamName = UnescapeName(tokens[0])
195 if m.StreamName != "." && !strings.HasPrefix(m.StreamName, "./") {
196 m.Err = fmt.Errorf("Invalid stream name: %s", m.StreamName)
// NOTE(review): presumably tokens no longer includes the stream name at this
// point; confirm.
202 for i = 0; i < len(tokens); i++ {
203 if !blockdigest.IsBlockLocator(tokens[i]) {
// i is the index of the first non-locator token (or len(tokens) if none).
207 m.Blocks = tokens[:i]
208 fileTokens := tokens[i:]
210 if len(m.Blocks) == 0 {
211 m.Err = fmt.Errorf("No block locators found")
215 if len(fileTokens) == 0 {
216 m.Err = fmt.Errorf("No file tokens found")
220 for _, ft := range fileTokens {
221 pft, err := parseFileStreamSegment(ft)
223 m.Err = fmt.Errorf("Invalid file token: %s", ft)
226 m.FileStreamSegments = append(m.FileStreamSegments, pft)
// StreamIter provides, as a receive-only channel, the manifest's lines
// parsed into ManifestStream values, one value per non-blank line.
//
// Parsing runs in a goroutine that peels one line at a time off the remaining
// text with strings.SplitN, so later lines are not split until they are
// needed. The channel is unbuffered, so the goroutine blocks on each send
// until the value is received.
// NOTE(review): a consumer that stops receiving early would leave the
// goroutine blocked unless the function has a cancellation path; confirm.
232 func (m *Manifest) StreamIter() <-chan ManifestStream {
233 ch := make(chan ManifestStream)
234 go func(input string) {
235 // This slice holds the current line and the remainder of the
236 // manifest. We parse one line at a time, to save effort if we
237 // only need the first few lines.
238 lines := []string{"", input}
// lines[0] becomes the next line; lines[1], when present, is the rest.
240 lines = strings.SplitN(lines[1], "\n", 2)
241 if len(lines[0]) > 0 {
242 // Only parse non-blank lines
243 ch <- parseManifestStream(lines[0])
// FileSegmentIterByName provides, as a receive-only channel, the *FileSegment
// values for the file at filepath, which is given without a leading "./"
// (the code prepends it).
//
// Every stream produced by m.StreamIter is tested: "./"+filepath must begin
// with stream.StreamName+"/" for the stream to be relevant. Relevant streams
// are handed to sendFileSegmentIterByName, all sharing the one unbuffered
// channel ch.
// NOTE(review): presumably non-matching streams are skipped and the loop runs
// in a goroutine that closes ch at the end; confirm.
254 func (m *Manifest) FileSegmentIterByName(filepath string) <-chan *FileSegment {
255 ch := make(chan *FileSegment)
257 for stream := range m.StreamIter() {
258 if !strings.HasPrefix("./"+filepath, stream.StreamName+"/") {
261 stream.sendFileSegmentIterByName(filepath, ch)
268 // Blocks may appear multiple times within the same manifest if they
269 // are used by multiple files. In that case this Iterator will output
270 // the same block multiple times.
272 // In order to detect parse errors, caller must check m.Err after the returned channel closes.
273 func (m *Manifest) BlockIterWithDuplicates() <-chan blockdigest.BlockLocator {
274 blockChannel := make(chan blockdigest.BlockLocator)
275 go func(streamChannel <-chan ManifestStream) {
276 for ms := range streamChannel {
281 for _, block := range ms.Blocks {
282 if b, err := blockdigest.ParseBlockLocator(block); err == nil {