Merge branch '11454-wb-federated-search'
[arvados.git] / sdk / go / arvados / collection_fs.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "errors"
9         "fmt"
10         "io"
11         "net/http"
12         "os"
13         "path"
14         "regexp"
15         "sort"
16         "strconv"
17         "strings"
18         "sync"
19         "time"
20 )
21
22 var (
23         ErrReadOnlyFile      = errors.New("read-only file")
24         ErrNegativeOffset    = errors.New("cannot seek to negative offset")
25         ErrFileExists        = errors.New("file exists")
26         ErrInvalidOperation  = errors.New("invalid operation")
27         ErrInvalidArgument   = errors.New("invalid argument")
28         ErrDirectoryNotEmpty = errors.New("directory not empty")
29         ErrWriteOnlyMode     = errors.New("file is O_WRONLY")
30         ErrSyncNotSupported  = errors.New("O_SYNC flag is not supported")
31         ErrIsDirectory       = errors.New("cannot rename file to overwrite existing directory")
32         ErrPermission        = os.ErrPermission
33
34         maxBlockSize = 1 << 26
35 )
36
37 // A File is an *os.File-like interface for reading and writing files
38 // in a CollectionFileSystem.
39 type File interface {
40         io.Reader
41         io.Writer
42         io.Closer
43         io.Seeker
44         Size() int64
45         Readdir(int) ([]os.FileInfo, error)
46         Stat() (os.FileInfo, error)
47         Truncate(int64) error
48 }
49
50 type keepClient interface {
51         ReadAt(locator string, p []byte, off int) (int, error)
52         PutB(p []byte) (string, int, error)
53 }
54
55 type fileinfo struct {
56         name    string
57         mode    os.FileMode
58         size    int64
59         modTime time.Time
60 }
61
62 // Name implements os.FileInfo.
63 func (fi fileinfo) Name() string {
64         return fi.name
65 }
66
67 // ModTime implements os.FileInfo.
68 func (fi fileinfo) ModTime() time.Time {
69         return fi.modTime
70 }
71
72 // Mode implements os.FileInfo.
73 func (fi fileinfo) Mode() os.FileMode {
74         return fi.mode
75 }
76
77 // IsDir implements os.FileInfo.
78 func (fi fileinfo) IsDir() bool {
79         return fi.mode&os.ModeDir != 0
80 }
81
82 // Size implements os.FileInfo.
83 func (fi fileinfo) Size() int64 {
84         return fi.size
85 }
86
87 // Sys implements os.FileInfo.
88 func (fi fileinfo) Sys() interface{} {
89         return nil
90 }
91
92 // A CollectionFileSystem is an http.Filesystem plus Stat() and
93 // support for opening writable files. All methods are safe to call
94 // from multiple goroutines.
95 type CollectionFileSystem interface {
96         http.FileSystem
97
98         // analogous to os.Stat()
99         Stat(name string) (os.FileInfo, error)
100
101         // analogous to os.Create(): create/truncate a file and open it O_RDWR.
102         Create(name string) (File, error)
103
104         // Like os.OpenFile(): create or open a file or directory.
105         //
106         // If flag&os.O_EXCL==0, it opens an existing file or
107         // directory if one exists. If flag&os.O_CREATE!=0, it creates
108         // a new empty file or directory if one does not already
109         // exist.
110         //
111         // When creating a new item, perm&os.ModeDir determines
112         // whether it is a file or a directory.
113         //
114         // A file can be opened multiple times and used concurrently
115         // from multiple goroutines. However, each File object should
116         // be used by only one goroutine at a time.
117         OpenFile(name string, flag int, perm os.FileMode) (File, error)
118
119         Mkdir(name string, perm os.FileMode) error
120         Remove(name string) error
121         RemoveAll(name string) error
122         Rename(oldname, newname string) error
123
124         // Flush all file data to Keep and return a snapshot of the
125         // filesystem suitable for saving as (Collection)ManifestText.
126         // Prefix (normally ".") is a top level directory, effectively
127         // prepended to all paths in the returned manifest.
128         MarshalManifest(prefix string) (string, error)
129 }
130
131 type fileSystem struct {
132         dirnode
133 }
134
135 func (fs *fileSystem) OpenFile(name string, flag int, perm os.FileMode) (File, error) {
136         return fs.dirnode.OpenFile(name, flag, perm)
137 }
138
139 func (fs *fileSystem) Open(name string) (http.File, error) {
140         return fs.dirnode.OpenFile(name, os.O_RDONLY, 0)
141 }
142
143 func (fs *fileSystem) Create(name string) (File, error) {
144         return fs.dirnode.OpenFile(name, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0)
145 }
146
147 func (fs *fileSystem) Stat(name string) (fi os.FileInfo, err error) {
148         node := fs.dirnode.lookupPath(name)
149         if node == nil {
150                 err = os.ErrNotExist
151         } else {
152                 fi = node.Stat()
153         }
154         return
155 }
156
157 type inode interface {
158         Parent() inode
159         Read([]byte, filenodePtr) (int, filenodePtr, error)
160         Write([]byte, filenodePtr) (int, filenodePtr, error)
161         Truncate(int64) error
162         Readdir() []os.FileInfo
163         Size() int64
164         Stat() os.FileInfo
165         sync.Locker
166         RLock()
167         RUnlock()
168 }
169
170 // filenode implements inode.
171 type filenode struct {
172         fileinfo fileinfo
173         parent   *dirnode
174         segments []segment
175         // number of times `segments` has changed in a
176         // way that might invalidate a filenodePtr
177         repacked int64
178         memsize  int64 // bytes in memSegments
179         sync.RWMutex
180 }
181
182 // filenodePtr is an offset into a file that is (usually) efficient to
183 // seek to. Specifically, if filenode.repacked==filenodePtr.repacked
184 // then
185 // filenode.segments[filenodePtr.segmentIdx][filenodePtr.segmentOff]
186 // corresponds to file offset filenodePtr.off. Otherwise, it is
187 // necessary to reexamine len(filenode.segments[0]) etc. to find the
188 // correct segment and offset.
189 type filenodePtr struct {
190         off        int64
191         segmentIdx int
192         segmentOff int
193         repacked   int64
194 }
195
196 // seek returns a ptr that is consistent with both startPtr.off and
197 // the current state of fn. The caller must already hold fn.RLock() or
198 // fn.Lock().
199 //
200 // If startPtr is beyond EOF, ptr.segment* will indicate precisely
201 // EOF.
202 //
203 // After seeking:
204 //
205 //     ptr.segmentIdx == len(filenode.segments) // i.e., at EOF
206 //     ||
207 //     filenode.segments[ptr.segmentIdx].Len() > ptr.segmentOff
208 func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) {
209         ptr = startPtr
210         if ptr.off < 0 {
211                 // meaningless anyway
212                 return
213         } else if ptr.off >= fn.fileinfo.size {
214                 ptr.segmentIdx = len(fn.segments)
215                 ptr.segmentOff = 0
216                 ptr.repacked = fn.repacked
217                 return
218         } else if ptr.repacked == fn.repacked {
219                 // segmentIdx and segmentOff accurately reflect
220                 // ptr.off, but might have fallen off the end of a
221                 // segment
222                 if ptr.segmentOff >= fn.segments[ptr.segmentIdx].Len() {
223                         ptr.segmentIdx++
224                         ptr.segmentOff = 0
225                 }
226                 return
227         }
228         defer func() {
229                 ptr.repacked = fn.repacked
230         }()
231         if ptr.off >= fn.fileinfo.size {
232                 ptr.segmentIdx, ptr.segmentOff = len(fn.segments), 0
233                 return
234         }
235         // Recompute segmentIdx and segmentOff.  We have already
236         // established fn.fileinfo.size > ptr.off >= 0, so we don't
237         // have to deal with edge cases here.
238         var off int64
239         for ptr.segmentIdx, ptr.segmentOff = 0, 0; off < ptr.off; ptr.segmentIdx++ {
240                 // This would panic (index out of range) if
241                 // fn.fileinfo.size were larger than
242                 // sum(fn.segments[i].Len()) -- but that can't happen
243                 // because we have ensured fn.fileinfo.size is always
244                 // accurate.
245                 segLen := int64(fn.segments[ptr.segmentIdx].Len())
246                 if off+segLen > ptr.off {
247                         ptr.segmentOff = int(ptr.off - off)
248                         break
249                 }
250                 off += segLen
251         }
252         return
253 }
254
255 // caller must have lock
256 func (fn *filenode) appendSegment(e segment) {
257         fn.segments = append(fn.segments, e)
258         fn.fileinfo.size += int64(e.Len())
259 }
260
261 func (fn *filenode) Parent() inode {
262         fn.RLock()
263         defer fn.RUnlock()
264         return fn.parent
265 }
266
267 func (fn *filenode) Readdir() []os.FileInfo {
268         return nil
269 }
270
271 // Read reads file data from a single segment, starting at startPtr,
272 // into p. startPtr is assumed not to be up-to-date. Caller must have
273 // RLock or Lock.
274 func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
275         ptr = fn.seek(startPtr)
276         if ptr.off < 0 {
277                 err = ErrNegativeOffset
278                 return
279         }
280         if ptr.segmentIdx >= len(fn.segments) {
281                 err = io.EOF
282                 return
283         }
284         n, err = fn.segments[ptr.segmentIdx].ReadAt(p, int64(ptr.segmentOff))
285         if n > 0 {
286                 ptr.off += int64(n)
287                 ptr.segmentOff += n
288                 if ptr.segmentOff == fn.segments[ptr.segmentIdx].Len() {
289                         ptr.segmentIdx++
290                         ptr.segmentOff = 0
291                         if ptr.segmentIdx < len(fn.segments) && err == io.EOF {
292                                 err = nil
293                         }
294                 }
295         }
296         return
297 }
298
299 func (fn *filenode) Size() int64 {
300         fn.RLock()
301         defer fn.RUnlock()
302         return fn.fileinfo.Size()
303 }
304
305 func (fn *filenode) Stat() os.FileInfo {
306         fn.RLock()
307         defer fn.RUnlock()
308         return fn.fileinfo
309 }
310
311 func (fn *filenode) Truncate(size int64) error {
312         fn.Lock()
313         defer fn.Unlock()
314         return fn.truncate(size)
315 }
316
317 func (fn *filenode) truncate(size int64) error {
318         if size == fn.fileinfo.size {
319                 return nil
320         }
321         fn.repacked++
322         if size < fn.fileinfo.size {
323                 ptr := fn.seek(filenodePtr{off: size})
324                 for i := ptr.segmentIdx; i < len(fn.segments); i++ {
325                         if seg, ok := fn.segments[i].(*memSegment); ok {
326                                 fn.memsize -= int64(seg.Len())
327                         }
328                 }
329                 if ptr.segmentOff == 0 {
330                         fn.segments = fn.segments[:ptr.segmentIdx]
331                 } else {
332                         fn.segments = fn.segments[:ptr.segmentIdx+1]
333                         switch seg := fn.segments[ptr.segmentIdx].(type) {
334                         case *memSegment:
335                                 seg.Truncate(ptr.segmentOff)
336                                 fn.memsize += int64(seg.Len())
337                         default:
338                                 fn.segments[ptr.segmentIdx] = seg.Slice(0, ptr.segmentOff)
339                         }
340                 }
341                 fn.fileinfo.size = size
342                 return nil
343         }
344         for size > fn.fileinfo.size {
345                 grow := size - fn.fileinfo.size
346                 var seg *memSegment
347                 var ok bool
348                 if len(fn.segments) == 0 {
349                         seg = &memSegment{}
350                         fn.segments = append(fn.segments, seg)
351                 } else if seg, ok = fn.segments[len(fn.segments)-1].(*memSegment); !ok || seg.Len() >= maxBlockSize {
352                         seg = &memSegment{}
353                         fn.segments = append(fn.segments, seg)
354                 }
355                 if maxgrow := int64(maxBlockSize - seg.Len()); maxgrow < grow {
356                         grow = maxgrow
357                 }
358                 seg.Truncate(seg.Len() + int(grow))
359                 fn.fileinfo.size += grow
360                 fn.memsize += grow
361         }
362         return nil
363 }
364
365 // Write writes data from p to the file, starting at startPtr,
366 // extending the file size if necessary. Caller must have Lock.
367 func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
368         if startPtr.off > fn.fileinfo.size {
369                 if err = fn.truncate(startPtr.off); err != nil {
370                         return 0, startPtr, err
371                 }
372         }
373         ptr = fn.seek(startPtr)
374         if ptr.off < 0 {
375                 err = ErrNegativeOffset
376                 return
377         }
378         for len(p) > 0 && err == nil {
379                 cando := p
380                 if len(cando) > maxBlockSize {
381                         cando = cando[:maxBlockSize]
382                 }
383                 // Rearrange/grow fn.segments (and shrink cando if
384                 // needed) such that cando can be copied to
385                 // fn.segments[ptr.segmentIdx] at offset
386                 // ptr.segmentOff.
387                 cur := ptr.segmentIdx
388                 prev := ptr.segmentIdx - 1
389                 var curWritable bool
390                 if cur < len(fn.segments) {
391                         _, curWritable = fn.segments[cur].(*memSegment)
392                 }
393                 var prevAppendable bool
394                 if prev >= 0 && fn.segments[prev].Len() < maxBlockSize {
395                         _, prevAppendable = fn.segments[prev].(*memSegment)
396                 }
397                 if ptr.segmentOff > 0 && !curWritable {
398                         // Split a non-writable block.
399                         if max := fn.segments[cur].Len() - ptr.segmentOff; max <= len(cando) {
400                                 // Truncate cur, and insert a new
401                                 // segment after it.
402                                 cando = cando[:max]
403                                 fn.segments = append(fn.segments, nil)
404                                 copy(fn.segments[cur+1:], fn.segments[cur:])
405                         } else {
406                                 // Split cur into two copies, truncate
407                                 // the one on the left, shift the one
408                                 // on the right, and insert a new
409                                 // segment between them.
410                                 fn.segments = append(fn.segments, nil, nil)
411                                 copy(fn.segments[cur+2:], fn.segments[cur:])
412                                 fn.segments[cur+2] = fn.segments[cur+2].Slice(ptr.segmentOff+len(cando), -1)
413                         }
414                         cur++
415                         prev++
416                         seg := &memSegment{}
417                         seg.Truncate(len(cando))
418                         fn.memsize += int64(len(cando))
419                         fn.segments[cur] = seg
420                         fn.segments[prev] = fn.segments[prev].Slice(0, ptr.segmentOff)
421                         ptr.segmentIdx++
422                         ptr.segmentOff = 0
423                         fn.repacked++
424                         ptr.repacked++
425                 } else if curWritable {
426                         if fit := int(fn.segments[cur].Len()) - ptr.segmentOff; fit < len(cando) {
427                                 cando = cando[:fit]
428                         }
429                 } else {
430                         if prevAppendable {
431                                 // Shrink cando if needed to fit in
432                                 // prev segment.
433                                 if cangrow := maxBlockSize - fn.segments[prev].Len(); cangrow < len(cando) {
434                                         cando = cando[:cangrow]
435                                 }
436                         }
437
438                         if cur == len(fn.segments) {
439                                 // ptr is at EOF, filesize is changing.
440                                 fn.fileinfo.size += int64(len(cando))
441                         } else if el := fn.segments[cur].Len(); el <= len(cando) {
442                                 // cando is long enough that we won't
443                                 // need cur any more. shrink cando to
444                                 // be exactly as long as cur
445                                 // (otherwise we'd accidentally shift
446                                 // the effective position of all
447                                 // segments after cur).
448                                 cando = cando[:el]
449                                 copy(fn.segments[cur:], fn.segments[cur+1:])
450                                 fn.segments = fn.segments[:len(fn.segments)-1]
451                         } else {
452                                 // shrink cur by the same #bytes we're growing prev
453                                 fn.segments[cur] = fn.segments[cur].Slice(len(cando), -1)
454                         }
455
456                         if prevAppendable {
457                                 // Grow prev.
458                                 ptr.segmentIdx--
459                                 ptr.segmentOff = fn.segments[prev].Len()
460                                 fn.segments[prev].(*memSegment).Truncate(ptr.segmentOff + len(cando))
461                                 fn.memsize += int64(len(cando))
462                                 ptr.repacked++
463                                 fn.repacked++
464                         } else {
465                                 // Insert a segment between prev and
466                                 // cur, and advance prev/cur.
467                                 fn.segments = append(fn.segments, nil)
468                                 if cur < len(fn.segments) {
469                                         copy(fn.segments[cur+1:], fn.segments[cur:])
470                                         ptr.repacked++
471                                         fn.repacked++
472                                 } else {
473                                         // appending a new segment does
474                                         // not invalidate any ptrs
475                                 }
476                                 seg := &memSegment{}
477                                 seg.Truncate(len(cando))
478                                 fn.memsize += int64(len(cando))
479                                 fn.segments[cur] = seg
480                                 cur++
481                                 prev++
482                         }
483                 }
484
485                 // Finally we can copy bytes from cando to the current segment.
486                 fn.segments[ptr.segmentIdx].(*memSegment).WriteAt(cando, ptr.segmentOff)
487                 n += len(cando)
488                 p = p[len(cando):]
489
490                 ptr.off += int64(len(cando))
491                 ptr.segmentOff += len(cando)
492                 if ptr.segmentOff >= maxBlockSize {
493                         fn.pruneMemSegments()
494                 }
495                 if fn.segments[ptr.segmentIdx].Len() == ptr.segmentOff {
496                         ptr.segmentOff = 0
497                         ptr.segmentIdx++
498                 }
499
500                 fn.fileinfo.modTime = time.Now()
501         }
502         return
503 }
504
505 // Write some data out to disk to reduce memory use. Caller must have
506 // write lock.
507 func (fn *filenode) pruneMemSegments() {
508         // TODO: async (don't hold Lock() while waiting for Keep)
509         // TODO: share code with (*dirnode)sync()
510         // TODO: pack/flush small blocks too, when fragmented
511         for idx, seg := range fn.segments {
512                 seg, ok := seg.(*memSegment)
513                 if !ok || seg.Len() < maxBlockSize {
514                         continue
515                 }
516                 locator, _, err := fn.parent.kc.PutB(seg.buf)
517                 if err != nil {
518                         // TODO: stall (or return errors from)
519                         // subsequent writes until flushing
520                         // starts to succeed
521                         continue
522                 }
523                 fn.memsize -= int64(seg.Len())
524                 fn.segments[idx] = storedSegment{
525                         kc:      fn.parent.kc,
526                         locator: locator,
527                         size:    seg.Len(),
528                         offset:  0,
529                         length:  seg.Len(),
530                 }
531         }
532 }
533
534 // FileSystem returns a CollectionFileSystem for the collection.
535 func (c *Collection) FileSystem(client *Client, kc keepClient) (CollectionFileSystem, error) {
536         var modTime time.Time
537         if c.ModifiedAt == nil {
538                 modTime = time.Now()
539         } else {
540                 modTime = *c.ModifiedAt
541         }
542         fs := &fileSystem{dirnode: dirnode{
543                 client: client,
544                 kc:     kc,
545                 fileinfo: fileinfo{
546                         name:    ".",
547                         mode:    os.ModeDir | 0755,
548                         modTime: modTime,
549                 },
550                 parent: nil,
551                 inodes: make(map[string]inode),
552         }}
553         fs.dirnode.parent = &fs.dirnode
554         if err := fs.dirnode.loadManifest(c.ManifestText); err != nil {
555                 return nil, err
556         }
557         return fs, nil
558 }
559
560 type filehandle struct {
561         inode
562         ptr        filenodePtr
563         append     bool
564         readable   bool
565         writable   bool
566         unreaddirs []os.FileInfo
567 }
568
569 func (f *filehandle) Read(p []byte) (n int, err error) {
570         if !f.readable {
571                 return 0, ErrWriteOnlyMode
572         }
573         f.inode.RLock()
574         defer f.inode.RUnlock()
575         n, f.ptr, err = f.inode.Read(p, f.ptr)
576         return
577 }
578
579 func (f *filehandle) Seek(off int64, whence int) (pos int64, err error) {
580         size := f.inode.Size()
581         ptr := f.ptr
582         switch whence {
583         case io.SeekStart:
584                 ptr.off = off
585         case io.SeekCurrent:
586                 ptr.off += off
587         case io.SeekEnd:
588                 ptr.off = size + off
589         }
590         if ptr.off < 0 {
591                 return f.ptr.off, ErrNegativeOffset
592         }
593         if ptr.off != f.ptr.off {
594                 f.ptr = ptr
595                 // force filenode to recompute f.ptr fields on next
596                 // use
597                 f.ptr.repacked = -1
598         }
599         return f.ptr.off, nil
600 }
601
602 func (f *filehandle) Truncate(size int64) error {
603         return f.inode.Truncate(size)
604 }
605
606 func (f *filehandle) Write(p []byte) (n int, err error) {
607         if !f.writable {
608                 return 0, ErrReadOnlyFile
609         }
610         f.inode.Lock()
611         defer f.inode.Unlock()
612         if fn, ok := f.inode.(*filenode); ok && f.append {
613                 f.ptr = filenodePtr{
614                         off:        fn.fileinfo.size,
615                         segmentIdx: len(fn.segments),
616                         segmentOff: 0,
617                         repacked:   fn.repacked,
618                 }
619         }
620         n, f.ptr, err = f.inode.Write(p, f.ptr)
621         return
622 }
623
624 func (f *filehandle) Readdir(count int) ([]os.FileInfo, error) {
625         if !f.inode.Stat().IsDir() {
626                 return nil, ErrInvalidOperation
627         }
628         if count <= 0 {
629                 return f.inode.Readdir(), nil
630         }
631         if f.unreaddirs == nil {
632                 f.unreaddirs = f.inode.Readdir()
633         }
634         if len(f.unreaddirs) == 0 {
635                 return nil, io.EOF
636         }
637         if count > len(f.unreaddirs) {
638                 count = len(f.unreaddirs)
639         }
640         ret := f.unreaddirs[:count]
641         f.unreaddirs = f.unreaddirs[count:]
642         return ret, nil
643 }
644
645 func (f *filehandle) Stat() (os.FileInfo, error) {
646         return f.inode.Stat(), nil
647 }
648
649 func (f *filehandle) Close() error {
650         return nil
651 }
652
653 type dirnode struct {
654         fileinfo fileinfo
655         parent   *dirnode
656         client   *Client
657         kc       keepClient
658         inodes   map[string]inode
659         sync.RWMutex
660 }
661
662 // sync flushes in-memory data (for all files in the tree rooted at
663 // dn) to persistent storage. Caller must hold dn.Lock().
664 func (dn *dirnode) sync() error {
665         type shortBlock struct {
666                 fn  *filenode
667                 idx int
668         }
669         var pending []shortBlock
670         var pendingLen int
671
672         flush := func(sbs []shortBlock) error {
673                 if len(sbs) == 0 {
674                         return nil
675                 }
676                 block := make([]byte, 0, maxBlockSize)
677                 for _, sb := range sbs {
678                         block = append(block, sb.fn.segments[sb.idx].(*memSegment).buf...)
679                 }
680                 locator, _, err := dn.kc.PutB(block)
681                 if err != nil {
682                         return err
683                 }
684                 off := 0
685                 for _, sb := range sbs {
686                         data := sb.fn.segments[sb.idx].(*memSegment).buf
687                         sb.fn.segments[sb.idx] = storedSegment{
688                                 kc:      dn.kc,
689                                 locator: locator,
690                                 size:    len(block),
691                                 offset:  off,
692                                 length:  len(data),
693                         }
694                         off += len(data)
695                         sb.fn.memsize -= int64(len(data))
696                 }
697                 return nil
698         }
699
700         names := make([]string, 0, len(dn.inodes))
701         for name := range dn.inodes {
702                 names = append(names, name)
703         }
704         sort.Strings(names)
705
706         for _, name := range names {
707                 fn, ok := dn.inodes[name].(*filenode)
708                 if !ok {
709                         continue
710                 }
711                 fn.Lock()
712                 defer fn.Unlock()
713                 for idx, seg := range fn.segments {
714                         seg, ok := seg.(*memSegment)
715                         if !ok {
716                                 continue
717                         }
718                         if seg.Len() > maxBlockSize/2 {
719                                 if err := flush([]shortBlock{{fn, idx}}); err != nil {
720                                         return err
721                                 }
722                                 continue
723                         }
724                         if pendingLen+seg.Len() > maxBlockSize {
725                                 if err := flush(pending); err != nil {
726                                         return err
727                                 }
728                                 pending = nil
729                                 pendingLen = 0
730                         }
731                         pending = append(pending, shortBlock{fn, idx})
732                         pendingLen += seg.Len()
733                 }
734         }
735         return flush(pending)
736 }
737
738 func (dn *dirnode) MarshalManifest(prefix string) (string, error) {
739         dn.Lock()
740         defer dn.Unlock()
741         return dn.marshalManifest(prefix)
742 }
743
744 // caller must have read lock.
745 func (dn *dirnode) marshalManifest(prefix string) (string, error) {
746         var streamLen int64
747         type filepart struct {
748                 name   string
749                 offset int64
750                 length int64
751         }
752         var fileparts []filepart
753         var subdirs string
754         var blocks []string
755
756         if err := dn.sync(); err != nil {
757                 return "", err
758         }
759
760         names := make([]string, 0, len(dn.inodes))
761         for name, node := range dn.inodes {
762                 names = append(names, name)
763                 node.Lock()
764                 defer node.Unlock()
765         }
766         sort.Strings(names)
767
768         for _, name := range names {
769                 switch node := dn.inodes[name].(type) {
770                 case *dirnode:
771                         subdir, err := node.marshalManifest(prefix + "/" + name)
772                         if err != nil {
773                                 return "", err
774                         }
775                         subdirs = subdirs + subdir
776                 case *filenode:
777                         if len(node.segments) == 0 {
778                                 fileparts = append(fileparts, filepart{name: name})
779                                 break
780                         }
781                         for _, seg := range node.segments {
782                                 switch seg := seg.(type) {
783                                 case storedSegment:
784                                         if len(blocks) > 0 && blocks[len(blocks)-1] == seg.locator {
785                                                 streamLen -= int64(seg.size)
786                                         } else {
787                                                 blocks = append(blocks, seg.locator)
788                                         }
789                                         next := filepart{
790                                                 name:   name,
791                                                 offset: streamLen + int64(seg.offset),
792                                                 length: int64(seg.length),
793                                         }
794                                         if prev := len(fileparts) - 1; prev >= 0 &&
795                                                 fileparts[prev].name == name &&
796                                                 fileparts[prev].offset+fileparts[prev].length == next.offset {
797                                                 fileparts[prev].length += next.length
798                                         } else {
799                                                 fileparts = append(fileparts, next)
800                                         }
801                                         streamLen += int64(seg.size)
802                                 default:
803                                         // This can't happen: we
804                                         // haven't unlocked since
805                                         // calling sync().
806                                         panic(fmt.Sprintf("can't marshal segment type %T", seg))
807                                 }
808                         }
809                 default:
810                         panic(fmt.Sprintf("can't marshal inode type %T", node))
811                 }
812         }
813         var filetokens []string
814         for _, s := range fileparts {
815                 filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, manifestEscape(s.name)))
816         }
817         if len(filetokens) == 0 {
818                 return subdirs, nil
819         } else if len(blocks) == 0 {
820                 blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
821         }
822         return manifestEscape(prefix) + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n" + subdirs, nil
823 }
824
825 func (dn *dirnode) loadManifest(txt string) error {
826         var dirname string
827         streams := strings.Split(txt, "\n")
828         if streams[len(streams)-1] != "" {
829                 return fmt.Errorf("line %d: no trailing newline", len(streams))
830         }
831         streams = streams[:len(streams)-1]
832         segments := []storedSegment{}
833         for i, stream := range streams {
834                 lineno := i + 1
835                 var anyFileTokens bool
836                 var pos int64
837                 var segIdx int
838                 segments = segments[:0]
839                 for i, token := range strings.Split(stream, " ") {
840                         if i == 0 {
841                                 dirname = manifestUnescape(token)
842                                 continue
843                         }
844                         if !strings.Contains(token, ":") {
845                                 if anyFileTokens {
846                                         return fmt.Errorf("line %d: bad file segment %q", lineno, token)
847                                 }
848                                 toks := strings.SplitN(token, "+", 3)
849                                 if len(toks) < 2 {
850                                         return fmt.Errorf("line %d: bad locator %q", lineno, token)
851                                 }
852                                 length, err := strconv.ParseInt(toks[1], 10, 32)
853                                 if err != nil || length < 0 {
854                                         return fmt.Errorf("line %d: bad locator %q", lineno, token)
855                                 }
856                                 segments = append(segments, storedSegment{
857                                         locator: token,
858                                         size:    int(length),
859                                         offset:  0,
860                                         length:  int(length),
861                                 })
862                                 continue
863                         } else if len(segments) == 0 {
864                                 return fmt.Errorf("line %d: bad locator %q", lineno, token)
865                         }
866
867                         toks := strings.SplitN(token, ":", 3)
868                         if len(toks) != 3 {
869                                 return fmt.Errorf("line %d: bad file segment %q", lineno, token)
870                         }
871                         anyFileTokens = true
872
873                         offset, err := strconv.ParseInt(toks[0], 10, 64)
874                         if err != nil || offset < 0 {
875                                 return fmt.Errorf("line %d: bad file segment %q", lineno, token)
876                         }
877                         length, err := strconv.ParseInt(toks[1], 10, 64)
878                         if err != nil || length < 0 {
879                                 return fmt.Errorf("line %d: bad file segment %q", lineno, token)
880                         }
881                         name := dirname + "/" + manifestUnescape(toks[2])
882                         fnode, err := dn.createFileAndParents(name)
883                         if err != nil {
884                                 return fmt.Errorf("line %d: cannot use path %q: %s", lineno, name, err)
885                         }
886                         // Map the stream offset/range coordinates to
887                         // block/offset/range coordinates and add
888                         // corresponding storedSegments to the filenode
889                         if pos > offset {
890                                 // Can't continue where we left off.
891                                 // TODO: binary search instead of
892                                 // rewinding all the way (but this
893                                 // situation might be rare anyway)
894                                 segIdx, pos = 0, 0
895                         }
896                         for next := int64(0); segIdx < len(segments); segIdx++ {
897                                 seg := segments[segIdx]
898                                 next = pos + int64(seg.Len())
899                                 if next <= offset || seg.Len() == 0 {
900                                         pos = next
901                                         continue
902                                 }
903                                 if pos >= offset+length {
904                                         break
905                                 }
906                                 var blkOff int
907                                 if pos < offset {
908                                         blkOff = int(offset - pos)
909                                 }
910                                 blkLen := seg.Len() - blkOff
911                                 if pos+int64(blkOff+blkLen) > offset+length {
912                                         blkLen = int(offset + length - pos - int64(blkOff))
913                                 }
914                                 fnode.appendSegment(storedSegment{
915                                         kc:      dn.kc,
916                                         locator: seg.locator,
917                                         size:    seg.size,
918                                         offset:  blkOff,
919                                         length:  blkLen,
920                                 })
921                                 if next > offset+length {
922                                         break
923                                 } else {
924                                         pos = next
925                                 }
926                         }
927                         if segIdx == len(segments) && pos < offset+length {
928                                 return fmt.Errorf("line %d: invalid segment in %d-byte stream: %q", lineno, pos, token)
929                         }
930                 }
931                 if !anyFileTokens {
932                         return fmt.Errorf("line %d: no file segments", lineno)
933                 } else if len(segments) == 0 {
934                         return fmt.Errorf("line %d: no locators", lineno)
935                 } else if dirname == "" {
936                         return fmt.Errorf("line %d: no stream name", lineno)
937                 }
938         }
939         return nil
940 }
941
942 // only safe to call from loadManifest -- no locking
943 func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
944         names := strings.Split(path, "/")
945         basename := names[len(names)-1]
946         if basename == "" || basename == "." || basename == ".." {
947                 err = fmt.Errorf("invalid filename")
948                 return
949         }
950         for _, name := range names[:len(names)-1] {
951                 switch name {
952                 case "", ".":
953                 case "..":
954                         dn = dn.parent
955                 default:
956                         switch node := dn.inodes[name].(type) {
957                         case nil:
958                                 dn = dn.newDirnode(name, 0755, dn.fileinfo.modTime)
959                         case *dirnode:
960                                 dn = node
961                         case *filenode:
962                                 err = ErrFileExists
963                                 return
964                         }
965                 }
966         }
967         switch node := dn.inodes[basename].(type) {
968         case nil:
969                 fn = dn.newFilenode(basename, 0755, dn.fileinfo.modTime)
970         case *filenode:
971                 fn = node
972         case *dirnode:
973                 err = ErrIsDirectory
974         }
975         return
976 }
977
978 func (dn *dirnode) mkdir(name string) (*filehandle, error) {
979         return dn.OpenFile(name, os.O_CREATE|os.O_EXCL, os.ModeDir|0755)
980 }
981
982 func (dn *dirnode) Mkdir(name string, perm os.FileMode) error {
983         f, err := dn.mkdir(name)
984         if err == nil {
985                 err = f.Close()
986         }
987         return err
988 }
989
990 func (dn *dirnode) Remove(name string) error {
991         return dn.remove(strings.TrimRight(name, "/"), false)
992 }
993
994 func (dn *dirnode) RemoveAll(name string) error {
995         err := dn.remove(strings.TrimRight(name, "/"), true)
996         if os.IsNotExist(err) {
997                 // "If the path does not exist, RemoveAll returns
998                 // nil." (see "os" pkg)
999                 err = nil
1000         }
1001         return err
1002 }
1003
1004 func (dn *dirnode) remove(name string, recursive bool) error {
1005         dirname, name := path.Split(name)
1006         if name == "" || name == "." || name == ".." {
1007                 return ErrInvalidArgument
1008         }
1009         dn, ok := dn.lookupPath(dirname).(*dirnode)
1010         if !ok {
1011                 return os.ErrNotExist
1012         }
1013         dn.Lock()
1014         defer dn.Unlock()
1015         switch node := dn.inodes[name].(type) {
1016         case nil:
1017                 return os.ErrNotExist
1018         case *dirnode:
1019                 node.RLock()
1020                 defer node.RUnlock()
1021                 if !recursive && len(node.inodes) > 0 {
1022                         return ErrDirectoryNotEmpty
1023                 }
1024         }
1025         delete(dn.inodes, name)
1026         return nil
1027 }
1028
1029 func (dn *dirnode) Rename(oldname, newname string) error {
1030         olddir, oldname := path.Split(oldname)
1031         if oldname == "" || oldname == "." || oldname == ".." {
1032                 return ErrInvalidArgument
1033         }
1034         olddirf, err := dn.OpenFile(olddir+".", os.O_RDONLY, 0)
1035         if err != nil {
1036                 return fmt.Errorf("%q: %s", olddir, err)
1037         }
1038         defer olddirf.Close()
1039         newdir, newname := path.Split(newname)
1040         if newname == "." || newname == ".." {
1041                 return ErrInvalidArgument
1042         } else if newname == "" {
1043                 // Rename("a/b", "c/") means Rename("a/b", "c/b")
1044                 newname = oldname
1045         }
1046         newdirf, err := dn.OpenFile(newdir+".", os.O_RDONLY, 0)
1047         if err != nil {
1048                 return fmt.Errorf("%q: %s", newdir, err)
1049         }
1050         defer newdirf.Close()
1051
1052         // When acquiring locks on multiple nodes, all common
1053         // ancestors must be locked first in order to avoid
1054         // deadlock. This is assured by locking the path from root to
1055         // newdir, then locking the path from root to olddir, skipping
1056         // any already-locked nodes.
1057         needLock := []sync.Locker{}
1058         for _, f := range []*filehandle{olddirf, newdirf} {
1059                 node := f.inode
1060                 needLock = append(needLock, node)
1061                 for node.Parent() != node {
1062                         node = node.Parent()
1063                         needLock = append(needLock, node)
1064                 }
1065         }
1066         locked := map[sync.Locker]bool{}
1067         for i := len(needLock) - 1; i >= 0; i-- {
1068                 if n := needLock[i]; !locked[n] {
1069                         n.Lock()
1070                         defer n.Unlock()
1071                         locked[n] = true
1072                 }
1073         }
1074
1075         olddn := olddirf.inode.(*dirnode)
1076         newdn := newdirf.inode.(*dirnode)
1077         oldinode, ok := olddn.inodes[oldname]
1078         if !ok {
1079                 return os.ErrNotExist
1080         }
1081         if locked[oldinode] {
1082                 // oldinode cannot become a descendant of itself.
1083                 return ErrInvalidArgument
1084         }
1085         if existing, ok := newdn.inodes[newname]; ok {
1086                 // overwriting an existing file or dir
1087                 if dn, ok := existing.(*dirnode); ok {
1088                         if !oldinode.Stat().IsDir() {
1089                                 return ErrIsDirectory
1090                         }
1091                         dn.RLock()
1092                         defer dn.RUnlock()
1093                         if len(dn.inodes) > 0 {
1094                                 return ErrDirectoryNotEmpty
1095                         }
1096                 }
1097         } else {
1098                 if newdn.inodes == nil {
1099                         newdn.inodes = make(map[string]inode)
1100                 }
1101                 newdn.fileinfo.size++
1102         }
1103         newdn.inodes[newname] = oldinode
1104         switch n := oldinode.(type) {
1105         case *dirnode:
1106                 n.parent = newdn
1107         case *filenode:
1108                 n.parent = newdn
1109         default:
1110                 panic(fmt.Sprintf("bad inode type %T", n))
1111         }
1112         delete(olddn.inodes, oldname)
1113         olddn.fileinfo.size--
1114         return nil
1115 }
1116
1117 func (dn *dirnode) Parent() inode {
1118         dn.RLock()
1119         defer dn.RUnlock()
1120         return dn.parent
1121 }
1122
1123 func (dn *dirnode) Readdir() (fi []os.FileInfo) {
1124         dn.RLock()
1125         defer dn.RUnlock()
1126         fi = make([]os.FileInfo, 0, len(dn.inodes))
1127         for _, inode := range dn.inodes {
1128                 fi = append(fi, inode.Stat())
1129         }
1130         return
1131 }
1132
1133 func (dn *dirnode) Read(p []byte, ptr filenodePtr) (int, filenodePtr, error) {
1134         return 0, ptr, ErrInvalidOperation
1135 }
1136
1137 func (dn *dirnode) Write(p []byte, ptr filenodePtr) (int, filenodePtr, error) {
1138         return 0, ptr, ErrInvalidOperation
1139 }
1140
1141 func (dn *dirnode) Size() int64 {
1142         dn.RLock()
1143         defer dn.RUnlock()
1144         return dn.fileinfo.Size()
1145 }
1146
1147 func (dn *dirnode) Stat() os.FileInfo {
1148         dn.RLock()
1149         defer dn.RUnlock()
1150         return dn.fileinfo
1151 }
1152
1153 func (dn *dirnode) Truncate(int64) error {
1154         return ErrInvalidOperation
1155 }
1156
1157 // lookupPath returns the inode for the file/directory with the given
1158 // name (which may contain "/" separators), along with its parent
1159 // node. If no such file/directory exists, the returned node is nil.
1160 func (dn *dirnode) lookupPath(path string) (node inode) {
1161         node = dn
1162         for _, name := range strings.Split(path, "/") {
1163                 dn, ok := node.(*dirnode)
1164                 if !ok {
1165                         return nil
1166                 }
1167                 if name == "." || name == "" {
1168                         continue
1169                 }
1170                 if name == ".." {
1171                         node = node.Parent()
1172                         continue
1173                 }
1174                 dn.RLock()
1175                 node = dn.inodes[name]
1176                 dn.RUnlock()
1177         }
1178         return
1179 }
1180
1181 func (dn *dirnode) newDirnode(name string, perm os.FileMode, modTime time.Time) *dirnode {
1182         child := &dirnode{
1183                 parent: dn,
1184                 client: dn.client,
1185                 kc:     dn.kc,
1186                 fileinfo: fileinfo{
1187                         name:    name,
1188                         mode:    os.ModeDir | perm,
1189                         modTime: modTime,
1190                 },
1191         }
1192         if dn.inodes == nil {
1193                 dn.inodes = make(map[string]inode)
1194         }
1195         dn.inodes[name] = child
1196         dn.fileinfo.size++
1197         return child
1198 }
1199
1200 func (dn *dirnode) newFilenode(name string, perm os.FileMode, modTime time.Time) *filenode {
1201         child := &filenode{
1202                 parent: dn,
1203                 fileinfo: fileinfo{
1204                         name:    name,
1205                         mode:    perm,
1206                         modTime: modTime,
1207                 },
1208         }
1209         if dn.inodes == nil {
1210                 dn.inodes = make(map[string]inode)
1211         }
1212         dn.inodes[name] = child
1213         dn.fileinfo.size++
1214         return child
1215 }
1216
1217 // OpenFile is analogous to os.OpenFile().
1218 func (dn *dirnode) OpenFile(name string, flag int, perm os.FileMode) (*filehandle, error) {
1219         if flag&os.O_SYNC != 0 {
1220                 return nil, ErrSyncNotSupported
1221         }
1222         dirname, name := path.Split(name)
1223         dn, ok := dn.lookupPath(dirname).(*dirnode)
1224         if !ok {
1225                 return nil, os.ErrNotExist
1226         }
1227         var readable, writable bool
1228         switch flag & (os.O_RDWR | os.O_RDONLY | os.O_WRONLY) {
1229         case os.O_RDWR:
1230                 readable = true
1231                 writable = true
1232         case os.O_RDONLY:
1233                 readable = true
1234         case os.O_WRONLY:
1235                 writable = true
1236         default:
1237                 return nil, fmt.Errorf("invalid flags 0x%x", flag)
1238         }
1239         if !writable {
1240                 // A directory can be opened via "foo/", "foo/.", or
1241                 // "foo/..".
1242                 switch name {
1243                 case ".", "":
1244                         return &filehandle{inode: dn}, nil
1245                 case "..":
1246                         return &filehandle{inode: dn.Parent()}, nil
1247                 }
1248         }
1249         createMode := flag&os.O_CREATE != 0
1250         if createMode {
1251                 dn.Lock()
1252                 defer dn.Unlock()
1253         } else {
1254                 dn.RLock()
1255                 defer dn.RUnlock()
1256         }
1257         n, ok := dn.inodes[name]
1258         if !ok {
1259                 if !createMode {
1260                         return nil, os.ErrNotExist
1261                 }
1262                 if perm.IsDir() {
1263                         n = dn.newDirnode(name, 0755, time.Now())
1264                 } else {
1265                         n = dn.newFilenode(name, 0755, time.Now())
1266                 }
1267         } else if flag&os.O_EXCL != 0 {
1268                 return nil, ErrFileExists
1269         } else if flag&os.O_TRUNC != 0 {
1270                 if !writable {
1271                         return nil, fmt.Errorf("invalid flag O_TRUNC in read-only mode")
1272                 } else if fn, ok := n.(*filenode); !ok {
1273                         return nil, fmt.Errorf("invalid flag O_TRUNC when opening directory")
1274                 } else {
1275                         fn.Truncate(0)
1276                 }
1277         }
1278         return &filehandle{
1279                 inode:    n,
1280                 append:   flag&os.O_APPEND != 0,
1281                 readable: readable,
1282                 writable: writable,
1283         }, nil
1284 }
1285
1286 type segment interface {
1287         io.ReaderAt
1288         Len() int
1289         // Return a new segment with a subsection of the data from this
1290         // one. length<0 means length=Len()-off.
1291         Slice(off int, length int) segment
1292 }
1293
1294 type memSegment struct {
1295         buf []byte
1296 }
1297
1298 func (me *memSegment) Len() int {
1299         return len(me.buf)
1300 }
1301
1302 func (me *memSegment) Slice(off, length int) segment {
1303         if length < 0 {
1304                 length = len(me.buf) - off
1305         }
1306         buf := make([]byte, length)
1307         copy(buf, me.buf[off:])
1308         return &memSegment{buf: buf}
1309 }
1310
1311 func (me *memSegment) Truncate(n int) {
1312         if n > cap(me.buf) {
1313                 newsize := 1024
1314                 for newsize < n {
1315                         newsize = newsize << 2
1316                 }
1317                 newbuf := make([]byte, n, newsize)
1318                 copy(newbuf, me.buf)
1319                 me.buf = newbuf
1320         } else {
1321                 // Zero unused part when shrinking, in case we grow
1322                 // and start using it again later.
1323                 for i := n; i < len(me.buf); i++ {
1324                         me.buf[i] = 0
1325                 }
1326         }
1327         me.buf = me.buf[:n]
1328 }
1329
1330 func (me *memSegment) WriteAt(p []byte, off int) {
1331         if off+len(p) > len(me.buf) {
1332                 panic("overflowed segment")
1333         }
1334         copy(me.buf[off:], p)
1335 }
1336
1337 func (me *memSegment) ReadAt(p []byte, off int64) (n int, err error) {
1338         if off > int64(me.Len()) {
1339                 err = io.EOF
1340                 return
1341         }
1342         n = copy(p, me.buf[int(off):])
1343         if n < len(p) {
1344                 err = io.EOF
1345         }
1346         return
1347 }
1348
1349 type storedSegment struct {
1350         kc      keepClient
1351         locator string
1352         size    int // size of stored block (also encoded in locator)
1353         offset  int // position of segment within the stored block
1354         length  int // bytes in this segment (offset + length <= size)
1355 }
1356
1357 func (se storedSegment) Len() int {
1358         return se.length
1359 }
1360
1361 func (se storedSegment) Slice(n, size int) segment {
1362         se.offset += n
1363         se.length -= n
1364         if size >= 0 && se.length > size {
1365                 se.length = size
1366         }
1367         return se
1368 }
1369
1370 func (se storedSegment) ReadAt(p []byte, off int64) (n int, err error) {
1371         if off > int64(se.length) {
1372                 return 0, io.EOF
1373         }
1374         maxlen := se.length - int(off)
1375         if len(p) > maxlen {
1376                 p = p[:maxlen]
1377                 n, err = se.kc.ReadAt(se.locator, p, int(off)+se.offset)
1378                 if err == nil {
1379                         err = io.EOF
1380                 }
1381                 return
1382         }
1383         return se.kc.ReadAt(se.locator, p, int(off)+se.offset)
1384 }
1385
1386 func canonicalName(name string) string {
1387         name = path.Clean("/" + name)
1388         if name == "/" || name == "./" {
1389                 name = "."
1390         } else if strings.HasPrefix(name, "/") {
1391                 name = "." + name
1392         }
1393         return name
1394 }
1395
1396 var manifestEscapeSeq = regexp.MustCompile(`\\([0-7]{3}|\\)`)
1397
1398 func manifestUnescapeFunc(seq string) string {
1399         if seq == `\\` {
1400                 return `\`
1401         }
1402         i, err := strconv.ParseUint(seq[1:], 8, 8)
1403         if err != nil {
1404                 // Invalid escape sequence: can't unescape.
1405                 return seq
1406         }
1407         return string([]byte{byte(i)})
1408 }
1409
1410 func manifestUnescape(s string) string {
1411         return manifestEscapeSeq.ReplaceAllStringFunc(s, manifestUnescapeFunc)
1412 }
1413
1414 var manifestEscapedChar = regexp.MustCompile(`[\000-\040:\s\\]`)
1415
1416 func manifestEscapeFunc(seq string) string {
1417         return fmt.Sprintf("\\%03o", byte(seq[0]))
1418 }
1419
1420 func manifestEscape(s string) string {
1421         return manifestEscapedChar.ReplaceAllStringFunc(s, manifestEscapeFunc)
1422 }