13111: Move code from fs_collection to fs_base and fs_filehandle.
[arvados.git] / sdk / go / arvados / fs_collection.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "encoding/json"
9         "fmt"
10         "io"
11         "os"
12         "path"
13         "regexp"
14         "sort"
15         "strconv"
16         "strings"
17         "sync"
18         "time"
19 )
20
// maxBlockSize is the upper bound (64 MiB) on the size of a single
// in-memory segment and of data blocks written to Keep. Declared as
// a var rather than a const -- presumably so tests can lower it;
// confirm before changing.
var maxBlockSize = 1 << 26
22
// keepClient is the minimal Keep storage API this package needs:
// random-access reads from a block identified by locator, and
// whole-block writes that return the new block's locator.
type keepClient interface {
	ReadAt(locator string, p []byte, off int) (int, error)
	PutB(p []byte) (string, int, error)
}
27
// A CollectionFileSystem is a FileSystem that can be serialized as a
// manifest and stored as a collection.
type CollectionFileSystem interface {
	FileSystem

	// Flush all file data to Keep and return a snapshot of the
	// filesystem suitable for saving as (Collection)ManifestText.
	// Prefix (normally ".") is a top level directory, effectively
	// prepended to all paths in the returned manifest.
	MarshalManifest(prefix string) (string, error)
}
39
40 // FileSystem returns a CollectionFileSystem for the collection.
41 func (c *Collection) FileSystem(client *Client, kc keepClient) (CollectionFileSystem, error) {
42         var modTime time.Time
43         if c.ModifiedAt == nil {
44                 modTime = time.Now()
45         } else {
46                 modTime = *c.ModifiedAt
47         }
48         dn := &dirnode{
49                 client: client,
50                 kc:     kc,
51                 treenode: treenode{
52                         fileinfo: fileinfo{
53                                 name:    ".",
54                                 mode:    os.ModeDir | 0755,
55                                 modTime: modTime,
56                         },
57                         parent: nil,
58                         inodes: make(map[string]inode),
59                 },
60         }
61         dn.parent = dn
62         fs := &collectionFileSystem{fileSystem: fileSystem{inode: dn}}
63         if err := dn.loadManifest(c.ManifestText); err != nil {
64                 return nil, err
65         }
66         return fs, nil
67 }
68
// collectionFileSystem wraps fileSystem with collection-specific
// behavior: a magic ".arvados#collection" child (see Child) and
// manifest serialization (see MarshalManifest).
type collectionFileSystem struct {
	fileSystem
}
72
73 func (fs collectionFileSystem) Child(name string, replace func(inode) inode) inode {
74         if name == ".arvados#collection" {
75                 return &getternode{Getter: func() ([]byte, error) {
76                         var coll Collection
77                         var err error
78                         coll.ManifestText, err = fs.MarshalManifest(".")
79                         if err != nil {
80                                 return nil, err
81                         }
82                         data, err := json.Marshal(&coll)
83                         if err == nil {
84                                 data = append(data, 10)
85                         }
86                         return data, err
87                 }}
88         }
89         return fs.fileSystem.Child(name, replace)
90 }
91
92 func (fs collectionFileSystem) MarshalManifest(prefix string) (string, error) {
93         fs.fileSystem.inode.Lock()
94         defer fs.fileSystem.inode.Unlock()
95         return fs.fileSystem.inode.(*dirnode).marshalManifest(prefix)
96 }
97
// filenodePtr is an offset into a file that is (usually) efficient to
// seek to. Specifically, if filenode.repacked==filenodePtr.repacked
// then
// filenode.segments[filenodePtr.segmentIdx][filenodePtr.segmentOff]
// corresponds to file offset filenodePtr.off. Otherwise, it is
// necessary to reexamine len(filenode.segments[0]) etc. to find the
// correct segment and offset.
type filenodePtr struct {
	off        int64 // byte offset in the whole file
	segmentIdx int   // index into filenode.segments
	segmentOff int   // byte offset within that segment
	repacked   int64 // filenode.repacked value these coordinates were computed for
}
111
// seek returns a ptr that is consistent with both startPtr.off and
// the current state of fn. The caller must already hold fn.RLock() or
// fn.Lock().
//
// If startPtr is beyond EOF, ptr.segment* will indicate precisely
// EOF.
//
// After seeking:
//
//     ptr.segmentIdx == len(filenode.segments) // i.e., at EOF
//     ||
//     filenode.segments[ptr.segmentIdx].Len() > ptr.segmentOff
func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) {
	ptr = startPtr
	if ptr.off < 0 {
		// Negative offset: segment coordinates are
		// meaningless anyway; caller is expected to report
		// the error.
		return
	} else if ptr.off >= fn.fileinfo.size {
		// At or past EOF: point exactly one past the last
		// segment.
		ptr.segmentIdx = len(fn.segments)
		ptr.segmentOff = 0
		ptr.repacked = fn.repacked
		return
	} else if ptr.repacked == fn.repacked {
		// segmentIdx and segmentOff accurately reflect
		// ptr.off, but might have fallen off the end of a
		// segment
		if ptr.segmentOff >= fn.segments[ptr.segmentIdx].Len() {
			ptr.segmentIdx++
			ptr.segmentOff = 0
		}
		return
	}
	// Segments were repacked since startPtr was computed:
	// recompute coordinates from scratch, and stamp the result
	// with the current repack generation.
	defer func() {
		ptr.repacked = fn.repacked
	}()
	if ptr.off >= fn.fileinfo.size {
		// NOTE(review): appears unreachable -- this condition
		// already returned above and neither off nor size has
		// changed since; kept as a defensive check.
		ptr.segmentIdx, ptr.segmentOff = len(fn.segments), 0
		return
	}
	// Recompute segmentIdx and segmentOff.  We have already
	// established fn.fileinfo.size > ptr.off >= 0, so we don't
	// have to deal with edge cases here.
	var off int64
	for ptr.segmentIdx, ptr.segmentOff = 0, 0; off < ptr.off; ptr.segmentIdx++ {
		// This would panic (index out of range) if
		// fn.fileinfo.size were larger than
		// sum(fn.segments[i].Len()) -- but that can't happen
		// because we have ensured fn.fileinfo.size is always
		// accurate.
		segLen := int64(fn.segments[ptr.segmentIdx].Len())
		if off+segLen > ptr.off {
			ptr.segmentOff = int(ptr.off - off)
			break
		}
		off += segLen
	}
	return
}
170
// filenode implements inode. File content is a sequence of segments:
// writable in-memory memSegments and Keep-backed storedSegments.
type filenode struct {
	fileinfo fileinfo
	parent   *dirnode
	segments []segment
	// number of times `segments` has changed in a
	// way that might invalidate a filenodePtr
	repacked int64
	memsize  int64 // bytes in memSegments
	sync.RWMutex
	nullnode
}
183
184 // caller must have lock
185 func (fn *filenode) appendSegment(e segment) {
186         fn.segments = append(fn.segments, e)
187         fn.fileinfo.size += int64(e.Len())
188 }
189
190 func (fn *filenode) Parent() inode {
191         fn.RLock()
192         defer fn.RUnlock()
193         return fn.parent
194 }
195
// Read reads file data from a single segment, starting at startPtr,
// into p. startPtr is assumed not to be up-to-date. Caller must have
// RLock or Lock.
//
// A single call reads from at most one segment, so n can be less
// than len(p) even when more data follows; callers loop until EOF.
func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
	ptr = fn.seek(startPtr)
	if ptr.off < 0 {
		err = ErrNegativeOffset
		return
	}
	if ptr.segmentIdx >= len(fn.segments) {
		// seek placed us at EOF.
		err = io.EOF
		return
	}
	n, err = fn.segments[ptr.segmentIdx].ReadAt(p, int64(ptr.segmentOff))
	if n > 0 {
		ptr.off += int64(n)
		ptr.segmentOff += n
		if ptr.segmentOff == fn.segments[ptr.segmentIdx].Len() {
			// Consumed the whole segment: advance to the
			// start of the next one.
			ptr.segmentIdx++
			ptr.segmentOff = 0
			// The segment's ReadAt reports io.EOF at its
			// own end; that is only the file's EOF if
			// this was the last segment.
			if ptr.segmentIdx < len(fn.segments) && err == io.EOF {
				err = nil
			}
		}
	}
	return
}
223
224 func (fn *filenode) Size() int64 {
225         fn.RLock()
226         defer fn.RUnlock()
227         return fn.fileinfo.Size()
228 }
229
230 func (fn *filenode) FileInfo() os.FileInfo {
231         fn.RLock()
232         defer fn.RUnlock()
233         return fn.fileinfo
234 }
235
// Truncate sets the file size, taking the write lock for the
// duration and delegating the work to truncate.
func (fn *filenode) Truncate(size int64) error {
	fn.Lock()
	defer fn.Unlock()
	return fn.truncate(size)
}
241
// truncate shrinks or grows the file to the given size. Shrinking
// drops/trims trailing segments; growing extends or appends
// memSegments (memSegment.Truncate is defined elsewhere; presumably
// it zero-fills -- confirm). Caller must have Lock.
func (fn *filenode) truncate(size int64) error {
	if size == fn.fileinfo.size {
		return nil
	}
	// Either branch below can change segment boundaries, so
	// invalidate any outstanding filenodePtrs.
	fn.repacked++
	if size < fn.fileinfo.size {
		ptr := fn.seek(filenodePtr{off: size})
		// Remove every affected in-memory segment from the
		// memory accounting; the surviving (possibly trimmed)
		// segment is added back below.
		for i := ptr.segmentIdx; i < len(fn.segments); i++ {
			if seg, ok := fn.segments[i].(*memSegment); ok {
				fn.memsize -= int64(seg.Len())
			}
		}
		if ptr.segmentOff == 0 {
			// New EOF falls exactly on a segment boundary.
			fn.segments = fn.segments[:ptr.segmentIdx]
		} else {
			// New EOF falls inside a segment: keep it,
			// trimmed to segmentOff bytes.
			fn.segments = fn.segments[:ptr.segmentIdx+1]
			switch seg := fn.segments[ptr.segmentIdx].(type) {
			case *memSegment:
				seg.Truncate(ptr.segmentOff)
				fn.memsize += int64(seg.Len())
			default:
				// Stored segment: replace with a
				// shorter view of the same block.
				fn.segments[ptr.segmentIdx] = seg.Slice(0, ptr.segmentOff)
			}
		}
		fn.fileinfo.size = size
		return nil
	}
	// Grow: extend the last memSegment (or append new ones) until
	// the requested size is reached, capping each at maxBlockSize.
	for size > fn.fileinfo.size {
		grow := size - fn.fileinfo.size
		var seg *memSegment
		var ok bool
		if len(fn.segments) == 0 {
			seg = &memSegment{}
			fn.segments = append(fn.segments, seg)
		} else if seg, ok = fn.segments[len(fn.segments)-1].(*memSegment); !ok || seg.Len() >= maxBlockSize {
			// Last segment is not writable (stored) or is
			// already full: start a fresh memSegment.
			seg = &memSegment{}
			fn.segments = append(fn.segments, seg)
		}
		if maxgrow := int64(maxBlockSize - seg.Len()); maxgrow < grow {
			grow = maxgrow
		}
		seg.Truncate(seg.Len() + int(grow))
		fn.fileinfo.size += grow
		fn.memsize += grow
	}
	return nil
}
289
// Write writes data from p to the file, starting at startPtr,
// extending the file size if necessary. Caller must have Lock.
//
// Each loop iteration writes one chunk ("cando") into a single
// writable memSegment, first rearranging fn.segments so that such a
// segment exists at the write position.
func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
	if startPtr.off > fn.fileinfo.size {
		// Writing past EOF: extend the file up to the write
		// position first.
		if err = fn.truncate(startPtr.off); err != nil {
			return 0, startPtr, err
		}
	}
	ptr = fn.seek(startPtr)
	if ptr.off < 0 {
		err = ErrNegativeOffset
		return
	}
	for len(p) > 0 && err == nil {
		cando := p
		if len(cando) > maxBlockSize {
			cando = cando[:maxBlockSize]
		}
		// Rearrange/grow fn.segments (and shrink cando if
		// needed) such that cando can be copied to
		// fn.segments[ptr.segmentIdx] at offset
		// ptr.segmentOff.
		cur := ptr.segmentIdx
		prev := ptr.segmentIdx - 1
		var curWritable bool
		if cur < len(fn.segments) {
			_, curWritable = fn.segments[cur].(*memSegment)
		}
		var prevAppendable bool
		if prev >= 0 && fn.segments[prev].Len() < maxBlockSize {
			_, prevAppendable = fn.segments[prev].(*memSegment)
		}
		if ptr.segmentOff > 0 && !curWritable {
			// Split a non-writable block.
			if max := fn.segments[cur].Len() - ptr.segmentOff; max <= len(cando) {
				// Truncate cur, and insert a new
				// segment after it.
				cando = cando[:max]
				fn.segments = append(fn.segments, nil)
				copy(fn.segments[cur+1:], fn.segments[cur:])
			} else {
				// Split cur into two copies, truncate
				// the one on the left, shift the one
				// on the right, and insert a new
				// segment between them.
				fn.segments = append(fn.segments, nil, nil)
				copy(fn.segments[cur+2:], fn.segments[cur:])
				fn.segments[cur+2] = fn.segments[cur+2].Slice(ptr.segmentOff+len(cando), -1)
			}
			cur++
			prev++
			seg := &memSegment{}
			seg.Truncate(len(cando))
			fn.memsize += int64(len(cando))
			fn.segments[cur] = seg
			fn.segments[prev] = fn.segments[prev].Slice(0, ptr.segmentOff)
			ptr.segmentIdx++
			ptr.segmentOff = 0
			fn.repacked++
			ptr.repacked++
		} else if curWritable {
			// Overwrite in place; shrink cando so it stays
			// within the current segment.
			if fit := int(fn.segments[cur].Len()) - ptr.segmentOff; fit < len(cando) {
				cando = cando[:fit]
			}
		} else {
			if prevAppendable {
				// Shrink cando if needed to fit in
				// prev segment.
				if cangrow := maxBlockSize - fn.segments[prev].Len(); cangrow < len(cando) {
					cando = cando[:cangrow]
				}
			}

			if cur == len(fn.segments) {
				// ptr is at EOF, filesize is changing.
				fn.fileinfo.size += int64(len(cando))
			} else if el := fn.segments[cur].Len(); el <= len(cando) {
				// cando is long enough that we won't
				// need cur any more. shrink cando to
				// be exactly as long as cur
				// (otherwise we'd accidentally shift
				// the effective position of all
				// segments after cur).
				cando = cando[:el]
				copy(fn.segments[cur:], fn.segments[cur+1:])
				fn.segments = fn.segments[:len(fn.segments)-1]
			} else {
				// shrink cur by the same #bytes we're growing prev
				fn.segments[cur] = fn.segments[cur].Slice(len(cando), -1)
			}

			if prevAppendable {
				// Grow prev.
				ptr.segmentIdx--
				ptr.segmentOff = fn.segments[prev].Len()
				fn.segments[prev].(*memSegment).Truncate(ptr.segmentOff + len(cando))
				fn.memsize += int64(len(cando))
				ptr.repacked++
				fn.repacked++
			} else {
				// Insert a segment between prev and
				// cur, and advance prev/cur.
				fn.segments = append(fn.segments, nil)
				if cur < len(fn.segments) {
					copy(fn.segments[cur+1:], fn.segments[cur:])
					ptr.repacked++
					fn.repacked++
				} else {
					// appending a new segment does
					// not invalidate any ptrs
				}
				seg := &memSegment{}
				seg.Truncate(len(cando))
				fn.memsize += int64(len(cando))
				fn.segments[cur] = seg
				cur++
				prev++
			}
		}

		// Finally we can copy bytes from cando to the current segment.
		fn.segments[ptr.segmentIdx].(*memSegment).WriteAt(cando, ptr.segmentOff)
		n += len(cando)
		p = p[len(cando):]

		ptr.off += int64(len(cando))
		ptr.segmentOff += len(cando)
		if ptr.segmentOff >= maxBlockSize {
			// Current segment is full: flush full-size
			// in-memory segments out to Keep.
			fn.pruneMemSegments()
		}
		if fn.segments[ptr.segmentIdx].Len() == ptr.segmentOff {
			ptr.segmentOff = 0
			ptr.segmentIdx++
		}

		fn.fileinfo.modTime = time.Now()
	}
	return
}
429
// Write some data out to disk to reduce memory use. Caller must have
// write lock.
//
// Only full-size (>= maxBlockSize) in-memory segments are flushed;
// each becomes a storedSegment of identical length at the same
// index, which is presumably why fn.repacked is not incremented
// here (segment boundaries do not move).
func (fn *filenode) pruneMemSegments() {
	// TODO: async (don't hold Lock() while waiting for Keep)
	// TODO: share code with (*dirnode)sync()
	// TODO: pack/flush small blocks too, when fragmented
	for idx, seg := range fn.segments {
		seg, ok := seg.(*memSegment)
		if !ok || seg.Len() < maxBlockSize {
			continue
		}
		locator, _, err := fn.parent.kc.PutB(seg.buf)
		if err != nil {
			// TODO: stall (or return errors from)
			// subsequent writes until flushing
			// starts to succeed
			continue
		}
		fn.memsize -= int64(seg.Len())
		// Replace the in-memory segment with a reference to
		// the stored block.
		fn.segments[idx] = storedSegment{
			kc:      fn.parent.kc,
			locator: locator,
			size:    seg.Len(),
			offset:  0,
			length:  seg.Len(),
		}
	}
}
458
// dirnode implements inode for a directory, carrying the API client
// and Keep client used to load and store the file data beneath it.
type dirnode struct {
	treenode
	client *Client
	kc     keepClient
}
464
// sync flushes in-memory data (for all files in the tree rooted at
// dn) to persistent storage. Caller must hold dn.Lock().
//
// Segments larger than maxBlockSize/2 are written as blocks of
// their own; smaller segments (possibly from different files) are
// packed together into shared blocks up to maxBlockSize.
func (dn *dirnode) sync() error {
	type shortBlock struct {
		fn  *filenode
		idx int // index into fn.segments
	}
	var pending []shortBlock // small segments awaiting a shared block
	var pendingLen int       // total bytes in pending

	// flush concatenates the given segments' buffers, stores them
	// as one Keep block, then replaces each memSegment with a
	// storedSegment pointing at its range of the new block.
	flush := func(sbs []shortBlock) error {
		if len(sbs) == 0 {
			return nil
		}
		block := make([]byte, 0, maxBlockSize)
		for _, sb := range sbs {
			block = append(block, sb.fn.segments[sb.idx].(*memSegment).buf...)
		}
		locator, _, err := dn.kc.PutB(block)
		if err != nil {
			return err
		}
		off := 0
		for _, sb := range sbs {
			data := sb.fn.segments[sb.idx].(*memSegment).buf
			sb.fn.segments[sb.idx] = storedSegment{
				kc:      dn.kc,
				locator: locator,
				size:    len(block),
				offset:  off,
				length:  len(data),
			}
			off += len(data)
			sb.fn.memsize -= int64(len(data))
		}
		return nil
	}

	// Visit file nodes in name order so block packing is
	// deterministic.
	names := make([]string, 0, len(dn.inodes))
	for name := range dn.inodes {
		names = append(names, name)
	}
	sort.Strings(names)

	for _, name := range names {
		fn, ok := dn.inodes[name].(*filenode)
		if !ok {
			// Subdirectories are handled by the caller:
			// marshalManifest recurses and each level
			// calls sync on its own dirnode.
			continue
		}
		// NOTE: defer inside a loop -- every filenode stays
		// locked until sync() returns, keeping the whole set
		// of files stable while their segments are flushed.
		fn.Lock()
		defer fn.Unlock()
		for idx, seg := range fn.segments {
			seg, ok := seg.(*memSegment)
			if !ok {
				continue
			}
			if seg.Len() > maxBlockSize/2 {
				// Big enough to deserve its own block.
				if err := flush([]shortBlock{{fn, idx}}); err != nil {
					return err
				}
				continue
			}
			if pendingLen+seg.Len() > maxBlockSize {
				// This segment would overflow the
				// shared block being accumulated;
				// flush what we have first.
				if err := flush(pending); err != nil {
					return err
				}
				pending = nil
				pendingLen = 0
			}
			pending = append(pending, shortBlock{fn, idx})
			pendingLen += seg.Len()
		}
	}
	return flush(pending)
}
540
// marshalManifest returns manifest text for the tree rooted at dn,
// using prefix as this stream's name. Caller must have read lock.
//
// NOTE(review): sync() below documents that its caller must hold
// dn.Lock(), and the public MarshalManifest does take a write lock;
// "read lock" here looks understated -- confirm.
func (dn *dirnode) marshalManifest(prefix string) (string, error) {
	var streamLen int64
	type filepart struct {
		name   string
		offset int64 // position within the stream's concatenated blocks
		length int64
	}
	var fileparts []filepart
	var subdirs string
	var blocks []string

	// Flush in-memory segments first so every segment visited
	// below is a storedSegment with a locator.
	if err := dn.sync(); err != nil {
		return "", err
	}

	// Lock all children (held until return) and visit them in
	// name order for deterministic output.
	names := make([]string, 0, len(dn.inodes))
	for name, node := range dn.inodes {
		names = append(names, name)
		node.Lock()
		defer node.Unlock()
	}
	sort.Strings(names)

	for _, name := range names {
		switch node := dn.inodes[name].(type) {
		case *dirnode:
			subdir, err := node.marshalManifest(prefix + "/" + name)
			if err != nil {
				return "", err
			}
			subdirs = subdirs + subdir
		case *filenode:
			if len(node.segments) == 0 {
				// Zero-length file still gets a
				// manifest entry.
				fileparts = append(fileparts, filepart{name: name})
				break
			}
			for _, seg := range node.segments {
				switch seg := seg.(type) {
				case storedSegment:
					if len(blocks) > 0 && blocks[len(blocks)-1] == seg.locator {
						// Same block as the previous
						// segment: reuse the listed
						// locator instead of
						// repeating it.
						streamLen -= int64(seg.size)
					} else {
						blocks = append(blocks, seg.locator)
					}
					next := filepart{
						name:   name,
						offset: streamLen + int64(seg.offset),
						length: int64(seg.length),
					}
					// Coalesce with the previous part
					// when this segment continues the
					// same file contiguously.
					if prev := len(fileparts) - 1; prev >= 0 &&
						fileparts[prev].name == name &&
						fileparts[prev].offset+fileparts[prev].length == next.offset {
						fileparts[prev].length += next.length
					} else {
						fileparts = append(fileparts, next)
					}
					streamLen += int64(seg.size)
				default:
					// This can't happen: we
					// haven't unlocked since
					// calling sync().
					panic(fmt.Sprintf("can't marshal segment type %T", seg))
				}
			}
		default:
			panic(fmt.Sprintf("can't marshal inode type %T", node))
		}
	}
	var filetokens []string
	for _, s := range fileparts {
		filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, manifestEscape(s.name)))
	}
	if len(filetokens) == 0 {
		// No files directly in this stream: emit only the
		// subdirectory streams.
		return subdirs, nil
	} else if len(blocks) == 0 {
		// Only zero-length files: use the empty-block
		// locator so the stream is still well-formed.
		blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
	}
	return manifestEscape(prefix) + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n" + subdirs, nil
}
621
// loadManifest parses manifest text (one stream per line) and
// populates the tree rooted at dn. No locking is done here (see
// createFileAndParents) -- callers apparently invoke it only before
// the tree is shared; confirm before calling elsewhere.
func (dn *dirnode) loadManifest(txt string) error {
	var dirname string
	streams := strings.Split(txt, "\n")
	if streams[len(streams)-1] != "" {
		return fmt.Errorf("line %d: no trailing newline", len(streams))
	}
	streams = streams[:len(streams)-1]
	segments := []storedSegment{}
	for i, stream := range streams {
		lineno := i + 1
		var anyFileTokens bool
		// pos is the current position within the stream's
		// concatenated blocks; segIdx is the index of the
		// segment containing (or following) pos.
		var pos int64
		var segIdx int
		segments = segments[:0]
		for i, token := range strings.Split(stream, " ") {
			if i == 0 {
				// First token is the stream (directory)
				// name.
				dirname = manifestUnescape(token)
				continue
			}
			if !strings.Contains(token, ":") {
				// Block locator token: "hash+size[+hints]".
				if anyFileTokens {
					// Locators must all precede
					// file tokens in a stream.
					return fmt.Errorf("line %d: bad file segment %q", lineno, token)
				}
				toks := strings.SplitN(token, "+", 3)
				if len(toks) < 2 {
					return fmt.Errorf("line %d: bad locator %q", lineno, token)
				}
				length, err := strconv.ParseInt(toks[1], 10, 32)
				if err != nil || length < 0 {
					return fmt.Errorf("line %d: bad locator %q", lineno, token)
				}
				segments = append(segments, storedSegment{
					locator: token,
					size:    int(length),
					offset:  0,
					length:  int(length),
				})
				continue
			} else if len(segments) == 0 {
				return fmt.Errorf("line %d: bad locator %q", lineno, token)
			}

			// File token: "offset:length:name".
			toks := strings.SplitN(token, ":", 3)
			if len(toks) != 3 {
				return fmt.Errorf("line %d: bad file segment %q", lineno, token)
			}
			anyFileTokens = true

			offset, err := strconv.ParseInt(toks[0], 10, 64)
			if err != nil || offset < 0 {
				return fmt.Errorf("line %d: bad file segment %q", lineno, token)
			}
			length, err := strconv.ParseInt(toks[1], 10, 64)
			if err != nil || length < 0 {
				return fmt.Errorf("line %d: bad file segment %q", lineno, token)
			}
			name := dirname + "/" + manifestUnescape(toks[2])
			fnode, err := dn.createFileAndParents(name)
			if err != nil {
				return fmt.Errorf("line %d: cannot use path %q: %s", lineno, name, err)
			}
			// Map the stream offset/range coordinates to
			// block/offset/range coordinates and add
			// corresponding storedSegments to the filenode
			if pos > offset {
				// Can't continue where we left off.
				// TODO: binary search instead of
				// rewinding all the way (but this
				// situation might be rare anyway)
				segIdx, pos = 0, 0
			}
			for next := int64(0); segIdx < len(segments); segIdx++ {
				seg := segments[segIdx]
				next = pos + int64(seg.Len())
				if next <= offset || seg.Len() == 0 {
					// Block ends before the file
					// range begins: skip it.
					pos = next
					continue
				}
				if pos >= offset+length {
					// Past the end of the file range.
					break
				}
				// Clip the block to the portion covered
				// by the file range.
				var blkOff int
				if pos < offset {
					blkOff = int(offset - pos)
				}
				blkLen := seg.Len() - blkOff
				if pos+int64(blkOff+blkLen) > offset+length {
					blkLen = int(offset + length - pos - int64(blkOff))
				}
				fnode.appendSegment(storedSegment{
					kc:      dn.kc,
					locator: seg.locator,
					size:    seg.size,
					offset:  blkOff,
					length:  blkLen,
				})
				if next > offset+length {
					break
				} else {
					pos = next
				}
			}
			if segIdx == len(segments) && pos < offset+length {
				// File range extends beyond the
				// stream's blocks.
				return fmt.Errorf("line %d: invalid segment in %d-byte stream: %q", lineno, pos, token)
			}
		}
		if !anyFileTokens {
			return fmt.Errorf("line %d: no file segments", lineno)
		} else if len(segments) == 0 {
			return fmt.Errorf("line %d: no locators", lineno)
		} else if dirname == "" {
			return fmt.Errorf("line %d: no stream name", lineno)
		}
	}
	return nil
}
738
// createFileAndParents returns the *filenode at the given
// slash-separated path, creating the file and any missing parent
// directories along the way. It returns ErrFileExists if a directory
// component exists as a file, and ErrIsDirectory if the final
// component exists as a directory.
//
// only safe to call from loadManifest -- no locking
func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
	node := dn
	names := strings.Split(path, "/")
	basename := names[len(names)-1]
	// The final component must be a legal file name; "" also
	// covers paths ending in "/".
	if basename == "" || basename == "." || basename == ".." {
		err = fmt.Errorf("invalid filename")
		return
	}
	// Walk the directory components, creating missing ones.
	for _, name := range names[:len(names)-1] {
		switch name {
		case "", ".":
			// No-op path components.
			continue
		case "..":
			if node == dn {
				// can't be sure parent will be a *dirnode
				return nil, ErrInvalidArgument
			}
			node = node.Parent().(*dirnode)
			continue
		}
		// The callback inspects the existing child (if any) and,
		// via the captured node/err variables, communicates the
		// directory to descend into or the error to return.
		node.Child(name, func(child inode) inode {
			switch child.(type) {
			case nil:
				// NOTE(review): the new directory's modTime comes
				// from node.Parent(), while the file case below
				// uses node's own modTime -- and Parent() may be
				// nil for the root node (see FileSystem(), which
				// sets parent: nil). Confirm this is intentional.
				node, err = dn.newDirnode(node, name, 0755|os.ModeDir, node.Parent().FileInfo().ModTime())
				child = node
			case *dirnode:
				node = child.(*dirnode)
			case *filenode:
				err = ErrFileExists
			default:
				err = ErrInvalidOperation
			}
			return child
		})
		if err != nil {
			return
		}
	}
	// Create (or reuse) the file itself in the final directory;
	// the caller (loadManifest) appends segments to fn afterwards.
	node.Child(basename, func(child inode) inode {
		switch child := child.(type) {
		case nil:
			fn, err = dn.newFilenode(node, basename, 0755, node.FileInfo().ModTime())
			return fn
		case *filenode:
			// File already exists; reuse it.
			fn = child
			return child
		case *dirnode:
			err = ErrIsDirectory
			return child
		default:
			err = ErrInvalidOperation
			return child
		}
	})
	return
}
796
797 // rlookup (recursive lookup) returns the inode for the file/directory
798 // with the given name (which may contain "/" separators). If no such
799 // file/directory exists, the returned node is nil.
800 func rlookup(start inode, path string) (node inode) {
801         node = start
802         for _, name := range strings.Split(path, "/") {
803                 if node == nil {
804                         break
805                 }
806                 if node.IsDir() {
807                         if name == "." || name == "" {
808                                 continue
809                         }
810                         if name == ".." {
811                                 node = node.Parent()
812                                 continue
813                         }
814                 }
815                 node = func() inode {
816                         node.RLock()
817                         defer node.RUnlock()
818                         return node.Child(name, nil)
819                 }()
820         }
821         return
822 }
823
824 // Caller must have lock, and must have already ensured
825 // Children(name,nil) is nil.
826 func (dn *dirnode) newDirnode(parent *dirnode, name string, perm os.FileMode, modTime time.Time) (node *dirnode, err error) {
827         if name == "" || name == "." || name == ".." {
828                 return nil, ErrInvalidArgument
829         }
830         return &dirnode{
831                 client: dn.client,
832                 kc:     dn.kc,
833                 treenode: treenode{
834                         parent: parent,
835                         fileinfo: fileinfo{
836                                 name:    name,
837                                 mode:    perm | os.ModeDir,
838                                 modTime: modTime,
839                         },
840                         inodes: make(map[string]inode),
841                 },
842         }, nil
843 }
844
845 func (dn *dirnode) newFilenode(parent *dirnode, name string, perm os.FileMode, modTime time.Time) (node *filenode, err error) {
846         if name == "" || name == "." || name == ".." {
847                 return nil, ErrInvalidArgument
848         }
849         return &filenode{
850                 parent: parent,
851                 fileinfo: fileinfo{
852                         name:    name,
853                         mode:    perm & ^os.ModeDir,
854                         modTime: modTime,
855                 },
856         }, nil
857 }
858
859 type segment interface {
860         io.ReaderAt
861         Len() int
862         // Return a new segment with a subsection of the data from this
863         // one. length<0 means length=Len()-off.
864         Slice(off int, length int) segment
865 }
866
867 type memSegment struct {
868         buf []byte
869 }
870
871 func (me *memSegment) Len() int {
872         return len(me.buf)
873 }
874
875 func (me *memSegment) Slice(off, length int) segment {
876         if length < 0 {
877                 length = len(me.buf) - off
878         }
879         buf := make([]byte, length)
880         copy(buf, me.buf[off:])
881         return &memSegment{buf: buf}
882 }
883
884 func (me *memSegment) Truncate(n int) {
885         if n > cap(me.buf) {
886                 newsize := 1024
887                 for newsize < n {
888                         newsize = newsize << 2
889                 }
890                 newbuf := make([]byte, n, newsize)
891                 copy(newbuf, me.buf)
892                 me.buf = newbuf
893         } else {
894                 // Zero unused part when shrinking, in case we grow
895                 // and start using it again later.
896                 for i := n; i < len(me.buf); i++ {
897                         me.buf[i] = 0
898                 }
899         }
900         me.buf = me.buf[:n]
901 }
902
903 func (me *memSegment) WriteAt(p []byte, off int) {
904         if off+len(p) > len(me.buf) {
905                 panic("overflowed segment")
906         }
907         copy(me.buf[off:], p)
908 }
909
910 func (me *memSegment) ReadAt(p []byte, off int64) (n int, err error) {
911         if off > int64(me.Len()) {
912                 err = io.EOF
913                 return
914         }
915         n = copy(p, me.buf[int(off):])
916         if n < len(p) {
917                 err = io.EOF
918         }
919         return
920 }
921
922 type storedSegment struct {
923         kc      keepClient
924         locator string
925         size    int // size of stored block (also encoded in locator)
926         offset  int // position of segment within the stored block
927         length  int // bytes in this segment (offset + length <= size)
928 }
929
930 func (se storedSegment) Len() int {
931         return se.length
932 }
933
934 func (se storedSegment) Slice(n, size int) segment {
935         se.offset += n
936         se.length -= n
937         if size >= 0 && se.length > size {
938                 se.length = size
939         }
940         return se
941 }
942
943 func (se storedSegment) ReadAt(p []byte, off int64) (n int, err error) {
944         if off > int64(se.length) {
945                 return 0, io.EOF
946         }
947         maxlen := se.length - int(off)
948         if len(p) > maxlen {
949                 p = p[:maxlen]
950                 n, err = se.kc.ReadAt(se.locator, p, int(off)+se.offset)
951                 if err == nil {
952                         err = io.EOF
953                 }
954                 return
955         }
956         return se.kc.ReadAt(se.locator, p, int(off)+se.offset)
957 }
958
// canonicalName returns a cleaned, "."-rooted form of name, as used
// for manifest stream names: "" and "/" map to ".", and any other
// path maps to "./elem[/elem...]".
func canonicalName(name string) string {
	// Rooting the path before Clean prevents ".." from escaping
	// above the top level, and guarantees the result is either
	// exactly "/" or a path beginning with "/".
	name = path.Clean("/" + name)
	if name == "/" {
		return "."
	}
	// path.Clean never returns "./" for a rooted input, so the
	// previous check for that value was unreachable and has been
	// removed.
	return "." + name
}
968
// manifestEscapeSeq matches the escape sequences used in manifests:
// a backslash followed by either three octal digits or another
// backslash.
var manifestEscapeSeq = regexp.MustCompile(`\\([0-7]{3}|\\)`)

