8311: Add git_tree mount type.
[arvados.git] / sdk / go / arvados / collection_fs.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "errors"
9         "fmt"
10         "io"
11         "net/http"
12         "os"
13         "path"
14         "regexp"
15         "sort"
16         "strconv"
17         "strings"
18         "sync"
19         "time"
20 )
21
22 var (
23         ErrReadOnlyFile      = errors.New("read-only file")
24         ErrNegativeOffset    = errors.New("cannot seek to negative offset")
25         ErrFileExists        = errors.New("file exists")
26         ErrInvalidOperation  = errors.New("invalid operation")
27         ErrInvalidArgument   = errors.New("invalid argument")
28         ErrDirectoryNotEmpty = errors.New("directory not empty")
29         ErrWriteOnlyMode     = errors.New("file is O_WRONLY")
30         ErrSyncNotSupported  = errors.New("O_SYNC flag is not supported")
31         ErrIsDirectory       = errors.New("cannot rename file to overwrite existing directory")
32         ErrPermission        = os.ErrPermission
33
34         maxBlockSize = 1 << 26
35 )
36
37 // A File is an *os.File-like interface for reading and writing files
38 // in a CollectionFileSystem.
39 type File interface {
40         io.Reader
41         io.Writer
42         io.Closer
43         io.Seeker
44         Size() int64
45         Readdir(int) ([]os.FileInfo, error)
46         Stat() (os.FileInfo, error)
47         Truncate(int64) error
48 }
49
50 type keepClient interface {
51         ReadAt(locator string, p []byte, off int) (int, error)
52         PutB(p []byte) (string, int, error)
53 }
54
55 type fileinfo struct {
56         name    string
57         mode    os.FileMode
58         size    int64
59         modTime time.Time
60 }
61
62 // Name implements os.FileInfo.
63 func (fi fileinfo) Name() string {
64         return fi.name
65 }
66
67 // ModTime implements os.FileInfo.
68 func (fi fileinfo) ModTime() time.Time {
69         return fi.modTime
70 }
71
72 // Mode implements os.FileInfo.
73 func (fi fileinfo) Mode() os.FileMode {
74         return fi.mode
75 }
76
77 // IsDir implements os.FileInfo.
78 func (fi fileinfo) IsDir() bool {
79         return fi.mode&os.ModeDir != 0
80 }
81
82 // Size implements os.FileInfo.
83 func (fi fileinfo) Size() int64 {
84         return fi.size
85 }
86
87 // Sys implements os.FileInfo.
88 func (fi fileinfo) Sys() interface{} {
89         return nil
90 }
91
92 // A CollectionFileSystem is an http.Filesystem plus Stat() and
93 // support for opening writable files. All methods are safe to call
94 // from multiple goroutines.
95 type CollectionFileSystem interface {
96         http.FileSystem
97
98         // analogous to os.Stat()
99         Stat(name string) (os.FileInfo, error)
100
101         // analogous to os.Create(): create/truncate a file and open it O_RDWR.
102         Create(name string) (File, error)
103
104         // Like os.OpenFile(): create or open a file or directory.
105         //
106         // If flag&os.O_EXCL==0, it opens an existing file or
107         // directory if one exists. If flag&os.O_CREATE!=0, it creates
108         // a new empty file or directory if one does not already
109         // exist.
110         //
111         // When creating a new item, perm&os.ModeDir determines
112         // whether it is a file or a directory.
113         //
114         // A file can be opened multiple times and used concurrently
115         // from multiple goroutines. However, each File object should
116         // be used by only one goroutine at a time.
117         OpenFile(name string, flag int, perm os.FileMode) (File, error)
118
119         Mkdir(name string, perm os.FileMode) error
120         Remove(name string) error
121         RemoveAll(name string) error
122         Rename(oldname, newname string) error
123
124         // Flush all file data to Keep and return a snapshot of the
125         // filesystem suitable for saving as (Collection)ManifestText.
126         // Prefix (normally ".") is a top level directory, effectively
127         // prepended to all paths in the returned manifest.
128         MarshalManifest(prefix string) (string, error)
129 }
130
131 type fileSystem struct {
132         dirnode
133 }
134
135 func (fs *fileSystem) OpenFile(name string, flag int, perm os.FileMode) (File, error) {
136         return fs.dirnode.OpenFile(name, flag, perm)
137 }
138
139 func (fs *fileSystem) Open(name string) (http.File, error) {
140         return fs.dirnode.OpenFile(name, os.O_RDONLY, 0)
141 }
142
143 func (fs *fileSystem) Create(name string) (File, error) {
144         return fs.dirnode.OpenFile(name, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0)
145 }
146
147 func (fs *fileSystem) Stat(name string) (fi os.FileInfo, err error) {
148         node := fs.dirnode.lookupPath(name)
149         if node == nil {
150                 err = os.ErrNotExist
151         } else {
152                 fi = node.Stat()
153         }
154         return
155 }
156
157 type inode interface {
158         Parent() inode
159         Read([]byte, filenodePtr) (int, filenodePtr, error)
160         Write([]byte, filenodePtr) (int, filenodePtr, error)
161         Truncate(int64) error
162         Readdir() []os.FileInfo
163         Size() int64
164         Stat() os.FileInfo
165         sync.Locker
166         RLock()
167         RUnlock()
168 }
169
170 // filenode implements inode.
171 type filenode struct {
172         fileinfo fileinfo
173         parent   *dirnode
174         segments []segment
175         // number of times `segments` has changed in a
176         // way that might invalidate a filenodePtr
177         repacked int64
178         memsize  int64 // bytes in memSegments
179         sync.RWMutex
180 }
181
182 // filenodePtr is an offset into a file that is (usually) efficient to
183 // seek to. Specifically, if filenode.repacked==filenodePtr.repacked
184 // then
185 // filenode.segments[filenodePtr.segmentIdx][filenodePtr.segmentOff]
186 // corresponds to file offset filenodePtr.off. Otherwise, it is
187 // necessary to reexamine len(filenode.segments[0]) etc. to find the
188 // correct segment and offset.
189 type filenodePtr struct {
190         off        int64
191         segmentIdx int
192         segmentOff int
193         repacked   int64
194 }
195
196 // seek returns a ptr that is consistent with both startPtr.off and
197 // the current state of fn. The caller must already hold fn.RLock() or
198 // fn.Lock().
199 //
200 // If startPtr is beyond EOF, ptr.segment* will indicate precisely
201 // EOF.
202 //
203 // After seeking:
204 //
205 //     ptr.segmentIdx == len(filenode.segments) // i.e., at EOF
206 //     ||
207 //     filenode.segments[ptr.segmentIdx].Len() > ptr.segmentOff
208 func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) {
209         ptr = startPtr
210         if ptr.off < 0 {
211                 // meaningless anyway
212                 return
213         } else if ptr.off >= fn.fileinfo.size {
214                 ptr.segmentIdx = len(fn.segments)
215                 ptr.segmentOff = 0
216                 ptr.repacked = fn.repacked
217                 return
218         } else if ptr.repacked == fn.repacked {
219                 // segmentIdx and segmentOff accurately reflect
220                 // ptr.off, but might have fallen off the end of a
221                 // segment
222                 if ptr.segmentOff >= fn.segments[ptr.segmentIdx].Len() {
223                         ptr.segmentIdx++
224                         ptr.segmentOff = 0
225                 }
226                 return
227         }
228         defer func() {
229                 ptr.repacked = fn.repacked
230         }()
231         if ptr.off >= fn.fileinfo.size {
232                 ptr.segmentIdx, ptr.segmentOff = len(fn.segments), 0
233                 return
234         }
235         // Recompute segmentIdx and segmentOff.  We have already
236         // established fn.fileinfo.size > ptr.off >= 0, so we don't
237         // have to deal with edge cases here.
238         var off int64
239         for ptr.segmentIdx, ptr.segmentOff = 0, 0; off < ptr.off; ptr.segmentIdx++ {
240                 // This would panic (index out of range) if
241                 // fn.fileinfo.size were larger than
242                 // sum(fn.segments[i].Len()) -- but that can't happen
243                 // because we have ensured fn.fileinfo.size is always
244                 // accurate.
245                 segLen := int64(fn.segments[ptr.segmentIdx].Len())
246                 if off+segLen > ptr.off {
247                         ptr.segmentOff = int(ptr.off - off)
248                         break
249                 }
250                 off += segLen
251         }
252         return
253 }
254
255 // caller must have lock
256 func (fn *filenode) appendSegment(e segment) {
257         fn.segments = append(fn.segments, e)
258         fn.fileinfo.size += int64(e.Len())
259 }
260
261 func (fn *filenode) Parent() inode {
262         fn.RLock()
263         defer fn.RUnlock()
264         return fn.parent
265 }
266
267 func (fn *filenode) Readdir() []os.FileInfo {
268         return nil
269 }
270
271 // Read reads file data from a single segment, starting at startPtr,
272 // into p. startPtr is assumed not to be up-to-date. Caller must have
273 // RLock or Lock.
274 func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
275         ptr = fn.seek(startPtr)
276         if ptr.off < 0 {
277                 err = ErrNegativeOffset
278                 return
279         }
280         if ptr.segmentIdx >= len(fn.segments) {
281                 err = io.EOF
282                 return
283         }
284         n, err = fn.segments[ptr.segmentIdx].ReadAt(p, int64(ptr.segmentOff))
285         if n > 0 {
286                 ptr.off += int64(n)
287                 ptr.segmentOff += n
288                 if ptr.segmentOff == fn.segments[ptr.segmentIdx].Len() {
289                         ptr.segmentIdx++
290                         ptr.segmentOff = 0
291                         if ptr.segmentIdx < len(fn.segments) && err == io.EOF {
292                                 err = nil
293                         }
294                 }
295         }
296         return
297 }
298
299 func (fn *filenode) Size() int64 {
300         fn.RLock()
301         defer fn.RUnlock()
302         return fn.fileinfo.Size()
303 }
304
305 func (fn *filenode) Stat() os.FileInfo {
306         fn.RLock()
307         defer fn.RUnlock()
308         return fn.fileinfo
309 }
310
311 func (fn *filenode) Truncate(size int64) error {
312         fn.Lock()
313         defer fn.Unlock()
314         return fn.truncate(size)
315 }
316
317 func (fn *filenode) truncate(size int64) error {
318         if size == fn.fileinfo.size {
319                 return nil
320         }
321         fn.repacked++
322         if size < fn.fileinfo.size {
323                 ptr := fn.seek(filenodePtr{off: size})
324                 for i := ptr.segmentIdx; i < len(fn.segments); i++ {
325                         if seg, ok := fn.segments[i].(*memSegment); ok {
326                                 fn.memsize -= int64(seg.Len())
327                         }
328                 }
329                 if ptr.segmentOff == 0 {
330                         fn.segments = fn.segments[:ptr.segmentIdx]
331                 } else {
332                         fn.segments = fn.segments[:ptr.segmentIdx+1]
333                         switch seg := fn.segments[ptr.segmentIdx].(type) {
334                         case *memSegment:
335                                 seg.Truncate(ptr.segmentOff)
336                                 fn.memsize += int64(seg.Len())
337                         default:
338                                 fn.segments[ptr.segmentIdx] = seg.Slice(0, ptr.segmentOff)
339                         }
340                 }
341                 fn.fileinfo.size = size
342                 return nil
343         }
344         for size > fn.fileinfo.size {
345                 grow := size - fn.fileinfo.size
346                 var seg *memSegment
347                 var ok bool
348                 if len(fn.segments) == 0 {
349                         seg = &memSegment{}
350                         fn.segments = append(fn.segments, seg)
351                 } else if seg, ok = fn.segments[len(fn.segments)-1].(*memSegment); !ok || seg.Len() >= maxBlockSize {
352                         seg = &memSegment{}
353                         fn.segments = append(fn.segments, seg)
354                 }
355                 if maxgrow := int64(maxBlockSize - seg.Len()); maxgrow < grow {
356                         grow = maxgrow
357                 }
358                 seg.Truncate(seg.Len() + int(grow))
359                 fn.fileinfo.size += grow
360                 fn.memsize += grow
361         }
362         return nil
363 }
364
365 // Write writes data from p to the file, starting at startPtr,
366 // extending the file size if necessary. Caller must have Lock.
367 func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
368         if startPtr.off > fn.fileinfo.size {
369                 if err = fn.truncate(startPtr.off); err != nil {
370                         return 0, startPtr, err
371                 }
372         }
373         ptr = fn.seek(startPtr)
374         if ptr.off < 0 {
375                 err = ErrNegativeOffset
376                 return
377         }
378         for len(p) > 0 && err == nil {
379                 cando := p
380                 if len(cando) > maxBlockSize {
381                         cando = cando[:maxBlockSize]
382                 }
383                 // Rearrange/grow fn.segments (and shrink cando if
384                 // needed) such that cando can be copied to
385                 // fn.segments[ptr.segmentIdx] at offset
386                 // ptr.segmentOff.
387                 cur := ptr.segmentIdx
388                 prev := ptr.segmentIdx - 1
389                 var curWritable bool
390                 if cur < len(fn.segments) {
391                         _, curWritable = fn.segments[cur].(*memSegment)
392                 }
393                 var prevAppendable bool
394                 if prev >= 0 && fn.segments[prev].Len() < maxBlockSize {
395                         _, prevAppendable = fn.segments[prev].(*memSegment)
396                 }
397                 if ptr.segmentOff > 0 && !curWritable {
398                         // Split a non-writable block.
399                         if max := fn.segments[cur].Len() - ptr.segmentOff; max <= len(cando) {
400                                 // Truncate cur, and insert a new
401                                 // segment after it.
402                                 cando = cando[:max]
403                                 fn.segments = append(fn.segments, nil)
404                                 copy(fn.segments[cur+1:], fn.segments[cur:])
405                         } else {
406                                 // Split cur into two copies, truncate
407                                 // the one on the left, shift the one
408                                 // on the right, and insert a new
409                                 // segment between them.
410                                 fn.segments = append(fn.segments, nil, nil)
411                                 copy(fn.segments[cur+2:], fn.segments[cur:])
412                                 fn.segments[cur+2] = fn.segments[cur+2].Slice(ptr.segmentOff+len(cando), -1)
413                         }
414                         cur++
415                         prev++
416                         seg := &memSegment{}
417                         seg.Truncate(len(cando))
418                         fn.memsize += int64(len(cando))
419                         fn.segments[cur] = seg
420                         fn.segments[prev] = fn.segments[prev].Slice(0, ptr.segmentOff)
421                         ptr.segmentIdx++
422                         ptr.segmentOff = 0
423                         fn.repacked++
424                         ptr.repacked++
425                 } else if curWritable {
426                         if fit := int(fn.segments[cur].Len()) - ptr.segmentOff; fit < len(cando) {
427                                 cando = cando[:fit]
428                         }
429                 } else {
430                         if prevAppendable {
431                                 // Shrink cando if needed to fit in
432                                 // prev segment.
433                                 if cangrow := maxBlockSize - fn.segments[prev].Len(); cangrow < len(cando) {
434                                         cando = cando[:cangrow]
435                                 }
436                         }
437
438                         if cur == len(fn.segments) {
439                                 // ptr is at EOF, filesize is changing.
440                                 fn.fileinfo.size += int64(len(cando))
441                         } else if el := fn.segments[cur].Len(); el <= len(cando) {
442                                 // cando is long enough that we won't
443                                 // need cur any more. shrink cando to
444                                 // be exactly as long as cur
445                                 // (otherwise we'd accidentally shift
446                                 // the effective position of all
447                                 // segments after cur).
448                                 cando = cando[:el]
449                                 copy(fn.segments[cur:], fn.segments[cur+1:])
450                                 fn.segments = fn.segments[:len(fn.segments)-1]
451                         } else {
452                                 // shrink cur by the same #bytes we're growing prev
453                                 fn.segments[cur] = fn.segments[cur].Slice(len(cando), -1)
454                         }
455
456                         if prevAppendable {
457                                 // Grow prev.
458                                 ptr.segmentIdx--
459                                 ptr.segmentOff = fn.segments[prev].Len()
460                                 fn.segments[prev].(*memSegment).Truncate(ptr.segmentOff + len(cando))
461                                 fn.memsize += int64(len(cando))
462                                 ptr.repacked++
463                                 fn.repacked++
464                         } else {
465                                 // Insert a segment between prev and
466                                 // cur, and advance prev/cur.
467                                 fn.segments = append(fn.segments, nil)
468                                 if cur < len(fn.segments) {
469                                         copy(fn.segments[cur+1:], fn.segments[cur:])
470                                         ptr.repacked++
471                                         fn.repacked++
472                                 } else {
473                                         // appending a new segment does
474                                         // not invalidate any ptrs
475                                 }
476                                 seg := &memSegment{}
477                                 seg.Truncate(len(cando))
478                                 fn.memsize += int64(len(cando))
479                                 fn.segments[cur] = seg
480                                 cur++
481                                 prev++
482                         }
483                 }
484
485                 // Finally we can copy bytes from cando to the current segment.
486                 fn.segments[ptr.segmentIdx].(*memSegment).WriteAt(cando, ptr.segmentOff)
487                 n += len(cando)
488                 p = p[len(cando):]
489
490                 ptr.off += int64(len(cando))
491                 ptr.segmentOff += len(cando)
492                 if ptr.segmentOff >= maxBlockSize {
493                         fn.pruneMemSegments()
494                 }
495                 if fn.segments[ptr.segmentIdx].Len() == ptr.segmentOff {
496                         ptr.segmentOff = 0
497                         ptr.segmentIdx++
498                 }
499
500                 fn.fileinfo.modTime = time.Now()
501         }
502         return
503 }
504
505 // Write some data out to disk to reduce memory use. Caller must have
506 // write lock.
507 func (fn *filenode) pruneMemSegments() {
508         // TODO: async (don't hold Lock() while waiting for Keep)
509         // TODO: share code with (*dirnode)sync()
510         // TODO: pack/flush small blocks too, when fragmented
511         for idx, seg := range fn.segments {
512                 seg, ok := seg.(*memSegment)
513                 if !ok || seg.Len() < maxBlockSize {
514                         continue
515                 }
516                 locator, _, err := fn.parent.kc.PutB(seg.buf)
517                 if err != nil {
518                         // TODO: stall (or return errors from)
519                         // subsequent writes until flushing
520                         // starts to succeed
521                         continue
522                 }
523                 fn.memsize -= int64(seg.Len())
524                 fn.segments[idx] = storedSegment{
525                         kc:      fn.parent.kc,
526                         locator: locator,
527                         size:    seg.Len(),
528                         offset:  0,
529                         length:  seg.Len(),
530                 }
531         }
532 }
533
534 // FileSystem returns a CollectionFileSystem for the collection.
535 func (c *Collection) FileSystem(client *Client, kc keepClient) (CollectionFileSystem, error) {
536         var modTime time.Time
537         if c.ModifiedAt == nil {
538                 modTime = time.Now()
539         } else {
540                 modTime = *c.ModifiedAt
541         }
542         fs := &fileSystem{dirnode: dirnode{
543                 client: client,
544                 kc:     kc,
545                 fileinfo: fileinfo{
546                         name:    ".",
547                         mode:    os.ModeDir | 0755,
548                         modTime: modTime,
549                 },
550                 parent: nil,
551                 inodes: make(map[string]inode),
552         }}
553         fs.dirnode.parent = &fs.dirnode
554         if err := fs.dirnode.loadManifest(c.ManifestText); err != nil {
555                 return nil, err
556         }
557         return fs, nil
558 }
559
560 type filehandle struct {
561         inode
562         ptr        filenodePtr
563         append     bool
564         readable   bool
565         writable   bool
566         unreaddirs []os.FileInfo
567 }
568
569 func (f *filehandle) Read(p []byte) (n int, err error) {
570         if !f.readable {
571                 return 0, ErrWriteOnlyMode
572         }
573         f.inode.RLock()
574         defer f.inode.RUnlock()
575         n, f.ptr, err = f.inode.Read(p, f.ptr)
576         return
577 }
578
579 func (f *filehandle) Seek(off int64, whence int) (pos int64, err error) {
580         size := f.inode.Size()
581         ptr := f.ptr
582         switch whence {
583         case io.SeekStart:
584                 ptr.off = off
585         case io.SeekCurrent:
586                 ptr.off += off
587         case io.SeekEnd:
588                 ptr.off = size + off
589         }
590         if ptr.off < 0 {
591                 return f.ptr.off, ErrNegativeOffset
592         }
593         if ptr.off != f.ptr.off {
594                 f.ptr = ptr
595                 // force filenode to recompute f.ptr fields on next
596                 // use
597                 f.ptr.repacked = -1
598         }
599         return f.ptr.off, nil
600 }
601
602 func (f *filehandle) Truncate(size int64) error {
603         return f.inode.Truncate(size)
604 }
605
606 func (f *filehandle) Write(p []byte) (n int, err error) {
607         if !f.writable {
608                 return 0, ErrReadOnlyFile
609         }
610         f.inode.Lock()
611         defer f.inode.Unlock()
612         if fn, ok := f.inode.(*filenode); ok && f.append {
613                 f.ptr = filenodePtr{
614                         off:        fn.fileinfo.size,
615                         segmentIdx: len(fn.segments),
616                         segmentOff: 0,
617                         repacked:   fn.repacked,
618                 }
619         }
620         n, f.ptr, err = f.inode.Write(p, f.ptr)
621         return
622 }
623
624 func (f *filehandle) Readdir(count int) ([]os.FileInfo, error) {
625         if !f.inode.Stat().IsDir() {
626                 return nil, ErrInvalidOperation
627         }
628         if count <= 0 {
629                 return f.inode.Readdir(), nil
630         }
631         if f.unreaddirs == nil {
632                 f.unreaddirs = f.inode.Readdir()
633         }
634         if len(f.unreaddirs) == 0 {
635                 return nil, io.EOF
636         }
637         if count > len(f.unreaddirs) {
638                 count = len(f.unreaddirs)
639         }
640         ret := f.unreaddirs[:count]
641         f.unreaddirs = f.unreaddirs[count:]
642         return ret, nil
643 }
644
645 func (f *filehandle) Stat() (os.FileInfo, error) {
646         return f.inode.Stat(), nil
647 }
648
649 func (f *filehandle) Close() error {
650         return nil
651 }
652
653 type dirnode struct {
654         fileinfo fileinfo
655         parent   *dirnode
656         client   *Client
657         kc       keepClient
658         inodes   map[string]inode
659         sync.RWMutex
660 }
661
662 // sync flushes in-memory data (for all files in the tree rooted at
663 // dn) to persistent storage. Caller must hold dn.Lock().
664 func (dn *dirnode) sync() error {
665         type shortBlock struct {
666                 fn  *filenode
667                 idx int
668         }
669         var pending []shortBlock
670         var pendingLen int
671
672         flush := func(sbs []shortBlock) error {
673                 if len(sbs) == 0 {
674                         return nil
675                 }
676                 block := make([]byte, 0, maxBlockSize)
677                 for _, sb := range sbs {
678                         block = append(block, sb.fn.segments[sb.idx].(*memSegment).buf...)
679                 }
680                 locator, _, err := dn.kc.PutB(block)
681                 if err != nil {
682                         return err
683                 }
684                 off := 0
685                 for _, sb := range sbs {
686                         data := sb.fn.segments[sb.idx].(*memSegment).buf
687                         sb.fn.segments[sb.idx] = storedSegment{
688                                 kc:      dn.kc,
689                                 locator: locator,
690                                 size:    len(block),
691                                 offset:  off,
692                                 length:  len(data),
693                         }
694                         off += len(data)
695                         sb.fn.memsize -= int64(len(data))
696                 }
697                 return nil
698         }
699
700         names := make([]string, 0, len(dn.inodes))
701         for name := range dn.inodes {
702                 names = append(names, name)
703         }
704         sort.Strings(names)
705
706         for _, name := range names {
707                 fn, ok := dn.inodes[name].(*filenode)
708                 if !ok {
709                         continue
710                 }
711                 fn.Lock()
712                 defer fn.Unlock()
713                 for idx, seg := range fn.segments {
714                         seg, ok := seg.(*memSegment)
715                         if !ok {
716                                 continue
717                         }
718                         if seg.Len() > maxBlockSize/2 {
719                                 if err := flush([]shortBlock{{fn, idx}}); err != nil {
720                                         return err
721                                 }
722                                 continue
723                         }
724                         if pendingLen+seg.Len() > maxBlockSize {
725                                 if err := flush(pending); err != nil {
726                                         return err
727                                 }
728                                 pending = nil
729                                 pendingLen = 0
730                         }
731                         pending = append(pending, shortBlock{fn, idx})
732                         pendingLen += seg.Len()
733                 }
734         }
735         return flush(pending)
736 }
737
738 func (dn *dirnode) MarshalManifest(prefix string) (string, error) {
739         dn.Lock()
740         defer dn.Unlock()
741         return dn.marshalManifest(prefix)
742 }
743
744 // caller must have read lock.
745 func (dn *dirnode) marshalManifest(prefix string) (string, error) {
746         var streamLen int64
747         type filepart struct {
748                 name   string
749                 offset int64
750                 length int64
751         }
752         var fileparts []filepart
753         var subdirs string
754         var blocks []string
755
756         if err := dn.sync(); err != nil {
757                 return "", err
758         }
759
760         names := make([]string, 0, len(dn.inodes))
761         for name, node := range dn.inodes {
762                 names = append(names, name)
763                 node.Lock()
764                 defer node.Unlock()
765         }
766         sort.Strings(names)
767
768         for _, name := range names {
769                 switch node := dn.inodes[name].(type) {
770                 case *dirnode:
771                         subdir, err := node.marshalManifest(prefix + "/" + name)
772                         if err != nil {
773                                 return "", err
774                         }
775                         subdirs = subdirs + subdir
776                 case *filenode:
777                         if len(node.segments) == 0 {
778                                 fileparts = append(fileparts, filepart{name: name})
779                                 break
780                         }
781                         for _, seg := range node.segments {
782                                 switch seg := seg.(type) {
783                                 case storedSegment:
784                                         if len(blocks) > 0 && blocks[len(blocks)-1] == seg.locator {
785                                                 streamLen -= int64(seg.size)
786                                         } else {
787                                                 blocks = append(blocks, seg.locator)
788                                         }
789                                         next := filepart{
790                                                 name:   name,
791                                                 offset: streamLen + int64(seg.offset),
792                                                 length: int64(seg.length),
793                                         }
794                                         if prev := len(fileparts) - 1; prev >= 0 &&
795                                                 fileparts[prev].name == name &&
796                                                 fileparts[prev].offset+fileparts[prev].length == next.offset {
797                                                 fileparts[prev].length += next.length
798                                         } else {
799                                                 fileparts = append(fileparts, next)
800                                         }
801                                         streamLen += int64(seg.size)
802                                 default:
803                                         // This can't happen: we
804                                         // haven't unlocked since
805                                         // calling sync().
806                                         panic(fmt.Sprintf("can't marshal segment type %T", seg))
807                                 }
808                         }
809                 default:
810                         panic(fmt.Sprintf("can't marshal inode type %T", node))
811                 }
812         }
813         var filetokens []string
814         for _, s := range fileparts {
815                 filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, manifestEscape(s.name)))
816         }
817         if len(filetokens) == 0 {
818                 return subdirs, nil
819         } else if len(blocks) == 0 {
820                 blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
821         }
822         return manifestEscape(prefix) + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n" + subdirs, nil
823 }
824
825 func (dn *dirnode) loadManifest(txt string) error {
826         var dirname string
827         streams := strings.Split(txt, "\n")
828         if streams[len(streams)-1] != "" {
829                 return fmt.Errorf("line %d: no trailing newline", len(streams))
830         }
831         streams = streams[:len(streams)-1]
832         segments := []storedSegment{}
833         for i, stream := range streams {
834                 lineno := i + 1
835                 var anyFileTokens bool
836                 var pos int64
837                 var segIdx int
838                 segments = segments[:0]
839                 for i, token := range strings.Split(stream, " ") {
840                         if i == 0 {
841                                 dirname = manifestUnescape(token)
842                                 continue
843                         }
844                         if !strings.Contains(token, ":") {
845                                 if anyFileTokens {
846                                         return fmt.Errorf("line %d: bad file segment %q", lineno, token)
847                                 }
848                                 toks := strings.SplitN(token, "+", 3)
849                                 if len(toks) < 2 {
850                                         return fmt.Errorf("line %d: bad locator %q", lineno, token)
851                                 }
852                                 length, err := strconv.ParseInt(toks[1], 10, 32)
853                                 if err != nil || length < 0 {
854                                         return fmt.Errorf("line %d: bad locator %q", lineno, token)
855                                 }
856                                 segments = append(segments, storedSegment{
857                                         locator: token,
858                                         size:    int(length),
859                                         offset:  0,
860                                         length:  int(length),
861                                 })
862                                 continue
863                         } else if len(segments) == 0 {
864                                 return fmt.Errorf("line %d: bad locator %q", lineno, token)
865                         }
866
867                         toks := strings.SplitN(token, ":", 3)
868                         if len(toks) != 3 {
869                                 return fmt.Errorf("line %d: bad file segment %q", lineno, token)
870                         }
871                         anyFileTokens = true
872
873                         offset, err := strconv.ParseInt(toks[0], 10, 64)
874                         if err != nil || offset < 0 {
875                                 return fmt.Errorf("line %d: bad file segment %q", lineno, token)
876                         }
877                         length, err := strconv.ParseInt(toks[1], 10, 64)
878                         if err != nil || length < 0 {
879                                 return fmt.Errorf("line %d: bad file segment %q", lineno, token)
880                         }
881                         name := dirname + "/" + manifestUnescape(toks[2])
882                         fnode, err := dn.createFileAndParents(name)
883                         if err != nil {
884                                 return fmt.Errorf("line %d: cannot use path %q: %s", lineno, name, err)
885                         }
886                         // Map the stream offset/range coordinates to
887                         // block/offset/range coordinates and add
888                         // corresponding storedSegments to the filenode
889                         if pos > offset {
890                                 // Can't continue where we left off.
891                                 // TODO: binary search instead of
892                                 // rewinding all the way (but this
893                                 // situation might be rare anyway)
894                                 segIdx, pos = 0, 0
895                         }
896                         for next := int64(0); segIdx < len(segments); segIdx++ {
897                                 seg := segments[segIdx]
898                                 next = pos + int64(seg.Len())
899                                 if next <= offset || seg.Len() == 0 {
900                                         pos = next
901                                         continue
902                                 }
903                                 if pos >= offset+length {
904                                         break
905                                 }
906                                 var blkOff int
907                                 if pos < offset {
908                                         blkOff = int(offset - pos)
909                                 }
910                                 blkLen := seg.Len() - blkOff
911                                 if pos+int64(blkOff+blkLen) > offset+length {
912                                         blkLen = int(offset + length - pos - int64(blkOff))
913                                 }
914                                 fnode.appendSegment(storedSegment{
915                                         kc:      dn.kc,
916                                         locator: seg.locator,
917                                         size:    seg.size,
918                                         offset:  blkOff,
919                                         length:  blkLen,
920                                 })
921                                 if next > offset+length {
922                                         break
923                                 } else {
924                                         pos = next
925                                 }
926                         }
927                         if segIdx == len(segments) && pos < offset+length {
928                                 return fmt.Errorf("line %d: invalid segment in %d-byte stream: %q", lineno, pos, token)
929                         }
930                 }
931                 if !anyFileTokens {
932                         return fmt.Errorf("line %d: no file segments", lineno)
933                 } else if len(segments) == 0 {
934                         return fmt.Errorf("line %d: no locators", lineno)
935                 } else if dirname == "" {
936                         return fmt.Errorf("line %d: no stream name", lineno)
937                 }
938         }
939         return nil
940 }
941
942 // only safe to call from loadManifest -- no locking
943 func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
944         names := strings.Split(path, "/")
945         basename := names[len(names)-1]
946         if basename == "" || basename == "." || basename == ".." {
947                 err = fmt.Errorf("invalid filename")
948                 return
949         }
950         for _, name := range names[:len(names)-1] {
951                 switch name {
952                 case "", ".":
953                 case "..":
954                         dn = dn.parent
955                 default:
956                         switch node := dn.inodes[name].(type) {
957                         case nil:
958                                 dn = dn.newDirnode(name, 0755, dn.fileinfo.modTime)
959                         case *dirnode:
960                                 dn = node
961                         case *filenode:
962                                 err = ErrFileExists
963                                 return
964                         }
965                 }
966         }
967         switch node := dn.inodes[basename].(type) {
968         case nil:
969                 fn = dn.newFilenode(basename, 0755, dn.fileinfo.modTime)
970         case *filenode:
971                 fn = node
972         case *dirnode:
973                 err = ErrIsDirectory
974         }
975         return
976 }
977
978 func (dn *dirnode) mkdir(name string) (*filehandle, error) {
979         return dn.OpenFile(name, os.O_CREATE|os.O_EXCL, os.ModeDir|0755)
980 }
981
982 func (dn *dirnode) Mkdir(name string, perm os.FileMode) error {
983         f, err := dn.mkdir(name)
984         if err == nil {
985                 err = f.Close()
986         }
987         return err
988 }
989
990 func (dn *dirnode) Remove(name string) error {
991         return dn.remove(strings.TrimRight(name, "/"), false)
992 }
993
994 func (dn *dirnode) RemoveAll(name string) error {
995         err := dn.remove(strings.TrimRight(name, "/"), true)
996         if os.IsNotExist(err) {
997                 // "If the path does not exist, RemoveAll returns
998                 // nil." (see "os" pkg)
999                 err = nil
1000         }
1001         return err
1002 }
1003
1004 func (dn *dirnode) remove(name string, recursive bool) error {
1005         dirname, name := path.Split(name)
1006         if name == "" || name == "." || name == ".." {
1007                 return ErrInvalidArgument
1008         }
1009         dn, ok := dn.lookupPath(dirname).(*dirnode)
1010         if !ok {
1011                 return os.ErrNotExist
1012         }
1013         dn.Lock()
1014         defer dn.Unlock()
1015         switch node := dn.inodes[name].(type) {
1016         case nil:
1017                 return os.ErrNotExist
1018         case *dirnode:
1019                 node.RLock()
1020                 defer node.RUnlock()
1021                 if !recursive && len(node.inodes) > 0 {
1022                         return ErrDirectoryNotEmpty
1023                 }
1024         }
1025         delete(dn.inodes, name)
1026         return nil
1027 }
1028
1029 func (dn *dirnode) Rename(oldname, newname string) error {
1030         olddir, oldname := path.Split(oldname)
1031         if oldname == "" || oldname == "." || oldname == ".." {
1032                 return ErrInvalidArgument
1033         }
1034         olddirf, err := dn.OpenFile(olddir+".", os.O_RDONLY, 0)
1035         if err != nil {
1036                 return fmt.Errorf("%q: %s", olddir, err)
1037         }
1038         defer olddirf.Close()
1039         newdir, newname := path.Split(newname)
1040         if newname == "." || newname == ".." {
1041                 return ErrInvalidArgument
1042         } else if newname == "" {
1043                 // Rename("a/b", "c/") means Rename("a/b", "c/b")
1044                 newname = oldname
1045         }
1046         newdirf, err := dn.OpenFile(newdir+".", os.O_RDONLY, 0)
1047         if err != nil {
1048                 return fmt.Errorf("%q: %s", newdir, err)
1049         }
1050         defer newdirf.Close()
1051
1052         // When acquiring locks on multiple nodes, all common
1053         // ancestors must be locked first in order to avoid
1054         // deadlock. This is assured by locking the path from root to
1055         // newdir, then locking the path from root to olddir, skipping
1056         // any already-locked nodes.
1057         needLock := []sync.Locker{}
1058         for _, f := range []*filehandle{olddirf, newdirf} {
1059                 node := f.inode
1060                 needLock = append(needLock, node)
1061                 for node.Parent() != node {
1062                         node = node.Parent()
1063                         needLock = append(needLock, node)
1064                 }
1065         }
1066         locked := map[sync.Locker]bool{}
1067         for i := len(needLock) - 1; i >= 0; i-- {
1068                 if n := needLock[i]; !locked[n] {
1069                         n.Lock()
1070                         defer n.Unlock()
1071                         locked[n] = true
1072                 }
1073         }
1074
1075         olddn := olddirf.inode.(*dirnode)
1076         newdn := newdirf.inode.(*dirnode)
1077         oldinode, ok := olddn.inodes[oldname]
1078         if !ok {
1079                 return os.ErrNotExist
1080         }
1081         if existing, ok := newdn.inodes[newname]; ok {
1082                 // overwriting an existing file or dir
1083                 if dn, ok := existing.(*dirnode); ok {
1084                         if !oldinode.Stat().IsDir() {
1085                                 return ErrIsDirectory
1086                         }
1087                         dn.RLock()
1088                         defer dn.RUnlock()
1089                         if len(dn.inodes) > 0 {
1090                                 return ErrDirectoryNotEmpty
1091                         }
1092                 }
1093         } else {
1094                 if newdn.inodes == nil {
1095                         newdn.inodes = make(map[string]inode)
1096                 }
1097                 newdn.fileinfo.size++
1098         }
1099         newdn.inodes[newname] = oldinode
1100         switch n := oldinode.(type) {
1101         case *dirnode:
1102                 n.parent = newdn
1103         case *filenode:
1104                 n.parent = newdn
1105         default:
1106                 panic(fmt.Sprintf("bad inode type %T", n))
1107         }
1108         delete(olddn.inodes, oldname)
1109         olddn.fileinfo.size--
1110         return nil
1111 }
1112
1113 func (dn *dirnode) Parent() inode {
1114         dn.RLock()
1115         defer dn.RUnlock()
1116         return dn.parent
1117 }
1118
1119 func (dn *dirnode) Readdir() (fi []os.FileInfo) {
1120         dn.RLock()
1121         defer dn.RUnlock()
1122         fi = make([]os.FileInfo, 0, len(dn.inodes))
1123         for _, inode := range dn.inodes {
1124                 fi = append(fi, inode.Stat())
1125         }
1126         return
1127 }
1128
1129 func (dn *dirnode) Read(p []byte, ptr filenodePtr) (int, filenodePtr, error) {
1130         return 0, ptr, ErrInvalidOperation
1131 }
1132
1133 func (dn *dirnode) Write(p []byte, ptr filenodePtr) (int, filenodePtr, error) {
1134         return 0, ptr, ErrInvalidOperation
1135 }
1136
1137 func (dn *dirnode) Size() int64 {
1138         dn.RLock()
1139         defer dn.RUnlock()
1140         return dn.fileinfo.Size()
1141 }
1142
1143 func (dn *dirnode) Stat() os.FileInfo {
1144         dn.RLock()
1145         defer dn.RUnlock()
1146         return dn.fileinfo
1147 }
1148
1149 func (dn *dirnode) Truncate(int64) error {
1150         return ErrInvalidOperation
1151 }
1152
1153 // lookupPath returns the inode for the file/directory with the given
1154 // name (which may contain "/" separators), along with its parent
1155 // node. If no such file/directory exists, the returned node is nil.
1156 func (dn *dirnode) lookupPath(path string) (node inode) {
1157         node = dn
1158         for _, name := range strings.Split(path, "/") {
1159                 dn, ok := node.(*dirnode)
1160                 if !ok {
1161                         return nil
1162                 }
1163                 if name == "." || name == "" {
1164                         continue
1165                 }
1166                 if name == ".." {
1167                         node = node.Parent()
1168                         continue
1169                 }
1170                 dn.RLock()
1171                 node = dn.inodes[name]
1172                 dn.RUnlock()
1173         }
1174         return
1175 }
1176
1177 func (dn *dirnode) newDirnode(name string, perm os.FileMode, modTime time.Time) *dirnode {
1178         child := &dirnode{
1179                 parent: dn,
1180                 client: dn.client,
1181                 kc:     dn.kc,
1182                 fileinfo: fileinfo{
1183                         name:    name,
1184                         mode:    os.ModeDir | perm,
1185                         modTime: modTime,
1186                 },
1187         }
1188         if dn.inodes == nil {
1189                 dn.inodes = make(map[string]inode)
1190         }
1191         dn.inodes[name] = child
1192         dn.fileinfo.size++
1193         return child
1194 }
1195
1196 func (dn *dirnode) newFilenode(name string, perm os.FileMode, modTime time.Time) *filenode {
1197         child := &filenode{
1198                 parent: dn,
1199                 fileinfo: fileinfo{
1200                         name:    name,
1201                         mode:    perm,
1202                         modTime: modTime,
1203                 },
1204         }
1205         if dn.inodes == nil {
1206                 dn.inodes = make(map[string]inode)
1207         }
1208         dn.inodes[name] = child
1209         dn.fileinfo.size++
1210         return child
1211 }
1212
1213 // OpenFile is analogous to os.OpenFile().
1214 func (dn *dirnode) OpenFile(name string, flag int, perm os.FileMode) (*filehandle, error) {
1215         if flag&os.O_SYNC != 0 {
1216                 return nil, ErrSyncNotSupported
1217         }
1218         dirname, name := path.Split(name)
1219         dn, ok := dn.lookupPath(dirname).(*dirnode)
1220         if !ok {
1221                 return nil, os.ErrNotExist
1222         }
1223         var readable, writable bool
1224         switch flag & (os.O_RDWR | os.O_RDONLY | os.O_WRONLY) {
1225         case os.O_RDWR:
1226                 readable = true
1227                 writable = true
1228         case os.O_RDONLY:
1229                 readable = true
1230         case os.O_WRONLY:
1231                 writable = true
1232         default:
1233                 return nil, fmt.Errorf("invalid flags 0x%x", flag)
1234         }
1235         if !writable {
1236                 // A directory can be opened via "foo/", "foo/.", or
1237                 // "foo/..".
1238                 switch name {
1239                 case ".", "":
1240                         return &filehandle{inode: dn}, nil
1241                 case "..":
1242                         return &filehandle{inode: dn.Parent()}, nil
1243                 }
1244         }
1245         createMode := flag&os.O_CREATE != 0
1246         if createMode {
1247                 dn.Lock()
1248                 defer dn.Unlock()
1249         } else {
1250                 dn.RLock()
1251                 defer dn.RUnlock()
1252         }
1253         n, ok := dn.inodes[name]
1254         if !ok {
1255                 if !createMode {
1256                         return nil, os.ErrNotExist
1257                 }
1258                 if perm.IsDir() {
1259                         n = dn.newDirnode(name, 0755, time.Now())
1260                 } else {
1261                         n = dn.newFilenode(name, 0755, time.Now())
1262                 }
1263         } else if flag&os.O_EXCL != 0 {
1264                 return nil, ErrFileExists
1265         } else if flag&os.O_TRUNC != 0 {
1266                 if !writable {
1267                         return nil, fmt.Errorf("invalid flag O_TRUNC in read-only mode")
1268                 } else if fn, ok := n.(*filenode); !ok {
1269                         return nil, fmt.Errorf("invalid flag O_TRUNC when opening directory")
1270                 } else {
1271                         fn.Truncate(0)
1272                 }
1273         }
1274         return &filehandle{
1275                 inode:    n,
1276                 append:   flag&os.O_APPEND != 0,
1277                 readable: readable,
1278                 writable: writable,
1279         }, nil
1280 }
1281
1282 type segment interface {
1283         io.ReaderAt
1284         Len() int
1285         // Return a new segment with a subsection of the data from this
1286         // one. length<0 means length=Len()-off.
1287         Slice(off int, length int) segment
1288 }
1289
1290 type memSegment struct {
1291         buf []byte
1292 }
1293
1294 func (me *memSegment) Len() int {
1295         return len(me.buf)
1296 }
1297
1298 func (me *memSegment) Slice(off, length int) segment {
1299         if length < 0 {
1300                 length = len(me.buf) - off
1301         }
1302         buf := make([]byte, length)
1303         copy(buf, me.buf[off:])
1304         return &memSegment{buf: buf}
1305 }
1306
1307 func (me *memSegment) Truncate(n int) {
1308         if n > cap(me.buf) {
1309                 newsize := 1024
1310                 for newsize < n {
1311                         newsize = newsize << 2
1312                 }
1313                 newbuf := make([]byte, n, newsize)
1314                 copy(newbuf, me.buf)
1315                 me.buf = newbuf
1316         } else {
1317                 // Zero unused part when shrinking, in case we grow
1318                 // and start using it again later.
1319                 for i := n; i < len(me.buf); i++ {
1320                         me.buf[i] = 0
1321                 }
1322         }
1323         me.buf = me.buf[:n]
1324 }
1325
1326 func (me *memSegment) WriteAt(p []byte, off int) {
1327         if off+len(p) > len(me.buf) {
1328                 panic("overflowed segment")
1329         }
1330         copy(me.buf[off:], p)
1331 }
1332
1333 func (me *memSegment) ReadAt(p []byte, off int64) (n int, err error) {
1334         if off > int64(me.Len()) {
1335                 err = io.EOF
1336                 return
1337         }
1338         n = copy(p, me.buf[int(off):])
1339         if n < len(p) {
1340                 err = io.EOF
1341         }
1342         return
1343 }
1344
1345 type storedSegment struct {
1346         kc      keepClient
1347         locator string
1348         size    int // size of stored block (also encoded in locator)
1349         offset  int // position of segment within the stored block
1350         length  int // bytes in this segment (offset + length <= size)
1351 }
1352
1353 func (se storedSegment) Len() int {
1354         return se.length
1355 }
1356
1357 func (se storedSegment) Slice(n, size int) segment {
1358         se.offset += n
1359         se.length -= n
1360         if size >= 0 && se.length > size {
1361                 se.length = size
1362         }
1363         return se
1364 }
1365
1366 func (se storedSegment) ReadAt(p []byte, off int64) (n int, err error) {
1367         if off > int64(se.length) {
1368                 return 0, io.EOF
1369         }
1370         maxlen := se.length - int(off)
1371         if len(p) > maxlen {
1372                 p = p[:maxlen]
1373                 n, err = se.kc.ReadAt(se.locator, p, int(off)+se.offset)
1374                 if err == nil {
1375                         err = io.EOF
1376                 }
1377                 return
1378         }
1379         return se.kc.ReadAt(se.locator, p, int(off)+se.offset)
1380 }
1381
1382 func canonicalName(name string) string {
1383         name = path.Clean("/" + name)
1384         if name == "/" || name == "./" {
1385                 name = "."
1386         } else if strings.HasPrefix(name, "/") {
1387                 name = "." + name
1388         }
1389         return name
1390 }
1391
1392 var manifestEscapeSeq = regexp.MustCompile(`\\([0-7]{3}|\\)`)
1393
1394 func manifestUnescapeFunc(seq string) string {
1395         if seq == `\\` {
1396                 return `\`
1397         }
1398         i, err := strconv.ParseUint(seq[1:], 8, 8)
1399         if err != nil {
1400                 // Invalid escape sequence: can't unescape.
1401                 return seq
1402         }
1403         return string([]byte{byte(i)})
1404 }
1405
1406 func manifestUnescape(s string) string {
1407         return manifestEscapeSeq.ReplaceAllStringFunc(s, manifestUnescapeFunc)
1408 }
1409
1410 var manifestEscapedChar = regexp.MustCompile(`[\000-\040:\s\\]`)
1411
1412 func manifestEscapeFunc(seq string) string {
1413         return fmt.Sprintf("\\%03o", byte(seq[0]))
1414 }
1415
1416 func manifestEscape(s string) string {
1417         return manifestEscapedChar.ReplaceAllStringFunc(s, manifestEscapeFunc)
1418 }