45c0731b0805bd36426261d590b4c44798f141f5
[arvados.git] / sdk / go / arvados / collection_fs.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "errors"
9         "fmt"
10         "io"
11         "net/http"
12         "os"
13         "path"
14         "regexp"
15         "sort"
16         "strconv"
17         "strings"
18         "sync"
19         "time"
20 )
21
22 var (
23         ErrReadOnlyFile      = errors.New("read-only file")
24         ErrNegativeOffset    = errors.New("cannot seek to negative offset")
25         ErrFileExists        = errors.New("file exists")
26         ErrInvalidOperation  = errors.New("invalid operation")
27         ErrInvalidArgument   = errors.New("invalid argument")
28         ErrDirectoryNotEmpty = errors.New("directory not empty")
29         ErrWriteOnlyMode     = errors.New("file is O_WRONLY")
30         ErrSyncNotSupported  = errors.New("O_SYNC flag is not supported")
31         ErrIsDirectory       = errors.New("cannot rename file to overwrite existing directory")
32         ErrPermission        = os.ErrPermission
33
34         maxBlockSize = 1 << 26
35 )
36
37 type File interface {
38         io.Reader
39         io.Writer
40         io.Closer
41         io.Seeker
42         Size() int64
43         Readdir(int) ([]os.FileInfo, error)
44         Stat() (os.FileInfo, error)
45         Truncate(int64) error
46 }
47
48 type keepClient interface {
49         ReadAt(locator string, p []byte, off int) (int, error)
50         PutB(p []byte) (string, int, error)
51 }
52
53 type fileinfo struct {
54         name    string
55         mode    os.FileMode
56         size    int64
57         modTime time.Time
58 }
59
60 // Name implements os.FileInfo.
61 func (fi fileinfo) Name() string {
62         return fi.name
63 }
64
65 // ModTime implements os.FileInfo.
66 func (fi fileinfo) ModTime() time.Time {
67         return fi.modTime
68 }
69
70 // Mode implements os.FileInfo.
71 func (fi fileinfo) Mode() os.FileMode {
72         return fi.mode
73 }
74
75 // IsDir implements os.FileInfo.
76 func (fi fileinfo) IsDir() bool {
77         return fi.mode&os.ModeDir != 0
78 }
79
80 // Size implements os.FileInfo.
81 func (fi fileinfo) Size() int64 {
82         return fi.size
83 }
84
85 // Sys implements os.FileInfo.
86 func (fi fileinfo) Sys() interface{} {
87         return nil
88 }
89
90 // A CollectionFileSystem is an http.Filesystem plus Stat() and
91 // support for opening writable files. All methods are safe to call
92 // from multiple goroutines.
93 type CollectionFileSystem interface {
94         http.FileSystem
95
96         // analogous to os.Stat()
97         Stat(name string) (os.FileInfo, error)
98
99         // analogous to os.Create(): create/truncate a file and open it O_RDWR.
100         Create(name string) (File, error)
101
102         // Like os.OpenFile(): create or open a file or directory.
103         //
104         // If flag&os.O_EXCL==0, it opens an existing file or
105         // directory if one exists. If flag&os.O_CREATE!=0, it creates
106         // a new empty file or directory if one does not already
107         // exist.
108         //
109         // When creating a new item, perm&os.ModeDir determines
110         // whether it is a file or a directory.
111         //
112         // A file can be opened multiple times and used concurrently
113         // from multiple goroutines. However, each File object should
114         // be used by only one goroutine at a time.
115         OpenFile(name string, flag int, perm os.FileMode) (File, error)
116
117         Mkdir(name string, perm os.FileMode) error
118         Remove(name string) error
119         RemoveAll(name string) error
120         Rename(oldname, newname string) error
121         MarshalManifest(prefix string) (string, error)
122 }
123
124 type fileSystem struct {
125         dirnode
126 }
127
128 func (fs *fileSystem) OpenFile(name string, flag int, perm os.FileMode) (File, error) {
129         return fs.dirnode.OpenFile(path.Clean(name), flag, perm)
130 }
131
132 func (fs *fileSystem) Open(name string) (http.File, error) {
133         return fs.dirnode.OpenFile(path.Clean(name), os.O_RDONLY, 0)
134 }
135
136 func (fs *fileSystem) Create(name string) (File, error) {
137         return fs.dirnode.OpenFile(path.Clean(name), os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0)
138 }
139
140 func (fs *fileSystem) Stat(name string) (os.FileInfo, error) {
141         f, err := fs.OpenFile(name, os.O_RDONLY, 0)
142         if err != nil {
143                 return nil, err
144         }
145         defer f.Close()
146         return f.Stat()
147 }
148
149 type inode interface {
150         Parent() inode
151         Read([]byte, filenodePtr) (int, filenodePtr, error)
152         Write([]byte, filenodePtr) (int, filenodePtr, error)
153         Truncate(int64) error
154         Readdir() []os.FileInfo
155         Size() int64
156         Stat() os.FileInfo
157         sync.Locker
158         RLock()
159         RUnlock()
160 }
161
162 // filenode implements inode.
163 type filenode struct {
164         fileinfo fileinfo
165         parent   *dirnode
166         extents  []extent
167         repacked int64 // number of times anything in []extents has changed len
168         memsize  int64 // bytes in memExtents
169         sync.RWMutex
170 }
171
172 // filenodePtr is an offset into a file that is (usually) efficient to
173 // seek to. Specifically, if filenode.repacked==filenodePtr.repacked
174 // then filenode.extents[filenodePtr.extentIdx][filenodePtr.extentOff]
175 // corresponds to file offset filenodePtr.off. Otherwise, it is
176 // necessary to reexamine len(filenode.extents[0]) etc. to find the
177 // correct extent and offset.
178 type filenodePtr struct {
179         off       int64
180         extentIdx int
181         extentOff int
182         repacked  int64
183 }
184
185 // seek returns a ptr that is consistent with both startPtr.off and
186 // the current state of fn. The caller must already hold fn.RLock() or
187 // fn.Lock().
188 //
189 // If startPtr points beyond the end of the file, ptr will point to
190 // exactly the end of the file.
191 //
192 // After seeking:
193 //
194 //     ptr.extentIdx == len(filenode.extents) // i.e., at EOF
195 //     ||
196 //     filenode.extents[ptr.extentIdx].Len() >= ptr.extentOff
197 func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) {
198         ptr = startPtr
199         if ptr.off < 0 {
200                 // meaningless anyway
201                 return
202         } else if ptr.off >= fn.fileinfo.size {
203                 ptr.extentIdx = len(fn.extents)
204                 ptr.extentOff = 0
205                 ptr.repacked = fn.repacked
206                 return
207         } else if ptr.repacked == fn.repacked {
208                 // extentIdx and extentOff accurately reflect ptr.off,
209                 // but might have fallen off the end of an extent
210                 if ptr.extentOff >= fn.extents[ptr.extentIdx].Len() {
211                         ptr.extentIdx++
212                         ptr.extentOff = 0
213                 }
214                 return
215         }
216         defer func() {
217                 ptr.repacked = fn.repacked
218         }()
219         if ptr.off >= fn.fileinfo.size {
220                 ptr.extentIdx, ptr.extentOff = len(fn.extents), 0
221                 return
222         }
223         // Recompute extentIdx and extentOff.  We have already
224         // established fn.fileinfo.size > ptr.off >= 0, so we don't
225         // have to deal with edge cases here.
226         var off int64
227         for ptr.extentIdx, ptr.extentOff = 0, 0; off < ptr.off; ptr.extentIdx++ {
228                 // This would panic (index out of range) if
229                 // fn.fileinfo.size were larger than
230                 // sum(fn.extents[i].Len()) -- but that can't happen
231                 // because we have ensured fn.fileinfo.size is always
232                 // accurate.
233                 extLen := int64(fn.extents[ptr.extentIdx].Len())
234                 if off+extLen > ptr.off {
235                         ptr.extentOff = int(ptr.off - off)
236                         break
237                 }
238                 off += extLen
239         }
240         return
241 }
242
243 func (fn *filenode) appendExtent(e extent) {
244         fn.Lock()
245         defer fn.Unlock()
246         fn.extents = append(fn.extents, e)
247         fn.fileinfo.size += int64(e.Len())
248 }
249
250 func (fn *filenode) Parent() inode {
251         return fn.parent
252 }
253
254 func (fn *filenode) Readdir() []os.FileInfo {
255         return nil
256 }
257
258 func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
259         ptr = fn.seek(startPtr)
260         if ptr.off < 0 {
261                 err = ErrNegativeOffset
262                 return
263         }
264         if ptr.extentIdx >= len(fn.extents) {
265                 err = io.EOF
266                 return
267         }
268         n, err = fn.extents[ptr.extentIdx].ReadAt(p, int64(ptr.extentOff))
269         if n > 0 {
270                 ptr.off += int64(n)
271                 ptr.extentOff += n
272                 if ptr.extentOff == fn.extents[ptr.extentIdx].Len() {
273                         ptr.extentIdx++
274                         ptr.extentOff = 0
275                         if ptr.extentIdx < len(fn.extents) && err == io.EOF {
276                                 err = nil
277                         }
278                 }
279         }
280         return
281 }
282
283 func (fn *filenode) Size() int64 {
284         fn.RLock()
285         defer fn.RUnlock()
286         return fn.fileinfo.Size()
287 }
288
289 func (fn *filenode) Stat() os.FileInfo {
290         fn.RLock()
291         defer fn.RUnlock()
292         return fn.fileinfo
293 }
294
295 func (fn *filenode) Truncate(size int64) error {
296         fn.Lock()
297         defer fn.Unlock()
298         return fn.truncate(size)
299 }
300
301 func (fn *filenode) truncate(size int64) error {
302         if size == fn.fileinfo.size {
303                 return nil
304         }
305         fn.repacked++
306         if size < fn.fileinfo.size {
307                 ptr := fn.seek(filenodePtr{off: size})
308                 for i := ptr.extentIdx; i < len(fn.extents); i++ {
309                         if ext, ok := fn.extents[i].(*memExtent); ok {
310                                 fn.memsize -= int64(ext.Len())
311                         }
312                 }
313                 if ptr.extentOff == 0 {
314                         fn.extents = fn.extents[:ptr.extentIdx]
315                 } else {
316                         fn.extents = fn.extents[:ptr.extentIdx+1]
317                         switch ext := fn.extents[ptr.extentIdx].(type) {
318                         case *memExtent:
319                                 ext.Truncate(ptr.extentOff)
320                                 fn.memsize += int64(ext.Len())
321                         default:
322                                 fn.extents[ptr.extentIdx] = ext.Slice(0, ptr.extentOff)
323                         }
324                 }
325                 fn.fileinfo.size = size
326                 return nil
327         }
328         for size > fn.fileinfo.size {
329                 grow := size - fn.fileinfo.size
330                 var e writableExtent
331                 var ok bool
332                 if len(fn.extents) == 0 {
333                         e = &memExtent{}
334                         fn.extents = append(fn.extents, e)
335                 } else if e, ok = fn.extents[len(fn.extents)-1].(writableExtent); !ok || e.Len() >= maxBlockSize {
336                         e = &memExtent{}
337                         fn.extents = append(fn.extents, e)
338                 }
339                 if maxgrow := int64(maxBlockSize - e.Len()); maxgrow < grow {
340                         grow = maxgrow
341                 }
342                 e.Truncate(e.Len() + int(grow))
343                 fn.fileinfo.size += grow
344                 fn.memsize += grow
345         }
346         return nil
347 }
348
349 // Caller must hold lock.
350 func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
351         if startPtr.off > fn.fileinfo.size {
352                 if err = fn.truncate(startPtr.off); err != nil {
353                         return 0, startPtr, err
354                 }
355         }
356         ptr = fn.seek(startPtr)
357         if ptr.off < 0 {
358                 err = ErrNegativeOffset
359                 return
360         }
361         for len(p) > 0 && err == nil {
362                 cando := p
363                 if len(cando) > maxBlockSize {
364                         cando = cando[:maxBlockSize]
365                 }
366                 // Rearrange/grow fn.extents (and shrink cando if
367                 // needed) such that cando can be copied to
368                 // fn.extents[ptr.extentIdx] at offset ptr.extentOff.
369                 cur := ptr.extentIdx
370                 prev := ptr.extentIdx - 1
371                 var curWritable bool
372                 if cur < len(fn.extents) {
373                         _, curWritable = fn.extents[cur].(writableExtent)
374                 }
375                 var prevAppendable bool
376                 if prev >= 0 && fn.extents[prev].Len() < maxBlockSize {
377                         _, prevAppendable = fn.extents[prev].(writableExtent)
378                 }
379                 if ptr.extentOff > 0 && !curWritable {
380                         // Split a non-writable block.
381                         if max := fn.extents[cur].Len() - ptr.extentOff; max <= len(cando) {
382                                 // Truncate cur, and insert a new
383                                 // extent after it.
384                                 cando = cando[:max]
385                                 fn.extents = append(fn.extents, nil)
386                                 copy(fn.extents[cur+1:], fn.extents[cur:])
387                         } else {
388                                 // Split cur into two copies, truncate
389                                 // the one on the left, shift the one
390                                 // on the right, and insert a new
391                                 // extent between them.
392                                 fn.extents = append(fn.extents, nil, nil)
393                                 copy(fn.extents[cur+2:], fn.extents[cur:])
394                                 fn.extents[cur+2] = fn.extents[cur+2].Slice(ptr.extentOff+len(cando), -1)
395                         }
396                         cur++
397                         prev++
398                         e := &memExtent{}
399                         e.Truncate(len(cando))
400                         fn.memsize += int64(len(cando))
401                         fn.extents[cur] = e
402                         fn.extents[prev] = fn.extents[prev].Slice(0, ptr.extentOff)
403                         ptr.extentIdx++
404                         ptr.extentOff = 0
405                         fn.repacked++
406                         ptr.repacked++
407                 } else if curWritable {
408                         if fit := int(fn.extents[cur].Len()) - ptr.extentOff; fit < len(cando) {
409                                 cando = cando[:fit]
410                         }
411                 } else {
412                         if prevAppendable {
413                                 // Shrink cando if needed to fit in prev extent.
414                                 if cangrow := maxBlockSize - fn.extents[prev].Len(); cangrow < len(cando) {
415                                         cando = cando[:cangrow]
416                                 }
417                         }
418
419                         if cur == len(fn.extents) {
420                                 // ptr is at EOF, filesize is changing.
421                                 fn.fileinfo.size += int64(len(cando))
422                         } else if el := fn.extents[cur].Len(); el <= len(cando) {
423                                 // cando is long enough that we won't
424                                 // need cur any more. shrink cando to
425                                 // be exactly as long as cur
426                                 // (otherwise we'd accidentally shift
427                                 // the effective position of all
428                                 // extents after cur).
429                                 cando = cando[:el]
430                                 copy(fn.extents[cur:], fn.extents[cur+1:])
431                                 fn.extents = fn.extents[:len(fn.extents)-1]
432                         } else {
433                                 // shrink cur by the same #bytes we're growing prev
434                                 fn.extents[cur] = fn.extents[cur].Slice(len(cando), -1)
435                         }
436
437                         if prevAppendable {
438                                 // Grow prev.
439                                 ptr.extentIdx--
440                                 ptr.extentOff = fn.extents[prev].Len()
441                                 fn.extents[prev].(writableExtent).Truncate(ptr.extentOff + len(cando))
442                                 fn.memsize += int64(len(cando))
443                                 ptr.repacked++
444                                 fn.repacked++
445                         } else {
446                                 // Insert an extent between prev and cur, and advance prev/cur.
447                                 fn.extents = append(fn.extents, nil)
448                                 if cur < len(fn.extents) {
449                                         copy(fn.extents[cur+1:], fn.extents[cur:])
450                                         ptr.repacked++
451                                         fn.repacked++
452                                 } else {
453                                         // appending a new extent does
454                                         // not invalidate any ptrs
455                                 }
456                                 e := &memExtent{}
457                                 e.Truncate(len(cando))
458                                 fn.memsize += int64(len(cando))
459                                 fn.extents[cur] = e
460                                 cur++
461                                 prev++
462                         }
463                 }
464
465                 // Finally we can copy bytes from cando to the current extent.
466                 fn.extents[ptr.extentIdx].(writableExtent).WriteAt(cando, ptr.extentOff)
467                 n += len(cando)
468                 p = p[len(cando):]
469
470                 ptr.off += int64(len(cando))
471                 ptr.extentOff += len(cando)
472                 if ptr.extentOff >= maxBlockSize {
473                         fn.pruneMemExtents()
474                 }
475                 if fn.extents[ptr.extentIdx].Len() == ptr.extentOff {
476                         ptr.extentOff = 0
477                         ptr.extentIdx++
478                 }
479         }
480         return
481 }
482
483 // Write some data out to disk to reduce memory use. Caller must have
484 // write lock.
485 func (fn *filenode) pruneMemExtents() {
486         // TODO: async (don't hold Lock() while waiting for Keep)
487         // TODO: share code with (*dirnode)sync()
488         // TODO: pack/flush small blocks too, when fragmented
489         for idx, ext := range fn.extents {
490                 ext, ok := ext.(*memExtent)
491                 if !ok || ext.Len() < maxBlockSize {
492                         continue
493                 }
494                 locator, _, err := fn.parent.kc.PutB(ext.buf)
495                 if err != nil {
496                         // TODO: stall (or return errors from)
497                         // subsequent writes until flushing
498                         // starts to succeed
499                         continue
500                 }
501                 fn.memsize -= int64(ext.Len())
502                 fn.extents[idx] = storedExtent{
503                         kc:      fn.parent.kc,
504                         locator: locator,
505                         size:    ext.Len(),
506                         offset:  0,
507                         length:  ext.Len(),
508                 }
509         }
510 }
511
512 // FileSystem returns a CollectionFileSystem for the collection.
513 func (c *Collection) FileSystem(client *Client, kc keepClient) (CollectionFileSystem, error) {
514         fs := &fileSystem{dirnode: dirnode{
515                 client:   client,
516                 kc:       kc,
517                 fileinfo: fileinfo{name: ".", mode: os.ModeDir | 0755},
518                 parent:   nil,
519                 inodes:   make(map[string]inode),
520         }}
521         fs.dirnode.parent = &fs.dirnode
522         if err := fs.dirnode.loadManifest(c.ManifestText); err != nil {
523                 return nil, err
524         }
525         return fs, nil
526 }
527
528 type file struct {
529         inode
530         ptr        filenodePtr
531         append     bool
532         readable   bool
533         writable   bool
534         unreaddirs []os.FileInfo
535 }
536
537 func (f *file) Read(p []byte) (n int, err error) {
538         if !f.readable {
539                 return 0, ErrWriteOnlyMode
540         }
541         f.inode.RLock()
542         defer f.inode.RUnlock()
543         n, f.ptr, err = f.inode.Read(p, f.ptr)
544         return
545 }
546
547 func (f *file) Seek(off int64, whence int) (pos int64, err error) {
548         size := f.inode.Size()
549         ptr := f.ptr
550         switch whence {
551         case os.SEEK_SET:
552                 ptr.off = off
553         case os.SEEK_CUR:
554                 ptr.off += off
555         case os.SEEK_END:
556                 ptr.off = size + off
557         }
558         if ptr.off < 0 {
559                 return f.ptr.off, ErrNegativeOffset
560         }
561         if ptr.off != f.ptr.off {
562                 f.ptr = ptr
563                 // force filenode to recompute f.ptr fields on next
564                 // use
565                 f.ptr.repacked = -1
566         }
567         return f.ptr.off, nil
568 }
569
570 func (f *file) Truncate(size int64) error {
571         return f.inode.Truncate(size)
572 }
573
574 func (f *file) Write(p []byte) (n int, err error) {
575         if !f.writable {
576                 return 0, ErrReadOnlyFile
577         }
578         f.inode.Lock()
579         defer f.inode.Unlock()
580         if fn, ok := f.inode.(*filenode); ok && f.append {
581                 f.ptr = filenodePtr{
582                         off:       fn.fileinfo.size,
583                         extentIdx: len(fn.extents),
584                         extentOff: 0,
585                         repacked:  fn.repacked,
586                 }
587         }
588         n, f.ptr, err = f.inode.Write(p, f.ptr)
589         return
590 }
591
592 func (f *file) Readdir(count int) ([]os.FileInfo, error) {
593         if !f.inode.Stat().IsDir() {
594                 return nil, ErrInvalidOperation
595         }
596         if count <= 0 {
597                 return f.inode.Readdir(), nil
598         }
599         if f.unreaddirs == nil {
600                 f.unreaddirs = f.inode.Readdir()
601         }
602         if len(f.unreaddirs) == 0 {
603                 return nil, io.EOF
604         }
605         if count > len(f.unreaddirs) {
606                 count = len(f.unreaddirs)
607         }
608         ret := f.unreaddirs[:count]
609         f.unreaddirs = f.unreaddirs[count:]
610         return ret, nil
611 }
612
613 func (f *file) Stat() (os.FileInfo, error) {
614         return f.inode.Stat(), nil
615 }
616
617 func (f *file) Close() error {
618         // FIXME: flush
619         return nil
620 }
621
622 type dirnode struct {
623         fileinfo fileinfo
624         parent   *dirnode
625         client   *Client
626         kc       keepClient
627         inodes   map[string]inode
628         sync.RWMutex
629 }
630
631 // sync flushes in-memory data (for all files in the tree rooted at
632 // dn) to persistent storage. Caller must hold dn.Lock().
633 func (dn *dirnode) sync() error {
634         type shortBlock struct {
635                 fn  *filenode
636                 idx int
637         }
638         var pending []shortBlock
639         var pendingLen int
640
641         flush := func(sbs []shortBlock) error {
642                 if len(sbs) == 0 {
643                         return nil
644                 }
645                 block := make([]byte, 0, maxBlockSize)
646                 for _, sb := range sbs {
647                         block = append(block, sb.fn.extents[sb.idx].(*memExtent).buf...)
648                 }
649                 locator, _, err := dn.kc.PutB(block)
650                 if err != nil {
651                         return err
652                 }
653                 off := 0
654                 for _, sb := range sbs {
655                         data := sb.fn.extents[sb.idx].(*memExtent).buf
656                         sb.fn.extents[sb.idx] = storedExtent{
657                                 kc:      dn.kc,
658                                 locator: locator,
659                                 size:    len(block),
660                                 offset:  off,
661                                 length:  len(data),
662                         }
663                         off += len(data)
664                         sb.fn.memsize -= int64(len(data))
665                 }
666                 return nil
667         }
668
669         names := make([]string, 0, len(dn.inodes))
670         for name := range dn.inodes {
671                 names = append(names, name)
672         }
673         sort.Strings(names)
674
675         for _, name := range names {
676                 fn, ok := dn.inodes[name].(*filenode)
677                 if !ok {
678                         continue
679                 }
680                 fn.Lock()
681                 defer fn.Unlock()
682                 for idx, ext := range fn.extents {
683                         ext, ok := ext.(*memExtent)
684                         if !ok {
685                                 continue
686                         }
687                         if ext.Len() > maxBlockSize/2 {
688                                 if err := flush([]shortBlock{{fn, idx}}); err != nil {
689                                         return err
690                                 }
691                                 continue
692                         }
693                         if pendingLen+ext.Len() > maxBlockSize {
694                                 if err := flush(pending); err != nil {
695                                         return err
696                                 }
697                                 pending = nil
698                                 pendingLen = 0
699                         }
700                         pending = append(pending, shortBlock{fn, idx})
701                         pendingLen += ext.Len()
702                 }
703         }
704         return flush(pending)
705 }
706
707 func (dn *dirnode) MarshalManifest(prefix string) (string, error) {
708         dn.Lock()
709         defer dn.Unlock()
710         return dn.marshalManifest(prefix)
711 }
712
713 // caller must have read lock.
714 func (dn *dirnode) marshalManifest(prefix string) (string, error) {
715         var streamLen int64
716         type m1segment struct {
717                 name   string
718                 offset int64
719                 length int64
720         }
721         var segments []m1segment
722         var subdirs string
723         var blocks []string
724
725         if err := dn.sync(); err != nil {
726                 return "", err
727         }
728
729         names := make([]string, 0, len(dn.inodes))
730         for name, node := range dn.inodes {
731                 names = append(names, name)
732                 node.Lock()
733                 defer node.Unlock()
734         }
735         sort.Strings(names)
736
737         for _, name := range names {
738                 node := dn.inodes[name]
739                 switch node := node.(type) {
740                 case *dirnode:
741                         subdir, err := node.marshalManifest(prefix + "/" + name)
742                         if err != nil {
743                                 return "", err
744                         }
745                         subdirs = subdirs + subdir
746                 case *filenode:
747                         if len(node.extents) == 0 {
748                                 segments = append(segments, m1segment{name: name})
749                                 break
750                         }
751                         for _, e := range node.extents {
752                                 switch e := e.(type) {
753                                 case storedExtent:
754                                         if len(blocks) > 0 && blocks[len(blocks)-1] == e.locator {
755                                                 streamLen -= int64(e.size)
756                                         } else {
757                                                 blocks = append(blocks, e.locator)
758                                         }
759                                         next := m1segment{
760                                                 name:   name,
761                                                 offset: streamLen + int64(e.offset),
762                                                 length: int64(e.length),
763                                         }
764                                         if prev := len(segments) - 1; prev >= 0 &&
765                                                 segments[prev].name == name &&
766                                                 segments[prev].offset+segments[prev].length == next.offset {
767                                                 segments[prev].length += next.length
768                                         } else {
769                                                 segments = append(segments, next)
770                                         }
771                                         streamLen += int64(e.size)
772                                 default:
773                                         // This can't happen: we
774                                         // haven't unlocked since
775                                         // calling sync().
776                                         panic(fmt.Sprintf("can't marshal extent type %T", e))
777                                 }
778                         }
779                 default:
780                         panic(fmt.Sprintf("can't marshal inode type %T", node))
781                 }
782         }
783         var filetokens []string
784         for _, s := range segments {
785                 filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, manifestEscape(s.name)))
786         }
787         if len(filetokens) == 0 {
788                 return subdirs, nil
789         } else if len(blocks) == 0 {
790                 blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
791         }
792         return manifestEscape(prefix) + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n" + subdirs, nil
793 }
794
795 func (dn *dirnode) loadManifest(txt string) error {
796         // FIXME: faster
797         var dirname string
798         streams := strings.Split(txt, "\n")
799         if streams[len(streams)-1] != "" {
800                 return fmt.Errorf("line %d: no trailing newline", len(streams))
801         }
802         for i, stream := range streams[:len(streams)-1] {
803                 lineno := i + 1
804                 var extents []storedExtent
805                 var anyFileTokens bool
806                 var pos int64
807                 var extIdx int
808                 for i, token := range strings.Split(stream, " ") {
809                         if i == 0 {
810                                 dirname = manifestUnescape(token)
811                                 continue
812                         }
813                         if !strings.Contains(token, ":") {
814                                 if anyFileTokens {
815                                         return fmt.Errorf("line %d: bad file segment %q", lineno, token)
816                                 }
817                                 toks := strings.SplitN(token, "+", 3)
818                                 if len(toks) < 2 {
819                                         return fmt.Errorf("line %d: bad locator %q", lineno, token)
820                                 }
821                                 length, err := strconv.ParseInt(toks[1], 10, 32)
822                                 if err != nil || length < 0 {
823                                         return fmt.Errorf("line %d: bad locator %q", lineno, token)
824                                 }
825                                 extents = append(extents, storedExtent{
826                                         locator: token,
827                                         size:    int(length),
828                                         offset:  0,
829                                         length:  int(length),
830                                 })
831                                 continue
832                         } else if len(extents) == 0 {
833                                 return fmt.Errorf("line %d: bad locator %q", lineno, token)
834                         }
835
836                         toks := strings.Split(token, ":")
837                         if len(toks) != 3 {
838                                 return fmt.Errorf("line %d: bad file segment %q", lineno, token)
839                         }
840                         anyFileTokens = true
841
842                         offset, err := strconv.ParseInt(toks[0], 10, 64)
843                         if err != nil || offset < 0 {
844                                 return fmt.Errorf("line %d: bad file segment %q", lineno, token)
845                         }
846                         length, err := strconv.ParseInt(toks[1], 10, 64)
847                         if err != nil || length < 0 {
848                                 return fmt.Errorf("line %d: bad file segment %q", lineno, token)
849                         }
850                         name := path.Clean(dirname + "/" + manifestUnescape(toks[2]))
851                         fnode, err := dn.createFileAndParents(name)
852                         if err != nil {
853                                 return fmt.Errorf("line %d: cannot use path %q: %s", lineno, name, err)
854                         }
855                         // Map the stream offset/range coordinates to
856                         // block/offset/range coordinates and add
857                         // corresponding storedExtents to the filenode
858                         if pos > offset {
859                                 // Can't continue where we left off.
860                                 // TODO: binary search instead of
861                                 // rewinding all the way (but this
862                                 // situation might be rare anyway)
863                                 extIdx, pos = 0, 0
864                         }
865                         for next := int64(0); extIdx < len(extents); extIdx, pos = extIdx+1, next {
866                                 e := extents[extIdx]
867                                 next = pos + int64(e.Len())
868                                 if next <= offset || e.Len() == 0 {
869                                         pos = next
870                                         continue
871                                 }
872                                 if pos >= offset+length {
873                                         break
874                                 }
875                                 var blkOff int
876                                 if pos < offset {
877                                         blkOff = int(offset - pos)
878                                 }
879                                 blkLen := e.Len() - blkOff
880                                 if pos+int64(blkOff+blkLen) > offset+length {
881                                         blkLen = int(offset + length - pos - int64(blkOff))
882                                 }
883                                 fnode.appendExtent(storedExtent{
884                                         kc:      dn.kc,
885                                         locator: e.locator,
886                                         size:    e.size,
887                                         offset:  blkOff,
888                                         length:  blkLen,
889                                 })
890                                 if next > offset+length {
891                                         break
892                                 }
893                         }
894                         if extIdx == len(extents) && pos < offset+length {
895                                 return fmt.Errorf("line %d: invalid segment in %d-byte stream: %q", lineno, pos, token)
896                         }
897                 }
898                 if !anyFileTokens {
899                         return fmt.Errorf("line %d: no file segments", lineno)
900                 } else if len(extents) == 0 {
901                         return fmt.Errorf("line %d: no locators", lineno)
902                 } else if dirname == "" {
903                         return fmt.Errorf("line %d: no stream name", lineno)
904                 }
905         }
906         return nil
907 }
908
909 // only safe to call from loadManifest -- no locking
910 func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
911         names := strings.Split(path, "/")
912         if basename := names[len(names)-1]; basename == "" || basename == "." || basename == ".." {
913                 err = fmt.Errorf("invalid filename")
914                 return
915         }
916         var node inode = dn
917         for i, name := range names {
918                 dn, ok := node.(*dirnode)
919                 if !ok {
920                         err = ErrFileExists
921                         return
922                 }
923                 if name == "" || name == "." {
924                         continue
925                 }
926                 if name == ".." {
927                         node = dn.parent
928                         continue
929                 }
930                 node, ok = dn.inodes[name]
931                 if !ok {
932                         if i == len(names)-1 {
933                                 fn = dn.newFilenode(name, 0755)
934                                 return
935                         }
936                         node = dn.newDirnode(name, 0755)
937                 }
938         }
939         var ok bool
940         if fn, ok = node.(*filenode); !ok {
941                 err = ErrInvalidArgument
942         }
943         return
944 }
945
946 func (dn *dirnode) mkdir(name string) (*file, error) {
947         return dn.OpenFile(name, os.O_CREATE|os.O_EXCL, os.ModeDir|0755)
948 }
949
950 func (dn *dirnode) Mkdir(name string, perm os.FileMode) error {
951         f, err := dn.mkdir(name)
952         if err == nil {
953                 err = f.Close()
954         }
955         return err
956 }
957
958 func (dn *dirnode) Remove(name string) error {
959         return dn.remove(name, false)
960 }
961
962 func (dn *dirnode) RemoveAll(name string) error {
963         return dn.remove(name, true)
964 }
965
966 func (dn *dirnode) remove(name string, recursive bool) error {
967         dirname, name := path.Split(name)
968         if name == "" || name == "." || name == ".." {
969                 return ErrInvalidArgument
970         }
971         dn, ok := dn.lookupPath(dirname).(*dirnode)
972         if !ok {
973                 return os.ErrNotExist
974         }
975         dn.Lock()
976         defer dn.Unlock()
977         switch node := dn.inodes[name].(type) {
978         case nil:
979                 return os.ErrNotExist
980         case *dirnode:
981                 node.RLock()
982                 defer node.RUnlock()
983                 if !recursive && len(node.inodes) > 0 {
984                         return ErrDirectoryNotEmpty
985                 }
986         }
987         delete(dn.inodes, name)
988         return nil
989 }
990
991 func (dn *dirnode) Rename(oldname, newname string) error {
992         olddir, oldname := path.Split(oldname)
993         if oldname == "" || oldname == "." || oldname == ".." {
994                 return ErrInvalidArgument
995         }
996         olddirf, err := dn.OpenFile(olddir+".", os.O_RDONLY, 0)
997         if err != nil {
998                 return fmt.Errorf("%q: %s", olddir, err)
999         }
1000         defer olddirf.Close()
1001         newdir, newname := path.Split(newname)
1002         if newname == "." || newname == ".." {
1003                 return ErrInvalidArgument
1004         } else if newname == "" {
1005                 // Rename("a/b", "c/") means Rename("a/b", "c/b")
1006                 newname = oldname
1007         }
1008         newdirf, err := dn.OpenFile(newdir+".", os.O_RDONLY, 0)
1009         if err != nil {
1010                 return fmt.Errorf("%q: %s", newdir, err)
1011         }
1012         defer newdirf.Close()
1013
1014         // When acquiring locks on multiple nodes, all common
1015         // ancestors must be locked first in order to avoid
1016         // deadlock. This is assured by locking the path from root to
1017         // newdir, then locking the path from root to olddir, skipping
1018         // any already-locked nodes.
1019         needLock := []sync.Locker{}
1020         for _, f := range []*file{olddirf, newdirf} {
1021                 node := f.inode
1022                 needLock = append(needLock, node)
1023                 for node.Parent() != node {
1024                         node = node.Parent()
1025                         needLock = append(needLock, node)
1026                 }
1027         }
1028         locked := map[sync.Locker]bool{}
1029         for i := len(needLock) - 1; i >= 0; i-- {
1030                 if n := needLock[i]; !locked[n] {
1031                         n.Lock()
1032                         defer n.Unlock()
1033                         locked[n] = true
1034                 }
1035         }
1036
1037         olddn := olddirf.inode.(*dirnode)
1038         newdn := newdirf.inode.(*dirnode)
1039         oldinode, ok := olddn.inodes[oldname]
1040         if !ok {
1041                 return os.ErrNotExist
1042         }
1043         if existing, ok := newdn.inodes[newname]; ok {
1044                 // overwriting an existing file or dir
1045                 if dn, ok := existing.(*dirnode); ok {
1046                         if !oldinode.Stat().IsDir() {
1047                                 return ErrIsDirectory
1048                         }
1049                         dn.RLock()
1050                         defer dn.RUnlock()
1051                         if len(dn.inodes) > 0 {
1052                                 return ErrDirectoryNotEmpty
1053                         }
1054                 }
1055         } else {
1056                 newdn.fileinfo.size++
1057         }
1058         newdn.inodes[newname] = oldinode
1059         delete(olddn.inodes, oldname)
1060         olddn.fileinfo.size--
1061         return nil
1062 }
1063
1064 func (dn *dirnode) Parent() inode {
1065         dn.RLock()
1066         defer dn.RUnlock()
1067         return dn.parent
1068 }
1069
1070 func (dn *dirnode) Readdir() (fi []os.FileInfo) {
1071         dn.RLock()
1072         defer dn.RUnlock()
1073         fi = make([]os.FileInfo, 0, len(dn.inodes))
1074         for _, inode := range dn.inodes {
1075                 fi = append(fi, inode.Stat())
1076         }
1077         return
1078 }
1079
1080 func (dn *dirnode) Read(p []byte, ptr filenodePtr) (int, filenodePtr, error) {
1081         return 0, ptr, ErrInvalidOperation
1082 }
1083
1084 func (dn *dirnode) Write(p []byte, ptr filenodePtr) (int, filenodePtr, error) {
1085         return 0, ptr, ErrInvalidOperation
1086 }
1087
1088 func (dn *dirnode) Size() int64 {
1089         dn.RLock()
1090         defer dn.RUnlock()
1091         return dn.fileinfo.Size()
1092 }
1093
1094 func (dn *dirnode) Stat() os.FileInfo {
1095         dn.RLock()
1096         defer dn.RUnlock()
1097         return dn.fileinfo
1098 }
1099
1100 func (dn *dirnode) Truncate(int64) error {
1101         return ErrInvalidOperation
1102 }
1103
1104 // lookupPath returns the inode for the file/directory with the given
1105 // name (which may contain "/" separators), along with its parent
1106 // node. If no such file/directory exists, the returned node is nil.
1107 func (dn *dirnode) lookupPath(path string) (node inode) {
1108         node = dn
1109         for _, name := range strings.Split(path, "/") {
1110                 dn, ok := node.(*dirnode)
1111                 if !ok {
1112                         return nil
1113                 }
1114                 if name == "." || name == "" {
1115                         continue
1116                 }
1117                 if name == ".." {
1118                         node = node.Parent()
1119                         continue
1120                 }
1121                 dn.RLock()
1122                 node = dn.inodes[name]
1123                 dn.RUnlock()
1124         }
1125         return
1126 }
1127
1128 func (dn *dirnode) newDirnode(name string, perm os.FileMode) *dirnode {
1129         child := &dirnode{
1130                 parent: dn,
1131                 client: dn.client,
1132                 kc:     dn.kc,
1133                 fileinfo: fileinfo{
1134                         name: name,
1135                         mode: os.ModeDir | perm,
1136                 },
1137         }
1138         if dn.inodes == nil {
1139                 dn.inodes = make(map[string]inode)
1140         }
1141         dn.inodes[name] = child
1142         dn.fileinfo.size++
1143         return child
1144 }
1145
1146 func (dn *dirnode) newFilenode(name string, perm os.FileMode) *filenode {
1147         child := &filenode{
1148                 parent: dn,
1149                 fileinfo: fileinfo{
1150                         name: name,
1151                         mode: perm,
1152                 },
1153         }
1154         if dn.inodes == nil {
1155                 dn.inodes = make(map[string]inode)
1156         }
1157         dn.inodes[name] = child
1158         dn.fileinfo.size++
1159         return child
1160 }
1161
1162 // OpenFile is analogous to os.OpenFile().
1163 func (dn *dirnode) OpenFile(name string, flag int, perm os.FileMode) (*file, error) {
1164         if flag&os.O_SYNC != 0 {
1165                 return nil, ErrSyncNotSupported
1166         }
1167         dirname, name := path.Split(name)
1168         dn, ok := dn.lookupPath(dirname).(*dirnode)
1169         if !ok {
1170                 return nil, os.ErrNotExist
1171         }
1172         var readable, writable bool
1173         switch flag & (os.O_RDWR | os.O_RDONLY | os.O_WRONLY) {
1174         case os.O_RDWR:
1175                 readable = true
1176                 writable = true
1177         case os.O_RDONLY:
1178                 readable = true
1179         case os.O_WRONLY:
1180                 writable = true
1181         default:
1182                 return nil, fmt.Errorf("invalid flags 0x%x", flag)
1183         }
1184         if !writable {
1185                 // A directory can be opened via "foo/", "foo/.", or
1186                 // "foo/..".
1187                 switch name {
1188                 case ".", "":
1189                         return &file{inode: dn}, nil
1190                 case "..":
1191                         return &file{inode: dn.Parent()}, nil
1192                 }
1193         }
1194         createMode := flag&os.O_CREATE != 0
1195         if createMode {
1196                 dn.Lock()
1197                 defer dn.Unlock()
1198         } else {
1199                 dn.RLock()
1200                 defer dn.RUnlock()
1201         }
1202         n, ok := dn.inodes[name]
1203         if !ok {
1204                 if !createMode {
1205                         return nil, os.ErrNotExist
1206                 }
1207                 if perm.IsDir() {
1208                         n = dn.newDirnode(name, 0755)
1209                 } else {
1210                         n = dn.newFilenode(name, 0755)
1211                 }
1212         } else if flag&os.O_EXCL != 0 {
1213                 return nil, ErrFileExists
1214         } else if flag&os.O_TRUNC != 0 {
1215                 if !writable {
1216                         return nil, fmt.Errorf("invalid flag O_TRUNC in read-only mode")
1217                 } else if fn, ok := n.(*filenode); !ok {
1218                         return nil, fmt.Errorf("invalid flag O_TRUNC when opening directory")
1219                 } else {
1220                         fn.Truncate(0)
1221                 }
1222         }
1223         return &file{
1224                 inode:    n,
1225                 append:   flag&os.O_APPEND != 0,
1226                 readable: readable,
1227                 writable: writable,
1228         }, nil
1229 }
1230
1231 type extent interface {
1232         io.ReaderAt
1233         Len() int
1234         // Return a new extent with a subsection of the data from this
1235         // one. length<0 means length=Len()-off.
1236         Slice(off int, length int) extent
1237 }
1238
1239 type writableExtent interface {
1240         extent
1241         WriteAt(p []byte, off int)
1242         Truncate(n int)
1243 }
1244
1245 type memExtent struct {
1246         buf []byte
1247 }
1248
1249 func (me *memExtent) Len() int {
1250         return len(me.buf)
1251 }
1252
1253 func (me *memExtent) Slice(off, length int) extent {
1254         if length < 0 {
1255                 length = len(me.buf) - off
1256         }
1257         buf := make([]byte, length)
1258         copy(buf, me.buf[off:])
1259         return &memExtent{buf: buf}
1260 }
1261
1262 func (me *memExtent) Truncate(n int) {
1263         if n > cap(me.buf) {
1264                 newsize := 1024
1265                 for newsize < n {
1266                         newsize = newsize << 2
1267                 }
1268                 newbuf := make([]byte, n, newsize)
1269                 copy(newbuf, me.buf)
1270                 me.buf = newbuf
1271         } else {
1272                 // Zero unused part when shrinking, in case we grow
1273                 // and start using it again later.
1274                 for i := n; i < len(me.buf); i++ {
1275                         me.buf[i] = 0
1276                 }
1277         }
1278         me.buf = me.buf[:n]
1279 }
1280
1281 func (me *memExtent) WriteAt(p []byte, off int) {
1282         if off+len(p) > len(me.buf) {
1283                 panic("overflowed extent")
1284         }
1285         copy(me.buf[off:], p)
1286 }
1287
1288 func (me *memExtent) ReadAt(p []byte, off int64) (n int, err error) {
1289         if off > int64(me.Len()) {
1290                 err = io.EOF
1291                 return
1292         }
1293         n = copy(p, me.buf[int(off):])
1294         if n < len(p) {
1295                 err = io.EOF
1296         }
1297         return
1298 }
1299
1300 type storedExtent struct {
1301         kc      keepClient
1302         locator string
1303         size    int
1304         offset  int
1305         length  int
1306 }
1307
1308 func (se storedExtent) Len() int {
1309         return se.length
1310 }
1311
1312 func (se storedExtent) Slice(n, size int) extent {
1313         se.offset += n
1314         se.length -= n
1315         if size >= 0 && se.length > size {
1316                 se.length = size
1317         }
1318         return se
1319 }
1320
1321 func (se storedExtent) ReadAt(p []byte, off int64) (n int, err error) {
1322         if off > int64(se.length) {
1323                 return 0, io.EOF
1324         }
1325         maxlen := se.length - int(off)
1326         if len(p) > maxlen {
1327                 p = p[:maxlen]
1328                 n, err = se.kc.ReadAt(se.locator, p, int(off)+se.offset)
1329                 if err == nil {
1330                         err = io.EOF
1331                 }
1332                 return
1333         }
1334         return se.kc.ReadAt(se.locator, p, int(off)+se.offset)
1335 }
1336
1337 func canonicalName(name string) string {
1338         name = path.Clean("/" + name)
1339         if name == "/" || name == "./" {
1340                 name = "."
1341         } else if strings.HasPrefix(name, "/") {
1342                 name = "." + name
1343         }
1344         return name
1345 }
1346
1347 var manifestEscapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`)
1348
1349 func manifestUnescapeFunc(seq string) string {
1350         if seq == `\\` {
1351                 return `\`
1352         }
1353         i, err := strconv.ParseUint(seq[1:], 8, 8)
1354         if err != nil {
1355                 // Invalid escape sequence: can't unescape.
1356                 return seq
1357         }
1358         return string([]byte{byte(i)})
1359 }
1360
1361 func manifestUnescape(s string) string {
1362         return manifestEscapeSeq.ReplaceAllStringFunc(s, manifestUnescapeFunc)
1363 }
1364
1365 var manifestEscapedChar = regexp.MustCompile(`[^\.\w/]`)
1366
1367 func manifestEscapeFunc(seq string) string {
1368         return fmt.Sprintf("\\%03o", byte(seq[0]))
1369 }
1370
1371 func manifestEscape(s string) string {
1372         return manifestEscapedChar.ReplaceAllStringFunc(s, manifestEscapeFunc)
1373 }