// manifestUnescapeFunc decodes one matched escape sequence: `\\`
// yields `\`, and `\ooo` yields the byte with octal value ooo.
func manifestUnescapeFunc(seq string) string {
	if seq == `\\` {
		return `\`
	}
	code, err := strconv.ParseUint(seq[1:], 8, 8)
	if err == nil {
		return string([]byte{byte(code)})
	}
	// Octal value doesn't fit in a byte (e.g. `\777`): leave the
	// sequence as-is rather than guess.
	return seq
}

// manifestUnescape decodes all manifest escape sequences in s.
func manifestUnescape(s string) string {
	return manifestEscapeSeq.ReplaceAllStringFunc(s, manifestUnescapeFunc)
}
986
// manifestEscapedChar matches a single character that must be
// escaped in a manifest: control characters and space (\000-\040),
// colon, whitespace, and backslash.
var manifestEscapedChar = regexp.MustCompile(`[\000-\040:\s\\]`)

// manifestEscapeFunc encodes one matched character as a
// backslash-plus-three-octal-digits sequence.
func manifestEscapeFunc(seq string) string {
	// Matched characters are all single-byte ASCII, so seq[0] is
	// the whole character.
	c := seq[0]
	return fmt.Sprintf(`\%03o`, c)
}

// manifestEscape escapes every special character in s so that it can
// be embedded in a manifest stream/file name.
func manifestEscape(s string) string {
	return manifestEscapedChar.ReplaceAllStringFunc(s, manifestEscapeFunc)
}