12483: Support O_TRUNC flag.
[arvados.git] / sdk / go / arvados / collection_fs.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "errors"
9         "fmt"
10         "io"
11         "net/http"
12         "os"
13         "path"
14         "regexp"
15         "sort"
16         "strconv"
17         "strings"
18         "sync"
19         "time"
20 )
21
22 var (
23         ErrReadOnlyFile      = errors.New("read-only file")
24         ErrNegativeOffset    = errors.New("cannot seek to negative offset")
25         ErrFileExists        = errors.New("file exists")
26         ErrInvalidOperation  = errors.New("invalid operation")
27         ErrDirectoryNotEmpty = errors.New("directory not empty")
28         ErrWriteOnlyMode     = errors.New("file is O_WRONLY")
29         ErrSyncNotSupported  = errors.New("O_SYNC flag is not supported")
30         ErrPermission        = os.ErrPermission
31
32         maxBlockSize = 1 << 26
33 )
34
35 type File interface {
36         io.Reader
37         io.Writer
38         io.Closer
39         io.Seeker
40         Size() int64
41         Readdir(int) ([]os.FileInfo, error)
42         Stat() (os.FileInfo, error)
43         Truncate(int64) error
44 }
45
46 type keepClient interface {
47         ReadAt(locator string, p []byte, off int) (int, error)
48         PutB(p []byte) (string, int, error)
49 }
50
51 type fileinfo struct {
52         name    string
53         mode    os.FileMode
54         size    int64
55         modTime time.Time
56 }
57
58 // Name implements os.FileInfo.
59 func (fi fileinfo) Name() string {
60         return fi.name
61 }
62
63 // ModTime implements os.FileInfo.
64 func (fi fileinfo) ModTime() time.Time {
65         return fi.modTime
66 }
67
68 // Mode implements os.FileInfo.
69 func (fi fileinfo) Mode() os.FileMode {
70         return fi.mode
71 }
72
73 // IsDir implements os.FileInfo.
74 func (fi fileinfo) IsDir() bool {
75         return fi.mode&os.ModeDir != 0
76 }
77
78 // Size implements os.FileInfo.
79 func (fi fileinfo) Size() int64 {
80         return fi.size
81 }
82
83 // Sys implements os.FileInfo.
84 func (fi fileinfo) Sys() interface{} {
85         return nil
86 }
87
88 // A CollectionFileSystem is an http.Filesystem plus Stat() and
89 // support for opening writable files.
90 type CollectionFileSystem interface {
91         http.FileSystem
92         Stat(name string) (os.FileInfo, error)
93         Create(name string) (File, error)
94         OpenFile(name string, flag int, perm os.FileMode) (File, error)
95         Mkdir(name string, perm os.FileMode) error
96         Remove(name string) error
97         MarshalManifest(string) (string, error)
98 }
99
100 type fileSystem struct {
101         dirnode
102 }
103
104 func (fs *fileSystem) OpenFile(name string, flag int, perm os.FileMode) (File, error) {
105         return fs.dirnode.OpenFile(path.Clean(name), flag, perm)
106 }
107
108 func (fs *fileSystem) Open(name string) (http.File, error) {
109         return fs.dirnode.OpenFile(path.Clean(name), os.O_RDONLY, 0)
110 }
111
112 func (fs *fileSystem) Create(name string) (File, error) {
113         return fs.dirnode.OpenFile(path.Clean(name), os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0)
114 }
115
116 func (fs *fileSystem) Stat(name string) (os.FileInfo, error) {
117         f, err := fs.OpenFile(name, os.O_RDONLY, 0)
118         if err != nil {
119                 return nil, err
120         }
121         defer f.Close()
122         return f.Stat()
123 }
124
125 type inode interface {
126         Parent() inode
127         Read([]byte, filenodePtr) (int, filenodePtr, error)
128         Write([]byte, filenodePtr) (int, filenodePtr, error)
129         Truncate(int64) error
130         Readdir() []os.FileInfo
131         Size() int64
132         Stat() os.FileInfo
133         sync.Locker
134         RLock()
135         RUnlock()
136 }
137
138 // filenode implements inode.
139 type filenode struct {
140         fileinfo fileinfo
141         parent   *dirnode
142         extents  []extent
143         repacked int64 // number of times anything in []extents has changed len
144         memsize  int64 // bytes in memExtents
145         sync.RWMutex
146 }
147
148 // filenodePtr is an offset into a file that is (usually) efficient to
149 // seek to. Specifically, if filenode.repacked==filenodePtr.repacked
150 // then filenode.extents[filenodePtr.extentIdx][filenodePtr.extentOff]
151 // corresponds to file offset filenodePtr.off. Otherwise, it is
152 // necessary to reexamine len(filenode.extents[0]) etc. to find the
153 // correct extent and offset.
154 type filenodePtr struct {
155         off       int64
156         extentIdx int
157         extentOff int
158         repacked  int64
159 }
160
161 // seek returns a ptr that is consistent with both startPtr.off and
162 // the current state of fn. The caller must already hold fn.RLock() or
163 // fn.Lock().
164 //
165 // If startPtr points beyond the end of the file, ptr will point to
166 // exactly the end of the file.
167 //
168 // After seeking:
169 //
170 //     ptr.extentIdx == len(filenode.extents) // i.e., at EOF
171 //     ||
172 //     filenode.extents[ptr.extentIdx].Len() >= ptr.extentOff
173 func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) {
174         ptr = startPtr
175         if ptr.off < 0 {
176                 // meaningless anyway
177                 return
178         } else if ptr.off >= fn.fileinfo.size {
179                 ptr.off = fn.fileinfo.size
180                 ptr.extentIdx = len(fn.extents)
181                 ptr.extentOff = 0
182                 ptr.repacked = fn.repacked
183                 return
184         } else if ptr.repacked == fn.repacked {
185                 // extentIdx and extentOff accurately reflect ptr.off,
186                 // but might have fallen off the end of an extent
187                 if ptr.extentOff >= fn.extents[ptr.extentIdx].Len() {
188                         ptr.extentIdx++
189                         ptr.extentOff = 0
190                 }
191                 return
192         }
193         defer func() {
194                 ptr.repacked = fn.repacked
195         }()
196         if ptr.off >= fn.fileinfo.size {
197                 ptr.extentIdx, ptr.extentOff = len(fn.extents), 0
198                 return
199         }
200         // Recompute extentIdx and extentOff.  We have already
201         // established fn.fileinfo.size > ptr.off >= 0, so we don't
202         // have to deal with edge cases here.
203         var off int64
204         for ptr.extentIdx, ptr.extentOff = 0, 0; off < ptr.off; ptr.extentIdx++ {
205                 // This would panic (index out of range) if
206                 // fn.fileinfo.size were larger than
207                 // sum(fn.extents[i].Len()) -- but that can't happen
208                 // because we have ensured fn.fileinfo.size is always
209                 // accurate.
210                 extLen := int64(fn.extents[ptr.extentIdx].Len())
211                 if off+extLen > ptr.off {
212                         ptr.extentOff = int(ptr.off - off)
213                         break
214                 }
215                 off += extLen
216         }
217         return
218 }
219
220 func (fn *filenode) appendExtent(e extent) {
221         fn.Lock()
222         defer fn.Unlock()
223         fn.extents = append(fn.extents, e)
224         fn.fileinfo.size += int64(e.Len())
225 }
226
227 func (fn *filenode) Parent() inode {
228         return fn.parent
229 }
230
231 func (fn *filenode) Readdir() []os.FileInfo {
232         return nil
233 }
234
235 func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
236         ptr = fn.seek(startPtr)
237         if ptr.off < 0 {
238                 err = ErrNegativeOffset
239                 return
240         }
241         if ptr.extentIdx >= len(fn.extents) {
242                 err = io.EOF
243                 return
244         }
245         n, err = fn.extents[ptr.extentIdx].ReadAt(p, int64(ptr.extentOff))
246         if n > 0 {
247                 ptr.off += int64(n)
248                 ptr.extentOff += n
249                 if ptr.extentOff == fn.extents[ptr.extentIdx].Len() {
250                         ptr.extentIdx++
251                         ptr.extentOff = 0
252                         if ptr.extentIdx < len(fn.extents) && err == io.EOF {
253                                 err = nil
254                         }
255                 }
256         }
257         return
258 }
259
260 func (fn *filenode) Size() int64 {
261         fn.RLock()
262         defer fn.RUnlock()
263         return fn.fileinfo.Size()
264 }
265
266 func (fn *filenode) Stat() os.FileInfo {
267         fn.RLock()
268         defer fn.RUnlock()
269         return fn.fileinfo
270 }
271
272 func (fn *filenode) Truncate(size int64) error {
273         fn.Lock()
274         defer fn.Unlock()
275         if size < fn.fileinfo.size {
276                 ptr := fn.seek(filenodePtr{off: size, repacked: fn.repacked - 1})
277                 for i := ptr.extentIdx; i < len(fn.extents); i++ {
278                         if ext, ok := fn.extents[i].(*memExtent); ok {
279                                 fn.memsize -= int64(ext.Len())
280                         }
281                 }
282                 if ptr.extentOff == 0 {
283                         fn.extents = fn.extents[:ptr.extentIdx]
284                 } else {
285                         fn.extents = fn.extents[:ptr.extentIdx+1]
286                         switch ext := fn.extents[ptr.extentIdx].(type) {
287                         case *memExtent:
288                                 ext.Truncate(ptr.extentOff)
289                                 fn.memsize += int64(ext.Len())
290                         default:
291                                 fn.extents[ptr.extentIdx] = ext.Slice(0, ptr.extentOff)
292                         }
293                 }
294                 fn.fileinfo.size = size
295                 fn.repacked++
296                 return nil
297         }
298         for size > fn.fileinfo.size {
299                 grow := size - fn.fileinfo.size
300                 var e writableExtent
301                 var ok bool
302                 if len(fn.extents) == 0 {
303                         e = &memExtent{}
304                         fn.extents = append(fn.extents, e)
305                 } else if e, ok = fn.extents[len(fn.extents)-1].(writableExtent); !ok || e.Len() >= maxBlockSize {
306                         e = &memExtent{}
307                         fn.extents = append(fn.extents, e)
308                 } else {
309                         fn.repacked++
310                 }
311                 if maxgrow := int64(maxBlockSize - e.Len()); maxgrow < grow {
312                         grow = maxgrow
313                 }
314                 e.Truncate(e.Len() + int(grow))
315                 fn.fileinfo.size += grow
316                 fn.memsize += grow
317         }
318         return nil
319 }
320
321 func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
322         ptr = fn.seek(startPtr)
323         if ptr.off < 0 {
324                 err = ErrNegativeOffset
325                 return
326         }
327         for len(p) > 0 && err == nil {
328                 cando := p
329                 if len(cando) > maxBlockSize {
330                         cando = cando[:maxBlockSize]
331                 }
332                 // Rearrange/grow fn.extents (and shrink cando if
333                 // needed) such that cando can be copied to
334                 // fn.extents[ptr.extentIdx] at offset ptr.extentOff.
335                 cur := ptr.extentIdx
336                 prev := ptr.extentIdx - 1
337                 var curWritable bool
338                 if cur < len(fn.extents) {
339                         _, curWritable = fn.extents[cur].(writableExtent)
340                 }
341                 var prevAppendable bool
342                 if prev >= 0 && fn.extents[prev].Len() < maxBlockSize {
343                         _, prevAppendable = fn.extents[prev].(writableExtent)
344                 }
345                 if ptr.extentOff > 0 && !curWritable {
346                         // Split a non-writable block.
347                         if max := fn.extents[cur].Len() - ptr.extentOff; max <= len(cando) {
348                                 // Truncate cur, and insert a new
349                                 // extent after it.
350                                 cando = cando[:max]
351                                 fn.extents = append(fn.extents, nil)
352                                 copy(fn.extents[cur+1:], fn.extents[cur:])
353                         } else {
354                                 // Split cur into two copies, truncate
355                                 // the one on the left, shift the one
356                                 // on the right, and insert a new
357                                 // extent between them.
358                                 fn.extents = append(fn.extents, nil, nil)
359                                 copy(fn.extents[cur+2:], fn.extents[cur:])
360                                 fn.extents[cur+2] = fn.extents[cur+2].Slice(ptr.extentOff+len(cando), -1)
361                         }
362                         cur++
363                         prev++
364                         e := &memExtent{}
365                         e.Truncate(len(cando))
366                         fn.memsize += int64(len(cando))
367                         fn.extents[cur] = e
368                         fn.extents[prev] = fn.extents[prev].Slice(0, ptr.extentOff)
369                         ptr.extentIdx++
370                         ptr.extentOff = 0
371                         fn.repacked++
372                         ptr.repacked++
373                 } else if curWritable {
374                         if fit := int(fn.extents[cur].Len()) - ptr.extentOff; fit < len(cando) {
375                                 cando = cando[:fit]
376                         }
377                 } else {
378                         if prevAppendable {
379                                 // Shrink cando if needed to fit in prev extent.
380                                 if cangrow := maxBlockSize - fn.extents[prev].Len(); cangrow < len(cando) {
381                                         cando = cando[:cangrow]
382                                 }
383                         }
384
385                         if cur == len(fn.extents) {
386                                 // ptr is at EOF, filesize is changing.
387                                 fn.fileinfo.size += int64(len(cando))
388                         } else if el := fn.extents[cur].Len(); el <= len(cando) {
389                                 // cando is long enough that we won't
390                                 // need cur any more. shrink cando to
391                                 // be exactly as long as cur
392                                 // (otherwise we'd accidentally shift
393                                 // the effective position of all
394                                 // extents after cur).
395                                 cando = cando[:el]
396                                 copy(fn.extents[cur:], fn.extents[cur+1:])
397                                 fn.extents = fn.extents[:len(fn.extents)-1]
398                         } else {
399                                 // shrink cur by the same #bytes we're growing prev
400                                 fn.extents[cur] = fn.extents[cur].Slice(len(cando), -1)
401                         }
402
403                         if prevAppendable {
404                                 // Grow prev.
405                                 ptr.extentIdx--
406                                 ptr.extentOff = fn.extents[prev].Len()
407                                 fn.extents[prev].(writableExtent).Truncate(ptr.extentOff + len(cando))
408                                 fn.memsize += int64(len(cando))
409                                 ptr.repacked++
410                                 fn.repacked++
411                         } else {
412                                 // Insert an extent between prev and cur, and advance prev/cur.
413                                 fn.extents = append(fn.extents, nil)
414                                 if cur < len(fn.extents) {
415                                         copy(fn.extents[cur+1:], fn.extents[cur:])
416                                         ptr.repacked++
417                                         fn.repacked++
418                                 } else {
419                                         // appending a new extent does
420                                         // not invalidate any ptrs
421                                 }
422                                 e := &memExtent{}
423                                 e.Truncate(len(cando))
424                                 fn.memsize += int64(len(cando))
425                                 fn.extents[cur] = e
426                                 cur++
427                                 prev++
428                         }
429                 }
430
431                 // Finally we can copy bytes from cando to the current extent.
432                 fn.extents[ptr.extentIdx].(writableExtent).WriteAt(cando, ptr.extentOff)
433                 n += len(cando)
434                 p = p[len(cando):]
435
436                 ptr.off += int64(len(cando))
437                 ptr.extentOff += len(cando)
438                 if ptr.extentOff >= maxBlockSize {
439                         fn.pruneMemExtents()
440                 }
441                 if fn.extents[ptr.extentIdx].Len() == ptr.extentOff {
442                         ptr.extentOff = 0
443                         ptr.extentIdx++
444                 }
445         }
446         return
447 }
448
449 // Write some data out to disk to reduce memory use. Caller must have
450 // write lock.
451 func (fn *filenode) pruneMemExtents() {
452         // TODO: async (don't hold Lock() while waiting for Keep)
453         // TODO: share code with (*dirnode)sync()
454         // TODO: pack/flush small blocks too, when fragmented
455         for idx, ext := range fn.extents {
456                 ext, ok := ext.(*memExtent)
457                 if !ok || ext.Len() < maxBlockSize {
458                         continue
459                 }
460                 locator, _, err := fn.parent.kc.PutB(ext.buf)
461                 if err != nil {
462                         // TODO: stall (or return errors from)
463                         // subsequent writes until flushing
464                         // starts to succeed
465                         continue
466                 }
467                 fn.memsize -= int64(ext.Len())
468                 fn.extents[idx] = storedExtent{
469                         kc:      fn.parent.kc,
470                         locator: locator,
471                         size:    ext.Len(),
472                         offset:  0,
473                         length:  ext.Len(),
474                 }
475         }
476 }
477
478 // FileSystem returns a CollectionFileSystem for the collection.
479 func (c *Collection) FileSystem(client *Client, kc keepClient) (CollectionFileSystem, error) {
480         fs := &fileSystem{dirnode: dirnode{
481                 client:   client,
482                 kc:       kc,
483                 fileinfo: fileinfo{name: ".", mode: os.ModeDir | 0755},
484                 parent:   nil,
485                 inodes:   make(map[string]inode),
486         }}
487         fs.dirnode.parent = &fs.dirnode
488         if err := fs.dirnode.loadManifest(c.ManifestText); err != nil {
489                 return nil, err
490         }
491         return fs, nil
492 }
493
494 type file struct {
495         inode
496         ptr        filenodePtr
497         append     bool
498         readable   bool
499         writable   bool
500         unreaddirs []os.FileInfo
501 }
502
503 func (f *file) Read(p []byte) (n int, err error) {
504         if !f.readable {
505                 return 0, ErrWriteOnlyMode
506         }
507         f.inode.RLock()
508         defer f.inode.RUnlock()
509         n, f.ptr, err = f.inode.Read(p, f.ptr)
510         return
511 }
512
513 func (f *file) Seek(off int64, whence int) (pos int64, err error) {
514         size := f.inode.Size()
515         ptr := f.ptr
516         switch whence {
517         case os.SEEK_SET:
518                 ptr.off = off
519         case os.SEEK_CUR:
520                 ptr.off += off
521         case os.SEEK_END:
522                 ptr.off = size + off
523         }
524         if ptr.off < 0 {
525                 return f.ptr.off, ErrNegativeOffset
526         }
527         if ptr.off > size {
528                 ptr.off = size
529         }
530         if ptr.off != f.ptr.off {
531                 f.ptr = ptr
532                 // force filenode to recompute f.ptr fields on next
533                 // use
534                 f.ptr.repacked = -1
535         }
536         return f.ptr.off, nil
537 }
538
539 func (f *file) Truncate(size int64) error {
540         return f.inode.Truncate(size)
541 }
542
543 func (f *file) Write(p []byte) (n int, err error) {
544         if !f.writable {
545                 return 0, ErrReadOnlyFile
546         }
547         f.inode.Lock()
548         defer f.inode.Unlock()
549         if fn, ok := f.inode.(*filenode); ok && f.append {
550                 f.ptr = filenodePtr{
551                         off:       fn.fileinfo.size,
552                         extentIdx: len(fn.extents),
553                         extentOff: 0,
554                         repacked:  fn.repacked,
555                 }
556         }
557         n, f.ptr, err = f.inode.Write(p, f.ptr)
558         return
559 }
560
561 func (f *file) Readdir(count int) ([]os.FileInfo, error) {
562         if !f.inode.Stat().IsDir() {
563                 return nil, ErrInvalidOperation
564         }
565         if count <= 0 {
566                 return f.inode.Readdir(), nil
567         }
568         if f.unreaddirs == nil {
569                 f.unreaddirs = f.inode.Readdir()
570         }
571         if len(f.unreaddirs) == 0 {
572                 return nil, io.EOF
573         }
574         if count > len(f.unreaddirs) {
575                 count = len(f.unreaddirs)
576         }
577         ret := f.unreaddirs[:count]
578         f.unreaddirs = f.unreaddirs[count:]
579         return ret, nil
580 }
581
582 func (f *file) Stat() (os.FileInfo, error) {
583         return f.inode.Stat(), nil
584 }
585
586 func (f *file) Close() error {
587         // FIXME: flush
588         return nil
589 }
590
591 type dirnode struct {
592         fileinfo fileinfo
593         parent   *dirnode
594         client   *Client
595         kc       keepClient
596         inodes   map[string]inode
597         sync.RWMutex
598 }
599
600 // sync flushes in-memory data (for all files in the tree rooted at
601 // dn) to persistent storage. Caller must hold dn.Lock().
602 func (dn *dirnode) sync() error {
603         type shortBlock struct {
604                 fn  *filenode
605                 idx int
606         }
607         var pending []shortBlock
608         var pendingLen int
609
610         flush := func(sbs []shortBlock) error {
611                 if len(sbs) == 0 {
612                         return nil
613                 }
614                 block := make([]byte, 0, maxBlockSize)
615                 for _, sb := range sbs {
616                         block = append(block, sb.fn.extents[sb.idx].(*memExtent).buf...)
617                 }
618                 locator, _, err := dn.kc.PutB(block)
619                 if err != nil {
620                         return err
621                 }
622                 off := 0
623                 for _, sb := range sbs {
624                         data := sb.fn.extents[sb.idx].(*memExtent).buf
625                         sb.fn.extents[sb.idx] = storedExtent{
626                                 kc:      dn.kc,
627                                 locator: locator,
628                                 size:    len(block),
629                                 offset:  off,
630                                 length:  len(data),
631                         }
632                         off += len(data)
633                         sb.fn.memsize -= int64(len(data))
634                 }
635                 return nil
636         }
637
638         names := make([]string, 0, len(dn.inodes))
639         for name := range dn.inodes {
640                 names = append(names, name)
641         }
642         sort.Strings(names)
643
644         for _, name := range names {
645                 fn, ok := dn.inodes[name].(*filenode)
646                 if !ok {
647                         continue
648                 }
649                 fn.Lock()
650                 defer fn.Unlock()
651                 for idx, ext := range fn.extents {
652                         ext, ok := ext.(*memExtent)
653                         if !ok {
654                                 continue
655                         }
656                         if ext.Len() > maxBlockSize/2 {
657                                 if err := flush([]shortBlock{{fn, idx}}); err != nil {
658                                         return err
659                                 }
660                                 continue
661                         }
662                         if pendingLen+ext.Len() > maxBlockSize {
663                                 if err := flush(pending); err != nil {
664                                         return err
665                                 }
666                                 pending = nil
667                                 pendingLen = 0
668                         }
669                         pending = append(pending, shortBlock{fn, idx})
670                         pendingLen += ext.Len()
671                 }
672         }
673         return flush(pending)
674 }
675
676 func (dn *dirnode) MarshalManifest(prefix string) (string, error) {
677         dn.Lock()
678         defer dn.Unlock()
679         return dn.marshalManifest(prefix)
680 }
681
682 // caller must have read lock.
683 func (dn *dirnode) marshalManifest(prefix string) (string, error) {
684         var streamLen int64
685         type m1segment struct {
686                 name   string
687                 offset int64
688                 length int64
689         }
690         var segments []m1segment
691         var subdirs string
692         var blocks []string
693
694         if err := dn.sync(); err != nil {
695                 return "", err
696         }
697
698         names := make([]string, 0, len(dn.inodes))
699         for name, node := range dn.inodes {
700                 names = append(names, name)
701                 node.Lock()
702                 defer node.Unlock()
703         }
704         sort.Strings(names)
705
706         for _, name := range names {
707                 node := dn.inodes[name]
708                 switch node := node.(type) {
709                 case *dirnode:
710                         subdir, err := node.marshalManifest(prefix + "/" + name)
711                         if err != nil {
712                                 return "", err
713                         }
714                         subdirs = subdirs + subdir
715                 case *filenode:
716                         if len(node.extents) == 0 {
717                                 segments = append(segments, m1segment{name: name})
718                                 break
719                         }
720                         for _, e := range node.extents {
721                                 switch e := e.(type) {
722                                 case storedExtent:
723                                         if len(blocks) > 0 && blocks[len(blocks)-1] == e.locator {
724                                                 streamLen -= int64(e.size)
725                                         } else {
726                                                 blocks = append(blocks, e.locator)
727                                         }
728                                         next := m1segment{
729                                                 name:   name,
730                                                 offset: streamLen + int64(e.offset),
731                                                 length: int64(e.length),
732                                         }
733                                         if prev := len(segments) - 1; prev >= 0 &&
734                                                 segments[prev].name == name &&
735                                                 segments[prev].offset+segments[prev].length == next.offset {
736                                                 segments[prev].length += next.length
737                                         } else {
738                                                 segments = append(segments, next)
739                                         }
740                                         streamLen += int64(e.size)
741                                 default:
742                                         // This can't happen: we
743                                         // haven't unlocked since
744                                         // calling sync().
745                                         panic(fmt.Sprintf("can't marshal extent type %T", e))
746                                 }
747                         }
748                 default:
749                         panic(fmt.Sprintf("can't marshal inode type %T", node))
750                 }
751         }
752         var filetokens []string
753         for _, s := range segments {
754                 filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, manifestEscape(s.name)))
755         }
756         if len(filetokens) == 0 {
757                 return subdirs, nil
758         } else if len(blocks) == 0 {
759                 blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
760         }
761         return manifestEscape(prefix) + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n" + subdirs, nil
762 }
763
764 func (dn *dirnode) loadManifest(txt string) error {
765         // FIXME: faster
766         var dirname string
767         streams := strings.Split(txt, "\n")
768         if streams[len(streams)-1] != "" {
769                 return fmt.Errorf("line %d: no trailing newline", len(streams))
770         }
771         for i, stream := range streams[:len(streams)-1] {
772                 lineno := i + 1
773                 var extents []storedExtent
774                 var anyFileTokens bool
775                 for i, token := range strings.Split(stream, " ") {
776                         if i == 0 {
777                                 dirname = manifestUnescape(token)
778                                 continue
779                         }
780                         if !strings.Contains(token, ":") {
781                                 if anyFileTokens {
782                                         return fmt.Errorf("line %d: bad file segment %q", lineno, token)
783                                 }
784                                 toks := strings.SplitN(token, "+", 3)
785                                 if len(toks) < 2 {
786                                         return fmt.Errorf("line %d: bad locator %q", lineno, token)
787                                 }
788                                 length, err := strconv.ParseInt(toks[1], 10, 32)
789                                 if err != nil || length < 0 {
790                                         return fmt.Errorf("line %d: bad locator %q", lineno, token)
791                                 }
792                                 extents = append(extents, storedExtent{
793                                         locator: token,
794                                         size:    int(length),
795                                         offset:  0,
796                                         length:  int(length),
797                                 })
798                                 continue
799                         } else if len(extents) == 0 {
800                                 return fmt.Errorf("line %d: bad locator %q", lineno, token)
801                         }
802
803                         toks := strings.Split(token, ":")
804                         if len(toks) != 3 {
805                                 return fmt.Errorf("line %d: bad file segment %q", lineno, token)
806                         }
807                         anyFileTokens = true
808
809                         offset, err := strconv.ParseInt(toks[0], 10, 64)
810                         if err != nil || offset < 0 {
811                                 return fmt.Errorf("line %d: bad file segment %q", lineno, token)
812                         }
813                         length, err := strconv.ParseInt(toks[1], 10, 64)
814                         if err != nil || length < 0 {
815                                 return fmt.Errorf("line %d: bad file segment %q", lineno, token)
816                         }
817                         name := path.Clean(dirname + "/" + manifestUnescape(toks[2]))
818                         err = dn.makeParentDirs(name)
819                         if err != nil {
820                                 return fmt.Errorf("line %d: cannot use path %q: %s", lineno, name, err)
821                         }
822                         f, err := dn.OpenFile(name, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0700)
823                         if err != nil {
824                                 return fmt.Errorf("line %d: cannot append to %q: %s", lineno, name, err)
825                         }
826                         if f.inode.Stat().IsDir() {
827                                 f.Close()
828                                 return fmt.Errorf("line %d: cannot append to %q: is a directory", lineno, name)
829                         }
830                         // Map the stream offset/range coordinates to
831                         // block/offset/range coordinates and add
832                         // corresponding storedExtents to the filenode
833                         var pos int64
834                         for _, e := range extents {
835                                 next := pos + int64(e.Len())
836                                 if next < offset {
837                                         pos = next
838                                         continue
839                                 }
840                                 if pos > offset+length {
841                                         break
842                                 }
843                                 var blkOff int
844                                 if pos < offset {
845                                         blkOff = int(offset - pos)
846                                 }
847                                 blkLen := e.Len() - blkOff
848                                 if pos+int64(blkOff+blkLen) > offset+length {
849                                         blkLen = int(offset + length - pos - int64(blkOff))
850                                 }
851                                 f.inode.(*filenode).appendExtent(storedExtent{
852                                         kc:      dn.kc,
853                                         locator: e.locator,
854                                         size:    e.size,
855                                         offset:  blkOff,
856                                         length:  blkLen,
857                                 })
858                                 pos = next
859                         }
860                         f.Close()
861                         if pos < offset+length {
862                                 return fmt.Errorf("line %d: invalid segment in %d-byte stream: %q", lineno, pos, token)
863                         }
864                 }
865                 if !anyFileTokens {
866                         return fmt.Errorf("line %d: no file segments", lineno)
867                 } else if len(extents) == 0 {
868                         return fmt.Errorf("line %d: no locators", lineno)
869                 } else if dirname == "" {
870                         return fmt.Errorf("line %d: no stream name", lineno)
871                 }
872         }
873         return nil
874 }
875
876 func (dn *dirnode) makeParentDirs(name string) (err error) {
877         names := strings.Split(name, "/")
878         for _, name := range names[:len(names)-1] {
879                 f, err := dn.OpenFile(name, os.O_CREATE, os.ModeDir|0755)
880                 if err != nil {
881                         return err
882                 }
883                 defer f.Close()
884                 var ok bool
885                 dn, ok = f.inode.(*dirnode)
886                 if !ok {
887                         return ErrFileExists
888                 }
889         }
890         return nil
891 }
892
893 func (dn *dirnode) mkdir(name string) (*file, error) {
894         return dn.OpenFile(name, os.O_CREATE|os.O_EXCL, os.ModeDir|0755)
895 }
896
897 func (dn *dirnode) Mkdir(name string, perm os.FileMode) error {
898         f, err := dn.mkdir(name)
899         if err == nil {
900                 err = f.Close()
901         }
902         return err
903 }
904
905 func (dn *dirnode) Remove(name string) error {
906         dirname, name := path.Split(name)
907         if name == "" || name == "." || name == ".." {
908                 return ErrInvalidOperation
909         }
910         dn, ok := dn.lookupPath(dirname).(*dirnode)
911         if !ok {
912                 return os.ErrNotExist
913         }
914         dn.Lock()
915         defer dn.Unlock()
916         switch node := dn.inodes[name].(type) {
917         case nil:
918                 return os.ErrNotExist
919         case *dirnode:
920                 node.RLock()
921                 defer node.RUnlock()
922                 if len(node.inodes) > 0 {
923                         return ErrDirectoryNotEmpty
924                 }
925         }
926         delete(dn.inodes, name)
927         return nil
928 }
929
930 func (dn *dirnode) Parent() inode {
931         dn.RLock()
932         defer dn.RUnlock()
933         return dn.parent
934 }
935
936 func (dn *dirnode) Readdir() (fi []os.FileInfo) {
937         dn.RLock()
938         defer dn.RUnlock()
939         fi = make([]os.FileInfo, 0, len(dn.inodes))
940         for _, inode := range dn.inodes {
941                 fi = append(fi, inode.Stat())
942         }
943         return
944 }
945
946 func (dn *dirnode) Read(p []byte, ptr filenodePtr) (int, filenodePtr, error) {
947         return 0, ptr, ErrInvalidOperation
948 }
949
950 func (dn *dirnode) Write(p []byte, ptr filenodePtr) (int, filenodePtr, error) {
951         return 0, ptr, ErrInvalidOperation
952 }
953
954 func (dn *dirnode) Size() int64 {
955         dn.RLock()
956         defer dn.RUnlock()
957         return dn.fileinfo.Size()
958 }
959
960 func (dn *dirnode) Stat() os.FileInfo {
961         dn.RLock()
962         defer dn.RUnlock()
963         return dn.fileinfo
964 }
965
966 func (dn *dirnode) Truncate(int64) error {
967         return ErrInvalidOperation
968 }
969
970 // lookupPath returns the inode for the file/directory with the given
971 // name (which may contain "/" separators), along with its parent
972 // node. If no such file/directory exists, the returned node is nil.
973 func (dn *dirnode) lookupPath(path string) (node inode) {
974         node = dn
975         for _, name := range strings.Split(path, "/") {
976                 dn, ok := node.(*dirnode)
977                 if !ok {
978                         return nil
979                 }
980                 if name == "." || name == "" {
981                         continue
982                 }
983                 if name == ".." {
984                         node = node.Parent()
985                         continue
986                 }
987                 dn.RLock()
988                 node = dn.inodes[name]
989                 dn.RUnlock()
990         }
991         return
992 }
993
994 func (dn *dirnode) OpenFile(name string, flag int, perm os.FileMode) (*file, error) {
995         if flag&os.O_SYNC != 0 {
996                 return nil, ErrSyncNotSupported
997         }
998         dirname, name := path.Split(name)
999         dn, ok := dn.lookupPath(dirname).(*dirnode)
1000         if !ok {
1001                 return nil, os.ErrNotExist
1002         }
1003         var readable, writable bool
1004         switch flag & (os.O_RDWR | os.O_RDONLY | os.O_WRONLY) {
1005         case os.O_RDWR:
1006                 readable = true
1007                 writable = true
1008         case os.O_RDONLY:
1009                 readable = true
1010         case os.O_WRONLY:
1011                 writable = true
1012         default:
1013                 return nil, fmt.Errorf("invalid flags 0x%x", flag)
1014         }
1015         if !writable {
1016                 // A directory can be opened via "foo/", "foo/.", or
1017                 // "foo/..".
1018                 switch name {
1019                 case ".", "":
1020                         return &file{inode: dn}, nil
1021                 case "..":
1022                         return &file{inode: dn.Parent()}, nil
1023                 }
1024         }
1025         createMode := flag&os.O_CREATE != 0
1026         if createMode {
1027                 dn.Lock()
1028                 defer dn.Unlock()
1029         } else {
1030                 dn.RLock()
1031                 defer dn.RUnlock()
1032         }
1033         n, ok := dn.inodes[name]
1034         if !ok {
1035                 if !createMode {
1036                         return nil, os.ErrNotExist
1037                 }
1038                 if perm.IsDir() {
1039                         n = &dirnode{
1040                                 parent: dn,
1041                                 client: dn.client,
1042                                 kc:     dn.kc,
1043                                 fileinfo: fileinfo{
1044                                         name: name,
1045                                         mode: os.ModeDir | 0755,
1046                                 },
1047                         }
1048                 } else {
1049                         n = &filenode{
1050                                 parent: dn,
1051                                 fileinfo: fileinfo{
1052                                         name: name,
1053                                         mode: 0755,
1054                                 },
1055                         }
1056                 }
1057                 if dn.inodes == nil {
1058                         dn.inodes = make(map[string]inode)
1059                 }
1060                 dn.inodes[name] = n
1061                 dn.fileinfo.size++
1062         } else if flag&os.O_EXCL != 0 {
1063                 return nil, ErrFileExists
1064         } else if flag&os.O_TRUNC != 0 {
1065                 if !writable {
1066                         return nil, fmt.Errorf("invalid flag O_TRUNC in read-only mode")
1067                 } else if fn, ok := n.(*filenode); !ok {
1068                         return nil, fmt.Errorf("invalid flag O_TRUNC when opening directory")
1069                 } else {
1070                         fn.Truncate(0)
1071                 }
1072         }
1073         return &file{
1074                 inode:    n,
1075                 append:   flag&os.O_APPEND != 0,
1076                 readable: readable,
1077                 writable: writable,
1078         }, nil
1079 }
1080
1081 type extent interface {
1082         io.ReaderAt
1083         Len() int
1084         // Return a new extent with a subsection of the data from this
1085         // one. length<0 means length=Len()-off.
1086         Slice(off int, length int) extent
1087 }
1088
1089 type writableExtent interface {
1090         extent
1091         WriteAt(p []byte, off int)
1092         Truncate(n int)
1093 }
1094
1095 type memExtent struct {
1096         buf []byte
1097 }
1098
1099 func (me *memExtent) Len() int {
1100         return len(me.buf)
1101 }
1102
1103 func (me *memExtent) Slice(off, length int) extent {
1104         if length < 0 {
1105                 length = len(me.buf) - off
1106         }
1107         buf := make([]byte, length)
1108         copy(buf, me.buf[off:])
1109         return &memExtent{buf: buf}
1110 }
1111
1112 func (me *memExtent) Truncate(n int) {
1113         if n > cap(me.buf) {
1114                 newsize := 1024
1115                 for newsize < n {
1116                         newsize = newsize << 2
1117                 }
1118                 newbuf := make([]byte, n, newsize)
1119                 copy(newbuf, me.buf)
1120                 me.buf = newbuf
1121         } else {
1122                 // Zero unused part when shrinking, in case we grow
1123                 // and start using it again later.
1124                 for i := n; i < len(me.buf); i++ {
1125                         me.buf[i] = 0
1126                 }
1127         }
1128         me.buf = me.buf[:n]
1129 }
1130
1131 func (me *memExtent) WriteAt(p []byte, off int) {
1132         if off+len(p) > len(me.buf) {
1133                 panic("overflowed extent")
1134         }
1135         copy(me.buf[off:], p)
1136 }
1137
1138 func (me *memExtent) ReadAt(p []byte, off int64) (n int, err error) {
1139         if off > int64(me.Len()) {
1140                 err = io.EOF
1141                 return
1142         }
1143         n = copy(p, me.buf[int(off):])
1144         if n < len(p) {
1145                 err = io.EOF
1146         }
1147         return
1148 }
1149
1150 type storedExtent struct {
1151         kc      keepClient
1152         locator string
1153         size    int
1154         offset  int
1155         length  int
1156 }
1157
1158 func (se storedExtent) Len() int {
1159         return se.length
1160 }
1161
1162 func (se storedExtent) Slice(n, size int) extent {
1163         se.offset += n
1164         se.length -= n
1165         if size >= 0 && se.length > size {
1166                 se.length = size
1167         }
1168         return se
1169 }
1170
1171 func (se storedExtent) ReadAt(p []byte, off int64) (n int, err error) {
1172         if off > int64(se.length) {
1173                 return 0, io.EOF
1174         }
1175         maxlen := se.length - int(off)
1176         if len(p) > maxlen {
1177                 p = p[:maxlen]
1178                 n, err = se.kc.ReadAt(se.locator, p, int(off)+se.offset)
1179                 if err == nil {
1180                         err = io.EOF
1181                 }
1182                 return
1183         }
1184         return se.kc.ReadAt(se.locator, p, int(off)+se.offset)
1185 }
1186
1187 func canonicalName(name string) string {
1188         name = path.Clean("/" + name)
1189         if name == "/" || name == "./" {
1190                 name = "."
1191         } else if strings.HasPrefix(name, "/") {
1192                 name = "." + name
1193         }
1194         return name
1195 }
1196
1197 var manifestEscapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`)
1198
1199 func manifestUnescapeFunc(seq string) string {
1200         if seq == `\\` {
1201                 return `\`
1202         }
1203         i, err := strconv.ParseUint(seq[1:], 8, 8)
1204         if err != nil {
1205                 // Invalid escape sequence: can't unescape.
1206                 return seq
1207         }
1208         return string([]byte{byte(i)})
1209 }
1210
1211 func manifestUnescape(s string) string {
1212         return manifestEscapeSeq.ReplaceAllStringFunc(s, manifestUnescapeFunc)
1213 }
1214
1215 var manifestEscapedChar = regexp.MustCompile(`[^\.\w/]`)
1216
1217 func manifestEscapeFunc(seq string) string {
1218         return fmt.Sprintf("\\%03o", byte(seq[0]))
1219 }
1220
1221 func manifestEscape(s string) string {
1222         return manifestEscapedChar.ReplaceAllStringFunc(s, manifestEscapeFunc)
1223 }