12483: Track memory use. Flush filled blocks while writing.
[arvados.git] / sdk / go / arvados / collection_fs.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "errors"
9         "fmt"
10         "io"
11         "net/http"
12         "os"
13         "path"
14         "regexp"
15         "sort"
16         "strconv"
17         "strings"
18         "sync"
19         "time"
20 )
21
22 var (
23         ErrReadOnlyFile      = errors.New("read-only file")
24         ErrNegativeOffset    = errors.New("cannot seek to negative offset")
25         ErrFileExists        = errors.New("file exists")
26         ErrInvalidOperation  = errors.New("invalid operation")
27         ErrDirectoryNotEmpty = errors.New("directory not empty")
28         ErrPermission        = os.ErrPermission
29
30         maxBlockSize = 1 << 26
31 )
32
33 type File interface {
34         io.Reader
35         io.Writer
36         io.Closer
37         io.Seeker
38         Size() int64
39         Readdir(int) ([]os.FileInfo, error)
40         Stat() (os.FileInfo, error)
41         Truncate(int64) error
42 }
43
44 type keepClient interface {
45         ReadAt(locator string, p []byte, off int) (int, error)
46         PutB(p []byte) (string, int, error)
47 }
48
49 type fileinfo struct {
50         name    string
51         mode    os.FileMode
52         size    int64
53         modTime time.Time
54 }
55
56 // Name implements os.FileInfo.
57 func (fi fileinfo) Name() string {
58         return fi.name
59 }
60
61 // ModTime implements os.FileInfo.
62 func (fi fileinfo) ModTime() time.Time {
63         return fi.modTime
64 }
65
66 // Mode implements os.FileInfo.
67 func (fi fileinfo) Mode() os.FileMode {
68         return fi.mode
69 }
70
71 // IsDir implements os.FileInfo.
72 func (fi fileinfo) IsDir() bool {
73         return fi.mode&os.ModeDir != 0
74 }
75
76 // Size implements os.FileInfo.
77 func (fi fileinfo) Size() int64 {
78         return fi.size
79 }
80
81 // Sys implements os.FileInfo.
82 func (fi fileinfo) Sys() interface{} {
83         return nil
84 }
85
86 // A CollectionFileSystem is an http.Filesystem plus Stat() and
87 // support for opening writable files.
88 type CollectionFileSystem interface {
89         http.FileSystem
90         Stat(name string) (os.FileInfo, error)
91         Create(name string) (File, error)
92         OpenFile(name string, flag int, perm os.FileMode) (File, error)
93         Mkdir(name string, perm os.FileMode) error
94         Remove(name string) error
95         MarshalManifest(string) (string, error)
96 }
97
98 type fileSystem struct {
99         dirnode
100 }
101
102 func (fs *fileSystem) OpenFile(name string, flag int, perm os.FileMode) (File, error) {
103         return fs.dirnode.OpenFile(path.Clean(name), flag, perm)
104 }
105
106 func (fs *fileSystem) Open(name string) (http.File, error) {
107         return fs.dirnode.OpenFile(path.Clean(name), os.O_RDONLY, 0)
108 }
109
110 func (fs *fileSystem) Create(name string) (File, error) {
111         return fs.dirnode.OpenFile(path.Clean(name), os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0)
112 }
113
114 func (fs *fileSystem) Stat(name string) (os.FileInfo, error) {
115         f, err := fs.OpenFile(name, os.O_RDONLY, 0)
116         if err != nil {
117                 return nil, err
118         }
119         defer f.Close()
120         return f.Stat()
121 }
122
123 type inode interface {
124         Parent() inode
125         Read([]byte, filenodePtr) (int, filenodePtr, error)
126         Write([]byte, filenodePtr) (int, filenodePtr, error)
127         Truncate(int64) error
128         Readdir() []os.FileInfo
129         Size() int64
130         Stat() os.FileInfo
131         sync.Locker
132         RLock()
133         RUnlock()
134 }
135
136 // filenode implements inode.
137 type filenode struct {
138         fileinfo fileinfo
139         parent   *dirnode
140         extents  []extent
141         repacked int64 // number of times anything in []extents has changed len
142         memsize  int64 // bytes in memExtents
143         sync.RWMutex
144 }
145
146 // filenodePtr is an offset into a file that is (usually) efficient to
147 // seek to. Specifically, if filenode.repacked==filenodePtr.repacked
148 // then filenode.extents[filenodePtr.extentIdx][filenodePtr.extentOff]
149 // corresponds to file offset filenodePtr.off. Otherwise, it is
150 // necessary to reexamine len(filenode.extents[0]) etc. to find the
151 // correct extent and offset.
152 type filenodePtr struct {
153         off       int64
154         extentIdx int
155         extentOff int
156         repacked  int64
157 }
158
159 // seek returns a ptr that is consistent with both startPtr.off and
160 // the current state of fn. The caller must already hold fn.RLock() or
161 // fn.Lock().
162 //
163 // If startPtr points beyond the end of the file, ptr will point to
164 // exactly the end of the file.
165 //
166 // After seeking:
167 //
168 //     ptr.extentIdx == len(filenode.extents) // i.e., at EOF
169 //     ||
170 //     filenode.extents[ptr.extentIdx].Len() >= ptr.extentOff
171 func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) {
172         ptr = startPtr
173         if ptr.off < 0 {
174                 // meaningless anyway
175                 return
176         } else if ptr.off >= fn.fileinfo.size {
177                 ptr.off = fn.fileinfo.size
178                 ptr.extentIdx = len(fn.extents)
179                 ptr.extentOff = 0
180                 ptr.repacked = fn.repacked
181                 return
182         } else if ptr.repacked == fn.repacked {
183                 // extentIdx and extentOff accurately reflect ptr.off,
184                 // but might have fallen off the end of an extent
185                 if ptr.extentOff >= fn.extents[ptr.extentIdx].Len() {
186                         ptr.extentIdx++
187                         ptr.extentOff = 0
188                 }
189                 return
190         }
191         defer func() {
192                 ptr.repacked = fn.repacked
193         }()
194         if ptr.off >= fn.fileinfo.size {
195                 ptr.extentIdx, ptr.extentOff = len(fn.extents), 0
196                 return
197         }
198         // Recompute extentIdx and extentOff.  We have already
199         // established fn.fileinfo.size > ptr.off >= 0, so we don't
200         // have to deal with edge cases here.
201         var off int64
202         for ptr.extentIdx, ptr.extentOff = 0, 0; off < ptr.off; ptr.extentIdx++ {
203                 // This would panic (index out of range) if
204                 // fn.fileinfo.size were larger than
205                 // sum(fn.extents[i].Len()) -- but that can't happen
206                 // because we have ensured fn.fileinfo.size is always
207                 // accurate.
208                 extLen := int64(fn.extents[ptr.extentIdx].Len())
209                 if off+extLen > ptr.off {
210                         ptr.extentOff = int(ptr.off - off)
211                         break
212                 }
213                 off += extLen
214         }
215         return
216 }
217
218 func (fn *filenode) appendExtent(e extent) {
219         fn.Lock()
220         defer fn.Unlock()
221         fn.extents = append(fn.extents, e)
222         fn.fileinfo.size += int64(e.Len())
223 }
224
225 func (fn *filenode) Parent() inode {
226         return fn.parent
227 }
228
229 func (fn *filenode) Readdir() []os.FileInfo {
230         return nil
231 }
232
233 func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
234         fn.RLock()
235         defer fn.RUnlock()
236         ptr = fn.seek(startPtr)
237         if ptr.off < 0 {
238                 err = ErrNegativeOffset
239                 return
240         }
241         if ptr.extentIdx >= len(fn.extents) {
242                 err = io.EOF
243                 return
244         }
245         n, err = fn.extents[ptr.extentIdx].ReadAt(p, int64(ptr.extentOff))
246         if n > 0 {
247                 ptr.off += int64(n)
248                 ptr.extentOff += n
249                 if ptr.extentOff == fn.extents[ptr.extentIdx].Len() {
250                         ptr.extentIdx++
251                         ptr.extentOff = 0
252                         if ptr.extentIdx < len(fn.extents) && err == io.EOF {
253                                 err = nil
254                         }
255                 }
256         }
257         return
258 }
259
260 func (fn *filenode) Size() int64 {
261         fn.RLock()
262         defer fn.RUnlock()
263         return fn.fileinfo.Size()
264 }
265
266 func (fn *filenode) Stat() os.FileInfo {
267         fn.RLock()
268         defer fn.RUnlock()
269         return fn.fileinfo
270 }
271
272 func (fn *filenode) Truncate(size int64) error {
273         fn.Lock()
274         defer fn.Unlock()
275         if size < fn.fileinfo.size {
276                 ptr := fn.seek(filenodePtr{off: size, repacked: fn.repacked - 1})
277                 for i := ptr.extentIdx; i < len(fn.extents); i++ {
278                         if ext, ok := fn.extents[i].(*memExtent); ok {
279                                 fn.memsize -= int64(ext.Len())
280                         }
281                 }
282                 if ptr.extentOff == 0 {
283                         fn.extents = fn.extents[:ptr.extentIdx]
284                 } else {
285                         fn.extents = fn.extents[:ptr.extentIdx+1]
286                         switch ext := fn.extents[ptr.extentIdx].(type) {
287                         case *memExtent:
288                                 ext.Truncate(ptr.extentOff)
289                                 fn.memsize += int64(ext.Len())
290                         default:
291                                 fn.extents[ptr.extentIdx] = ext.Slice(0, ptr.extentOff)
292                         }
293                 }
294                 fn.fileinfo.size = size
295                 fn.repacked++
296                 return nil
297         }
298         for size > fn.fileinfo.size {
299                 grow := size - fn.fileinfo.size
300                 var e writableExtent
301                 var ok bool
302                 if len(fn.extents) == 0 {
303                         e = &memExtent{}
304                         fn.extents = append(fn.extents, e)
305                 } else if e, ok = fn.extents[len(fn.extents)-1].(writableExtent); !ok || e.Len() >= maxBlockSize {
306                         e = &memExtent{}
307                         fn.extents = append(fn.extents, e)
308                 } else {
309                         fn.repacked++
310                 }
311                 if maxgrow := int64(maxBlockSize - e.Len()); maxgrow < grow {
312                         grow = maxgrow
313                 }
314                 e.Truncate(e.Len() + int(grow))
315                 fn.fileinfo.size += grow
316                 fn.memsize += grow
317         }
318         return nil
319 }
320
321 func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
322         fn.Lock()
323         defer fn.Unlock()
324         ptr = fn.seek(startPtr)
325         if ptr.off < 0 {
326                 err = ErrNegativeOffset
327                 return
328         }
329         for len(p) > 0 && err == nil {
330                 cando := p
331                 if len(cando) > maxBlockSize {
332                         cando = cando[:maxBlockSize]
333                 }
334                 // Rearrange/grow fn.extents (and shrink cando if
335                 // needed) such that cando can be copied to
336                 // fn.extents[ptr.extentIdx] at offset ptr.extentOff.
337                 cur := ptr.extentIdx
338                 prev := ptr.extentIdx - 1
339                 var curWritable bool
340                 if cur < len(fn.extents) {
341                         _, curWritable = fn.extents[cur].(writableExtent)
342                 }
343                 var prevAppendable bool
344                 if prev >= 0 && fn.extents[prev].Len() < maxBlockSize {
345                         _, prevAppendable = fn.extents[prev].(writableExtent)
346                 }
347                 if ptr.extentOff > 0 && !curWritable {
348                         // Split a non-writable block.
349                         if max := fn.extents[cur].Len() - ptr.extentOff; max <= len(cando) {
350                                 // Truncate cur, and insert a new
351                                 // extent after it.
352                                 cando = cando[:max]
353                                 fn.extents = append(fn.extents, nil)
354                                 copy(fn.extents[cur+1:], fn.extents[cur:])
355                         } else {
356                                 // Split cur into two copies, truncate
357                                 // the one on the left, shift the one
358                                 // on the right, and insert a new
359                                 // extent between them.
360                                 fn.extents = append(fn.extents, nil, nil)
361                                 copy(fn.extents[cur+2:], fn.extents[cur:])
362                                 fn.extents[cur+2] = fn.extents[cur+2].Slice(ptr.extentOff+len(cando), -1)
363                         }
364                         cur++
365                         prev++
366                         e := &memExtent{}
367                         e.Truncate(len(cando))
368                         fn.memsize += int64(len(cando))
369                         fn.extents[cur] = e
370                         fn.extents[prev] = fn.extents[prev].Slice(0, ptr.extentOff)
371                         ptr.extentIdx++
372                         ptr.extentOff = 0
373                         fn.repacked++
374                         ptr.repacked++
375                 } else if curWritable {
376                         if fit := int(fn.extents[cur].Len()) - ptr.extentOff; fit < len(cando) {
377                                 cando = cando[:fit]
378                         }
379                 } else {
380                         if prevAppendable {
381                                 // Shrink cando if needed to fit in prev extent.
382                                 if cangrow := maxBlockSize - fn.extents[prev].Len(); cangrow < len(cando) {
383                                         cando = cando[:cangrow]
384                                 }
385                         }
386
387                         if cur == len(fn.extents) {
388                                 // ptr is at EOF, filesize is changing.
389                                 fn.fileinfo.size += int64(len(cando))
390                         } else if el := fn.extents[cur].Len(); el <= len(cando) {
391                                 // cando is long enough that we won't
392                                 // need cur any more. shrink cando to
393                                 // be exactly as long as cur
394                                 // (otherwise we'd accidentally shift
395                                 // the effective position of all
396                                 // extents after cur).
397                                 cando = cando[:el]
398                                 copy(fn.extents[cur:], fn.extents[cur+1:])
399                                 fn.extents = fn.extents[:len(fn.extents)-1]
400                         } else {
401                                 // shrink cur by the same #bytes we're growing prev
402                                 fn.extents[cur] = fn.extents[cur].Slice(len(cando), -1)
403                         }
404
405                         if prevAppendable {
406                                 // Grow prev.
407                                 ptr.extentIdx--
408                                 ptr.extentOff = fn.extents[prev].Len()
409                                 fn.extents[prev].(writableExtent).Truncate(ptr.extentOff + len(cando))
410                                 fn.memsize += int64(len(cando))
411                                 ptr.repacked++
412                                 fn.repacked++
413                         } else {
414                                 // Insert an extent between prev and cur, and advance prev/cur.
415                                 fn.extents = append(fn.extents, nil)
416                                 if cur < len(fn.extents) {
417                                         copy(fn.extents[cur+1:], fn.extents[cur:])
418                                         ptr.repacked++
419                                         fn.repacked++
420                                 } else {
421                                         // appending a new extent does
422                                         // not invalidate any ptrs
423                                 }
424                                 e := &memExtent{}
425                                 e.Truncate(len(cando))
426                                 fn.memsize += int64(len(cando))
427                                 fn.extents[cur] = e
428                                 cur++
429                                 prev++
430                         }
431                 }
432
433                 // Finally we can copy bytes from cando to the current extent.
434                 fn.extents[ptr.extentIdx].(writableExtent).WriteAt(cando, ptr.extentOff)
435                 n += len(cando)
436                 p = p[len(cando):]
437
438                 ptr.off += int64(len(cando))
439                 ptr.extentOff += len(cando)
440                 if ptr.extentOff >= maxBlockSize {
441                         fn.pruneMemExtents()
442                 }
443                 if fn.extents[ptr.extentIdx].Len() == ptr.extentOff {
444                         ptr.extentOff = 0
445                         ptr.extentIdx++
446                 }
447         }
448         return
449 }
450
451 // Write some data out to disk to reduce memory use. Caller must have
452 // write lock.
453 func (fn *filenode) pruneMemExtents() {
454         // TODO: async (don't hold Lock() while waiting for Keep)
455         // TODO: share code with (*dirnode)sync()
456         // TODO: pack/flush small blocks too, when fragmented
457         for idx, ext := range fn.extents {
458                 ext, ok := ext.(*memExtent)
459                 if !ok || ext.Len() < maxBlockSize {
460                         continue
461                 }
462                 locator, _, err := fn.parent.kc.PutB(ext.buf)
463                 if err != nil {
464                         // TODO: stall (or return errors from)
465                         // subsequent writes until flushing
466                         // starts to succeed
467                         continue
468                 }
469                 fn.memsize -= int64(ext.Len())
470                 fn.extents[idx] = storedExtent{
471                         kc:      fn.parent.kc,
472                         locator: locator,
473                         size:    ext.Len(),
474                         offset:  0,
475                         length:  ext.Len(),
476                 }
477         }
478 }
479
480 // FileSystem returns a CollectionFileSystem for the collection.
481 func (c *Collection) FileSystem(client *Client, kc keepClient) (CollectionFileSystem, error) {
482         fs := &fileSystem{dirnode: dirnode{
483                 client:   client,
484                 kc:       kc,
485                 fileinfo: fileinfo{name: ".", mode: os.ModeDir | 0755},
486                 parent:   nil,
487                 inodes:   make(map[string]inode),
488         }}
489         fs.dirnode.parent = &fs.dirnode
490         if err := fs.dirnode.loadManifest(c.ManifestText); err != nil {
491                 return nil, err
492         }
493         return fs, nil
494 }
495
496 type file struct {
497         inode
498         ptr        filenodePtr
499         append     bool
500         writable   bool
501         unreaddirs []os.FileInfo
502 }
503
504 func (f *file) Read(p []byte) (n int, err error) {
505         n, f.ptr, err = f.inode.Read(p, f.ptr)
506         return
507 }
508
509 func (f *file) Seek(off int64, whence int) (pos int64, err error) {
510         size := f.inode.Size()
511         ptr := f.ptr
512         switch whence {
513         case os.SEEK_SET:
514                 ptr.off = off
515         case os.SEEK_CUR:
516                 ptr.off += off
517         case os.SEEK_END:
518                 ptr.off = size + off
519         }
520         if ptr.off < 0 {
521                 return f.ptr.off, ErrNegativeOffset
522         }
523         if ptr.off > size {
524                 ptr.off = size
525         }
526         if ptr.off != f.ptr.off {
527                 f.ptr = ptr
528                 // force filenode to recompute f.ptr fields on next
529                 // use
530                 f.ptr.repacked = -1
531         }
532         return f.ptr.off, nil
533 }
534
535 func (f *file) Truncate(size int64) error {
536         return f.inode.Truncate(size)
537 }
538
539 func (f *file) Write(p []byte) (n int, err error) {
540         if !f.writable {
541                 return 0, ErrReadOnlyFile
542         }
543         n, f.ptr, err = f.inode.Write(p, f.ptr)
544         return
545 }
546
547 func (f *file) Readdir(count int) ([]os.FileInfo, error) {
548         if !f.inode.Stat().IsDir() {
549                 return nil, ErrInvalidOperation
550         }
551         if count <= 0 {
552                 return f.inode.Readdir(), nil
553         }
554         if f.unreaddirs == nil {
555                 f.unreaddirs = f.inode.Readdir()
556         }
557         if len(f.unreaddirs) == 0 {
558                 return nil, io.EOF
559         }
560         if count > len(f.unreaddirs) {
561                 count = len(f.unreaddirs)
562         }
563         ret := f.unreaddirs[:count]
564         f.unreaddirs = f.unreaddirs[count:]
565         return ret, nil
566 }
567
568 func (f *file) Stat() (os.FileInfo, error) {
569         return f.inode.Stat(), nil
570 }
571
572 func (f *file) Close() error {
573         // FIXME: flush
574         return nil
575 }
576
577 type dirnode struct {
578         fileinfo fileinfo
579         parent   *dirnode
580         client   *Client
581         kc       keepClient
582         inodes   map[string]inode
583         sync.RWMutex
584 }
585
586 // sync flushes in-memory data (for all files in the tree rooted at
587 // dn) to persistent storage. Caller must hold dn.Lock().
588 func (dn *dirnode) sync() error {
589         type shortBlock struct {
590                 fn  *filenode
591                 idx int
592         }
593         var pending []shortBlock
594         var pendingLen int
595
596         flush := func(sbs []shortBlock) error {
597                 if len(sbs) == 0 {
598                         return nil
599                 }
600                 block := make([]byte, 0, maxBlockSize)
601                 for _, sb := range sbs {
602                         block = append(block, sb.fn.extents[sb.idx].(*memExtent).buf...)
603                 }
604                 locator, _, err := dn.kc.PutB(block)
605                 if err != nil {
606                         return err
607                 }
608                 off := 0
609                 for _, sb := range sbs {
610                         data := sb.fn.extents[sb.idx].(*memExtent).buf
611                         sb.fn.extents[sb.idx] = storedExtent{
612                                 kc:      dn.kc,
613                                 locator: locator,
614                                 size:    len(block),
615                                 offset:  off,
616                                 length:  len(data),
617                         }
618                         off += len(data)
619                         sb.fn.memsize -= int64(len(data))
620                 }
621                 return nil
622         }
623
624         names := make([]string, 0, len(dn.inodes))
625         for name := range dn.inodes {
626                 names = append(names, name)
627         }
628         sort.Strings(names)
629
630         for _, name := range names {
631                 fn, ok := dn.inodes[name].(*filenode)
632                 if !ok {
633                         continue
634                 }
635                 fn.Lock()
636                 defer fn.Unlock()
637                 for idx, ext := range fn.extents {
638                         ext, ok := ext.(*memExtent)
639                         if !ok {
640                                 continue
641                         }
642                         if ext.Len() > maxBlockSize/2 {
643                                 if err := flush([]shortBlock{{fn, idx}}); err != nil {
644                                         return err
645                                 }
646                                 continue
647                         }
648                         if pendingLen+ext.Len() > maxBlockSize {
649                                 if err := flush(pending); err != nil {
650                                         return err
651                                 }
652                                 pending = nil
653                                 pendingLen = 0
654                         }
655                         pending = append(pending, shortBlock{fn, idx})
656                         pendingLen += ext.Len()
657                 }
658         }
659         return flush(pending)
660 }
661
662 func (dn *dirnode) MarshalManifest(prefix string) (string, error) {
663         dn.Lock()
664         defer dn.Unlock()
665         return dn.marshalManifest(prefix)
666 }
667
668 // caller must have read lock.
669 func (dn *dirnode) marshalManifest(prefix string) (string, error) {
670         var streamLen int64
671         type m1segment struct {
672                 name   string
673                 offset int64
674                 length int64
675         }
676         var segments []m1segment
677         var subdirs string
678         var blocks []string
679
680         if err := dn.sync(); err != nil {
681                 return "", err
682         }
683
684         names := make([]string, 0, len(dn.inodes))
685         for name, node := range dn.inodes {
686                 names = append(names, name)
687                 node.Lock()
688                 defer node.Unlock()
689         }
690         sort.Strings(names)
691
692         for _, name := range names {
693                 node := dn.inodes[name]
694                 switch node := node.(type) {
695                 case *dirnode:
696                         subdir, err := node.marshalManifest(prefix + "/" + name)
697                         if err != nil {
698                                 return "", err
699                         }
700                         subdirs = subdirs + subdir
701                 case *filenode:
702                         for _, e := range node.extents {
703                                 switch e := e.(type) {
704                                 case storedExtent:
705                                         if len(blocks) > 0 && blocks[len(blocks)-1] == e.locator {
706                                                 streamLen -= int64(e.size)
707                                         } else {
708                                                 blocks = append(blocks, e.locator)
709                                         }
710                                         segments = append(segments, m1segment{
711                                                 name:   name,
712                                                 offset: streamLen + int64(e.offset),
713                                                 length: int64(e.length),
714                                         })
715                                         streamLen += int64(e.size)
716                                 default:
717                                         // This can't happen: we
718                                         // haven't unlocked since
719                                         // calling sync().
720                                         panic(fmt.Sprintf("can't marshal extent type %T", e))
721                                 }
722                         }
723                 default:
724                         panic(fmt.Sprintf("can't marshal inode type %T", node))
725                 }
726         }
727         var filetokens []string
728         for _, s := range segments {
729                 filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, manifestEscape(s.name)))
730         }
731         if len(filetokens) == 0 {
732                 return subdirs, nil
733         } else if len(blocks) == 0 {
734                 blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
735         }
736         return manifestEscape(prefix) + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n" + subdirs, nil
737 }
738
739 func (dn *dirnode) loadManifest(txt string) error {
740         // FIXME: faster
741         var dirname string
742         streams := strings.Split(txt, "\n")
743         if streams[len(streams)-1] != "" {
744                 return fmt.Errorf("line %d: no trailing newline", len(streams))
745         }
746         for i, stream := range streams[:len(streams)-1] {
747                 lineno := i + 1
748                 var extents []storedExtent
749                 var anyFileTokens bool
750                 for i, token := range strings.Split(stream, " ") {
751                         if i == 0 {
752                                 dirname = manifestUnescape(token)
753                                 continue
754                         }
755                         if !strings.Contains(token, ":") {
756                                 if anyFileTokens {
757                                         return fmt.Errorf("line %d: bad file segment %q", lineno, token)
758                                 }
759                                 toks := strings.SplitN(token, "+", 3)
760                                 if len(toks) < 2 {
761                                         return fmt.Errorf("line %d: bad locator %q", lineno, token)
762                                 }
763                                 length, err := strconv.ParseInt(toks[1], 10, 32)
764                                 if err != nil || length < 0 {
765                                         return fmt.Errorf("line %d: bad locator %q", lineno, token)
766                                 }
767                                 extents = append(extents, storedExtent{
768                                         locator: token,
769                                         size:    int(length),
770                                         offset:  0,
771                                         length:  int(length),
772                                 })
773                                 continue
774                         } else if len(extents) == 0 {
775                                 return fmt.Errorf("line %d: bad locator %q", lineno, token)
776                         }
777
778                         toks := strings.Split(token, ":")
779                         if len(toks) != 3 {
780                                 return fmt.Errorf("line %d: bad file segment %q", lineno, token)
781                         }
782                         anyFileTokens = true
783
784                         offset, err := strconv.ParseInt(toks[0], 10, 64)
785                         if err != nil || offset < 0 {
786                                 return fmt.Errorf("line %d: bad file segment %q", lineno, token)
787                         }
788                         length, err := strconv.ParseInt(toks[1], 10, 64)
789                         if err != nil || length < 0 {
790                                 return fmt.Errorf("line %d: bad file segment %q", lineno, token)
791                         }
792                         name := path.Clean(dirname + "/" + manifestUnescape(toks[2]))
793                         dn.makeParentDirs(name)
794                         f, err := dn.OpenFile(name, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0700)
795                         if err != nil {
796                                 return fmt.Errorf("line %d: cannot append to %q: %s", lineno, name, err)
797                         }
798                         if f.inode.Stat().IsDir() {
799                                 f.Close()
800                                 return fmt.Errorf("line %d: cannot append to %q: is a directory", lineno, name)
801                         }
802                         // Map the stream offset/range coordinates to
803                         // block/offset/range coordinates and add
804                         // corresponding storedExtents to the filenode
805                         var pos int64
806                         for _, e := range extents {
807                                 next := pos + int64(e.Len())
808                                 if next < offset {
809                                         pos = next
810                                         continue
811                                 }
812                                 if pos > offset+length {
813                                         break
814                                 }
815                                 var blkOff int
816                                 if pos < offset {
817                                         blkOff = int(offset - pos)
818                                 }
819                                 blkLen := e.Len() - blkOff
820                                 if pos+int64(blkOff+blkLen) > offset+length {
821                                         blkLen = int(offset + length - pos - int64(blkOff))
822                                 }
823                                 f.inode.(*filenode).appendExtent(storedExtent{
824                                         kc:      dn.kc,
825                                         locator: e.locator,
826                                         size:    e.size,
827                                         offset:  blkOff,
828                                         length:  blkLen,
829                                 })
830                                 pos = next
831                         }
832                         f.Close()
833                         if pos < offset+length {
834                                 return fmt.Errorf("line %d: invalid segment in %d-byte stream: %q", lineno, pos, token)
835                         }
836                 }
837                 if !anyFileTokens {
838                         return fmt.Errorf("line %d: no file segments", lineno)
839                 } else if len(extents) == 0 {
840                         return fmt.Errorf("line %d: no locators", lineno)
841                 } else if dirname == "" {
842                         return fmt.Errorf("line %d: no stream name", lineno)
843                 }
844         }
845         return nil
846 }
847
848 func (dn *dirnode) makeParentDirs(name string) (err error) {
849         names := strings.Split(name, "/")
850         for _, name := range names[:len(names)-1] {
851                 f, err := dn.mkdir(name)
852                 if err != nil {
853                         return err
854                 }
855                 defer f.Close()
856                 var ok bool
857                 dn, ok = f.inode.(*dirnode)
858                 if !ok {
859                         return ErrFileExists
860                 }
861         }
862         return nil
863 }
864
865 func (dn *dirnode) mkdir(name string) (*file, error) {
866         return dn.OpenFile(name, os.O_CREATE|os.O_EXCL, os.ModeDir|0755)
867 }
868
869 func (dn *dirnode) Mkdir(name string, perm os.FileMode) error {
870         f, err := dn.mkdir(name)
871         if err != nil {
872                 f.Close()
873         }
874         return err
875 }
876
877 func (dn *dirnode) Remove(name string) error {
878         dirname, name := path.Split(name)
879         if name == "" || name == "." || name == ".." {
880                 return ErrInvalidOperation
881         }
882         dn, ok := dn.lookupPath(dirname).(*dirnode)
883         if !ok {
884                 return os.ErrNotExist
885         }
886         dn.Lock()
887         defer dn.Unlock()
888         switch node := dn.inodes[name].(type) {
889         case nil:
890                 return os.ErrNotExist
891         case *dirnode:
892                 node.RLock()
893                 defer node.RUnlock()
894                 if len(node.inodes) > 0 {
895                         return ErrDirectoryNotEmpty
896                 }
897         }
898         delete(dn.inodes, name)
899         return nil
900 }
901
902 func (dn *dirnode) Parent() inode {
903         dn.RLock()
904         defer dn.RUnlock()
905         return dn.parent
906 }
907
908 func (dn *dirnode) Readdir() (fi []os.FileInfo) {
909         dn.RLock()
910         defer dn.RUnlock()
911         fi = make([]os.FileInfo, 0, len(dn.inodes))
912         for _, inode := range dn.inodes {
913                 fi = append(fi, inode.Stat())
914         }
915         return
916 }
917
918 func (dn *dirnode) Read(p []byte, ptr filenodePtr) (int, filenodePtr, error) {
919         return 0, ptr, ErrInvalidOperation
920 }
921
922 func (dn *dirnode) Write(p []byte, ptr filenodePtr) (int, filenodePtr, error) {
923         return 0, ptr, ErrInvalidOperation
924 }
925
926 func (dn *dirnode) Size() int64 {
927         dn.RLock()
928         defer dn.RUnlock()
929         return dn.fileinfo.Size()
930 }
931
932 func (dn *dirnode) Stat() os.FileInfo {
933         dn.RLock()
934         defer dn.RUnlock()
935         return dn.fileinfo
936 }
937
938 func (dn *dirnode) Truncate(int64) error {
939         return ErrInvalidOperation
940 }
941
942 // lookupPath returns the inode for the file/directory with the given
943 // name (which may contain "/" separators), along with its parent
944 // node. If no such file/directory exists, the returned node is nil.
945 func (dn *dirnode) lookupPath(path string) (node inode) {
946         node = dn
947         for _, name := range strings.Split(path, "/") {
948                 dn, ok := node.(*dirnode)
949                 if !ok {
950                         return nil
951                 }
952                 if name == "." || name == "" {
953                         continue
954                 }
955                 if name == ".." {
956                         node = node.Parent()
957                         continue
958                 }
959                 dn.RLock()
960                 node = dn.inodes[name]
961                 dn.RUnlock()
962         }
963         return
964 }
965
966 func (dn *dirnode) OpenFile(name string, flag int, perm os.FileMode) (*file, error) {
967         dirname, name := path.Split(name)
968         dn, ok := dn.lookupPath(dirname).(*dirnode)
969         if !ok {
970                 return nil, os.ErrNotExist
971         }
972         writeMode := flag&(os.O_RDWR|os.O_WRONLY|os.O_CREATE) != 0
973         if !writeMode {
974                 // A directory can be opened via "foo/", "foo/.", or
975                 // "foo/..".
976                 switch name {
977                 case ".", "":
978                         return &file{inode: dn}, nil
979                 case "..":
980                         return &file{inode: dn.Parent()}, nil
981                 }
982         }
983         createMode := flag&os.O_CREATE != 0
984         if createMode {
985                 dn.Lock()
986                 defer dn.Unlock()
987         } else {
988                 dn.RLock()
989                 defer dn.RUnlock()
990         }
991         n, ok := dn.inodes[name]
992         if !ok {
993                 if !createMode {
994                         return nil, os.ErrNotExist
995                 }
996                 if perm.IsDir() {
997                         n = &dirnode{
998                                 parent: dn,
999                                 client: dn.client,
1000                                 kc:     dn.kc,
1001                                 fileinfo: fileinfo{
1002                                         name: name,
1003                                         mode: os.ModeDir | 0755,
1004                                 },
1005                         }
1006                 } else {
1007                         n = &filenode{
1008                                 parent: dn,
1009                                 fileinfo: fileinfo{
1010                                         name: name,
1011                                         mode: 0755,
1012                                 },
1013                         }
1014                 }
1015                 if dn.inodes == nil {
1016                         dn.inodes = make(map[string]inode)
1017                 }
1018                 dn.inodes[name] = n
1019                 dn.fileinfo.size++
1020         } else if flag&os.O_EXCL != 0 {
1021                 return nil, ErrFileExists
1022         }
1023         return &file{
1024                 inode:    n,
1025                 append:   flag&os.O_APPEND != 0,
1026                 writable: flag&(os.O_WRONLY|os.O_RDWR) != 0,
1027         }, nil
1028 }
1029
1030 type extent interface {
1031         io.ReaderAt
1032         Len() int
1033         // Return a new extent with a subsection of the data from this
1034         // one. length<0 means length=Len()-off.
1035         Slice(off int, length int) extent
1036 }
1037
1038 type writableExtent interface {
1039         extent
1040         WriteAt(p []byte, off int)
1041         Truncate(n int)
1042 }
1043
1044 type memExtent struct {
1045         buf []byte
1046 }
1047
1048 func (me *memExtent) Len() int {
1049         return len(me.buf)
1050 }
1051
1052 func (me *memExtent) Slice(off, length int) extent {
1053         if length < 0 {
1054                 length = len(me.buf) - off
1055         }
1056         buf := make([]byte, length)
1057         copy(buf, me.buf[off:])
1058         return &memExtent{buf: buf}
1059 }
1060
1061 func (me *memExtent) Truncate(n int) {
1062         if n > cap(me.buf) {
1063                 newsize := 1024
1064                 for newsize < n {
1065                         newsize = newsize << 2
1066                 }
1067                 newbuf := make([]byte, n, newsize)
1068                 copy(newbuf, me.buf)
1069                 me.buf = newbuf
1070         } else {
1071                 // Zero unused part when shrinking, in case we grow
1072                 // and start using it again later.
1073                 for i := n; i < len(me.buf); i++ {
1074                         me.buf[i] = 0
1075                 }
1076         }
1077         me.buf = me.buf[:n]
1078 }
1079
1080 func (me *memExtent) WriteAt(p []byte, off int) {
1081         if off+len(p) > len(me.buf) {
1082                 panic("overflowed extent")
1083         }
1084         copy(me.buf[off:], p)
1085 }
1086
1087 func (me *memExtent) ReadAt(p []byte, off int64) (n int, err error) {
1088         if off > int64(me.Len()) {
1089                 err = io.EOF
1090                 return
1091         }
1092         n = copy(p, me.buf[int(off):])
1093         if n < len(p) {
1094                 err = io.EOF
1095         }
1096         return
1097 }
1098
1099 type storedExtent struct {
1100         kc      keepClient
1101         locator string
1102         size    int
1103         offset  int
1104         length  int
1105 }
1106
1107 func (se storedExtent) Len() int {
1108         return se.length
1109 }
1110
1111 func (se storedExtent) Slice(n, size int) extent {
1112         se.offset += n
1113         se.length -= n
1114         if size >= 0 && se.length > size {
1115                 se.length = size
1116         }
1117         return se
1118 }
1119
1120 func (se storedExtent) ReadAt(p []byte, off int64) (n int, err error) {
1121         if off > int64(se.length) {
1122                 return 0, io.EOF
1123         }
1124         maxlen := se.length - int(off)
1125         if len(p) > maxlen {
1126                 p = p[:maxlen]
1127                 n, err = se.kc.ReadAt(se.locator, p, int(off)+se.offset)
1128                 if err == nil {
1129                         err = io.EOF
1130                 }
1131                 return
1132         }
1133         return se.kc.ReadAt(se.locator, p, int(off)+se.offset)
1134 }
1135
1136 func canonicalName(name string) string {
1137         name = path.Clean("/" + name)
1138         if name == "/" || name == "./" {
1139                 name = "."
1140         } else if strings.HasPrefix(name, "/") {
1141                 name = "." + name
1142         }
1143         return name
1144 }
1145
1146 var manifestEscapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`)
1147
1148 func manifestUnescapeFunc(seq string) string {
1149         if seq == `\\` {
1150                 return `\`
1151         }
1152         i, err := strconv.ParseUint(seq[1:], 8, 8)
1153         if err != nil {
1154                 // Invalid escape sequence: can't unescape.
1155                 return seq
1156         }
1157         return string([]byte{byte(i)})
1158 }
1159
1160 func manifestUnescape(s string) string {
1161         return manifestEscapeSeq.ReplaceAllStringFunc(s, manifestUnescapeFunc)
1162 }
1163
1164 var manifestEscapedChar = regexp.MustCompile(`[^\.\w/]`)
1165
1166 func manifestEscapeFunc(seq string) string {
1167         return fmt.Sprintf("\\%03o", byte(seq[0]))
1168 }
1169
1170 func manifestEscape(s string) string {
1171         return manifestEscapedChar.ReplaceAllStringFunc(s, manifestEscapeFunc)
1172 }