4c97af5712c34dccd409150107224799f1bf1d0b
[arvados.git] / sdk / go / arvados / collection_fs.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "errors"
9         "fmt"
10         "io"
11         "net/http"
12         "os"
13         "path"
14         "regexp"
15         "sort"
16         "strconv"
17         "strings"
18         "sync"
19         "time"
20 )
21
22 var (
23         ErrReadOnlyFile      = errors.New("read-only file")
24         ErrNegativeOffset    = errors.New("cannot seek to negative offset")
25         ErrFileExists        = errors.New("file exists")
26         ErrInvalidOperation  = errors.New("invalid operation")
27         ErrInvalidArgument   = errors.New("invalid argument")
28         ErrDirectoryNotEmpty = errors.New("directory not empty")
29         ErrWriteOnlyMode     = errors.New("file is O_WRONLY")
30         ErrSyncNotSupported  = errors.New("O_SYNC flag is not supported")
31         ErrIsDirectory       = errors.New("cannot rename file to overwrite existing directory")
32         ErrPermission        = os.ErrPermission
33
34         maxBlockSize = 1 << 26
35 )
36
37 type File interface {
38         io.Reader
39         io.Writer
40         io.Closer
41         io.Seeker
42         Size() int64
43         Readdir(int) ([]os.FileInfo, error)
44         Stat() (os.FileInfo, error)
45         Truncate(int64) error
46 }
47
48 type keepClient interface {
49         ReadAt(locator string, p []byte, off int) (int, error)
50         PutB(p []byte) (string, int, error)
51 }
52
53 type fileinfo struct {
54         name    string
55         mode    os.FileMode
56         size    int64
57         modTime time.Time
58 }
59
60 // Name implements os.FileInfo.
61 func (fi fileinfo) Name() string {
62         return fi.name
63 }
64
65 // ModTime implements os.FileInfo.
66 func (fi fileinfo) ModTime() time.Time {
67         return fi.modTime
68 }
69
70 // Mode implements os.FileInfo.
71 func (fi fileinfo) Mode() os.FileMode {
72         return fi.mode
73 }
74
75 // IsDir implements os.FileInfo.
76 func (fi fileinfo) IsDir() bool {
77         return fi.mode&os.ModeDir != 0
78 }
79
80 // Size implements os.FileInfo.
81 func (fi fileinfo) Size() int64 {
82         return fi.size
83 }
84
85 // Sys implements os.FileInfo.
86 func (fi fileinfo) Sys() interface{} {
87         return nil
88 }
89
90 // A CollectionFileSystem is an http.Filesystem plus Stat() and
91 // support for opening writable files. All methods are safe to call
92 // from multiple goroutines.
93 type CollectionFileSystem interface {
94         http.FileSystem
95
96         // analogous to os.Stat()
97         Stat(name string) (os.FileInfo, error)
98
99         // analogous to os.Create(): create/truncate a file and open it O_RDWR.
100         Create(name string) (File, error)
101
102         // Like os.OpenFile(): create or open a file or directory.
103         //
104         // If flag&os.O_EXCL==0, it opens an existing file or
105         // directory if one exists. If flag&os.O_CREATE!=0, it creates
106         // a new empty file or directory if one does not already
107         // exist.
108         //
109         // When creating a new item, perm&os.ModeDir determines
110         // whether it is a file or a directory.
111         //
112         // A file can be opened multiple times and used concurrently
113         // from multiple goroutines. However, each File object should
114         // be used by only one goroutine at a time.
115         OpenFile(name string, flag int, perm os.FileMode) (File, error)
116
117         Mkdir(name string, perm os.FileMode) error
118         Remove(name string) error
119         RemoveAll(name string) error
120         Rename(oldname, newname string) error
121         MarshalManifest(prefix string) (string, error)
122 }
123
124 type fileSystem struct {
125         dirnode
126 }
127
128 func (fs *fileSystem) OpenFile(name string, flag int, perm os.FileMode) (File, error) {
129         return fs.dirnode.OpenFile(name, flag, perm)
130 }
131
132 func (fs *fileSystem) Open(name string) (http.File, error) {
133         return fs.dirnode.OpenFile(name, os.O_RDONLY, 0)
134 }
135
136 func (fs *fileSystem) Create(name string) (File, error) {
137         return fs.dirnode.OpenFile(name, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0)
138 }
139
140 func (fs *fileSystem) Stat(name string) (fi os.FileInfo, err error) {
141         node := fs.dirnode.lookupPath(name)
142         if node == nil {
143                 err = os.ErrNotExist
144         } else {
145                 fi = node.Stat()
146         }
147         return
148 }
149
150 type inode interface {
151         Parent() inode
152         Read([]byte, filenodePtr) (int, filenodePtr, error)
153         Write([]byte, filenodePtr) (int, filenodePtr, error)
154         Truncate(int64) error
155         Readdir() []os.FileInfo
156         Size() int64
157         Stat() os.FileInfo
158         sync.Locker
159         RLock()
160         RUnlock()
161 }
162
163 // filenode implements inode.
164 type filenode struct {
165         fileinfo fileinfo
166         parent   *dirnode
167         extents  []extent
168         repacked int64 // number of times anything in []extents has changed len
169         memsize  int64 // bytes in memExtents
170         sync.RWMutex
171 }
172
173 // filenodePtr is an offset into a file that is (usually) efficient to
174 // seek to. Specifically, if filenode.repacked==filenodePtr.repacked
175 // then filenode.extents[filenodePtr.extentIdx][filenodePtr.extentOff]
176 // corresponds to file offset filenodePtr.off. Otherwise, it is
177 // necessary to reexamine len(filenode.extents[0]) etc. to find the
178 // correct extent and offset.
179 type filenodePtr struct {
180         off       int64
181         extentIdx int
182         extentOff int
183         repacked  int64
184 }
185
186 // seek returns a ptr that is consistent with both startPtr.off and
187 // the current state of fn. The caller must already hold fn.RLock() or
188 // fn.Lock().
189 //
190 // If startPtr points beyond the end of the file, ptr will point to
191 // exactly the end of the file.
192 //
193 // After seeking:
194 //
195 //     ptr.extentIdx == len(filenode.extents) // i.e., at EOF
196 //     ||
197 //     filenode.extents[ptr.extentIdx].Len() >= ptr.extentOff
198 func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) {
199         ptr = startPtr
200         if ptr.off < 0 {
201                 // meaningless anyway
202                 return
203         } else if ptr.off >= fn.fileinfo.size {
204                 ptr.extentIdx = len(fn.extents)
205                 ptr.extentOff = 0
206                 ptr.repacked = fn.repacked
207                 return
208         } else if ptr.repacked == fn.repacked {
209                 // extentIdx and extentOff accurately reflect ptr.off,
210                 // but might have fallen off the end of an extent
211                 if ptr.extentOff >= fn.extents[ptr.extentIdx].Len() {
212                         ptr.extentIdx++
213                         ptr.extentOff = 0
214                 }
215                 return
216         }
217         defer func() {
218                 ptr.repacked = fn.repacked
219         }()
220         if ptr.off >= fn.fileinfo.size {
221                 ptr.extentIdx, ptr.extentOff = len(fn.extents), 0
222                 return
223         }
224         // Recompute extentIdx and extentOff.  We have already
225         // established fn.fileinfo.size > ptr.off >= 0, so we don't
226         // have to deal with edge cases here.
227         var off int64
228         for ptr.extentIdx, ptr.extentOff = 0, 0; off < ptr.off; ptr.extentIdx++ {
229                 // This would panic (index out of range) if
230                 // fn.fileinfo.size were larger than
231                 // sum(fn.extents[i].Len()) -- but that can't happen
232                 // because we have ensured fn.fileinfo.size is always
233                 // accurate.
234                 extLen := int64(fn.extents[ptr.extentIdx].Len())
235                 if off+extLen > ptr.off {
236                         ptr.extentOff = int(ptr.off - off)
237                         break
238                 }
239                 off += extLen
240         }
241         return
242 }
243
244 // caller must have lock
245 func (fn *filenode) appendExtent(e extent) {
246         fn.extents = append(fn.extents, e)
247         fn.fileinfo.size += int64(e.Len())
248 }
249
250 func (fn *filenode) Parent() inode {
251         return fn.parent
252 }
253
254 func (fn *filenode) Readdir() []os.FileInfo {
255         return nil
256 }
257
258 func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
259         ptr = fn.seek(startPtr)
260         if ptr.off < 0 {
261                 err = ErrNegativeOffset
262                 return
263         }
264         if ptr.extentIdx >= len(fn.extents) {
265                 err = io.EOF
266                 return
267         }
268         n, err = fn.extents[ptr.extentIdx].ReadAt(p, int64(ptr.extentOff))
269         if n > 0 {
270                 ptr.off += int64(n)
271                 ptr.extentOff += n
272                 if ptr.extentOff == fn.extents[ptr.extentIdx].Len() {
273                         ptr.extentIdx++
274                         ptr.extentOff = 0
275                         if ptr.extentIdx < len(fn.extents) && err == io.EOF {
276                                 err = nil
277                         }
278                 }
279         }
280         return
281 }
282
283 func (fn *filenode) Size() int64 {
284         fn.RLock()
285         defer fn.RUnlock()
286         return fn.fileinfo.Size()
287 }
288
289 func (fn *filenode) Stat() os.FileInfo {
290         fn.RLock()
291         defer fn.RUnlock()
292         return fn.fileinfo
293 }
294
295 func (fn *filenode) Truncate(size int64) error {
296         fn.Lock()
297         defer fn.Unlock()
298         return fn.truncate(size)
299 }
300
301 func (fn *filenode) truncate(size int64) error {
302         if size == fn.fileinfo.size {
303                 return nil
304         }
305         fn.repacked++
306         if size < fn.fileinfo.size {
307                 ptr := fn.seek(filenodePtr{off: size})
308                 for i := ptr.extentIdx; i < len(fn.extents); i++ {
309                         if ext, ok := fn.extents[i].(*memExtent); ok {
310                                 fn.memsize -= int64(ext.Len())
311                         }
312                 }
313                 if ptr.extentOff == 0 {
314                         fn.extents = fn.extents[:ptr.extentIdx]
315                 } else {
316                         fn.extents = fn.extents[:ptr.extentIdx+1]
317                         switch ext := fn.extents[ptr.extentIdx].(type) {
318                         case *memExtent:
319                                 ext.Truncate(ptr.extentOff)
320                                 fn.memsize += int64(ext.Len())
321                         default:
322                                 fn.extents[ptr.extentIdx] = ext.Slice(0, ptr.extentOff)
323                         }
324                 }
325                 fn.fileinfo.size = size
326                 return nil
327         }
328         for size > fn.fileinfo.size {
329                 grow := size - fn.fileinfo.size
330                 var e writableExtent
331                 var ok bool
332                 if len(fn.extents) == 0 {
333                         e = &memExtent{}
334                         fn.extents = append(fn.extents, e)
335                 } else if e, ok = fn.extents[len(fn.extents)-1].(writableExtent); !ok || e.Len() >= maxBlockSize {
336                         e = &memExtent{}
337                         fn.extents = append(fn.extents, e)
338                 }
339                 if maxgrow := int64(maxBlockSize - e.Len()); maxgrow < grow {
340                         grow = maxgrow
341                 }
342                 e.Truncate(e.Len() + int(grow))
343                 fn.fileinfo.size += grow
344                 fn.memsize += grow
345         }
346         return nil
347 }
348
349 // Caller must hold lock.
350 func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
351         if startPtr.off > fn.fileinfo.size {
352                 if err = fn.truncate(startPtr.off); err != nil {
353                         return 0, startPtr, err
354                 }
355         }
356         ptr = fn.seek(startPtr)
357         if ptr.off < 0 {
358                 err = ErrNegativeOffset
359                 return
360         }
361         for len(p) > 0 && err == nil {
362                 cando := p
363                 if len(cando) > maxBlockSize {
364                         cando = cando[:maxBlockSize]
365                 }
366                 // Rearrange/grow fn.extents (and shrink cando if
367                 // needed) such that cando can be copied to
368                 // fn.extents[ptr.extentIdx] at offset ptr.extentOff.
369                 cur := ptr.extentIdx
370                 prev := ptr.extentIdx - 1
371                 var curWritable bool
372                 if cur < len(fn.extents) {
373                         _, curWritable = fn.extents[cur].(writableExtent)
374                 }
375                 var prevAppendable bool
376                 if prev >= 0 && fn.extents[prev].Len() < maxBlockSize {
377                         _, prevAppendable = fn.extents[prev].(writableExtent)
378                 }
379                 if ptr.extentOff > 0 && !curWritable {
380                         // Split a non-writable block.
381                         if max := fn.extents[cur].Len() - ptr.extentOff; max <= len(cando) {
382                                 // Truncate cur, and insert a new
383                                 // extent after it.
384                                 cando = cando[:max]
385                                 fn.extents = append(fn.extents, nil)
386                                 copy(fn.extents[cur+1:], fn.extents[cur:])
387                         } else {
388                                 // Split cur into two copies, truncate
389                                 // the one on the left, shift the one
390                                 // on the right, and insert a new
391                                 // extent between them.
392                                 fn.extents = append(fn.extents, nil, nil)
393                                 copy(fn.extents[cur+2:], fn.extents[cur:])
394                                 fn.extents[cur+2] = fn.extents[cur+2].Slice(ptr.extentOff+len(cando), -1)
395                         }
396                         cur++
397                         prev++
398                         e := &memExtent{}
399                         e.Truncate(len(cando))
400                         fn.memsize += int64(len(cando))
401                         fn.extents[cur] = e
402                         fn.extents[prev] = fn.extents[prev].Slice(0, ptr.extentOff)
403                         ptr.extentIdx++
404                         ptr.extentOff = 0
405                         fn.repacked++
406                         ptr.repacked++
407                 } else if curWritable {
408                         if fit := int(fn.extents[cur].Len()) - ptr.extentOff; fit < len(cando) {
409                                 cando = cando[:fit]
410                         }
411                 } else {
412                         if prevAppendable {
413                                 // Shrink cando if needed to fit in prev extent.
414                                 if cangrow := maxBlockSize - fn.extents[prev].Len(); cangrow < len(cando) {
415                                         cando = cando[:cangrow]
416                                 }
417                         }
418
419                         if cur == len(fn.extents) {
420                                 // ptr is at EOF, filesize is changing.
421                                 fn.fileinfo.size += int64(len(cando))
422                         } else if el := fn.extents[cur].Len(); el <= len(cando) {
423                                 // cando is long enough that we won't
424                                 // need cur any more. shrink cando to
425                                 // be exactly as long as cur
426                                 // (otherwise we'd accidentally shift
427                                 // the effective position of all
428                                 // extents after cur).
429                                 cando = cando[:el]
430                                 copy(fn.extents[cur:], fn.extents[cur+1:])
431                                 fn.extents = fn.extents[:len(fn.extents)-1]
432                         } else {
433                                 // shrink cur by the same #bytes we're growing prev
434                                 fn.extents[cur] = fn.extents[cur].Slice(len(cando), -1)
435                         }
436
437                         if prevAppendable {
438                                 // Grow prev.
439                                 ptr.extentIdx--
440                                 ptr.extentOff = fn.extents[prev].Len()
441                                 fn.extents[prev].(writableExtent).Truncate(ptr.extentOff + len(cando))
442                                 fn.memsize += int64(len(cando))
443                                 ptr.repacked++
444                                 fn.repacked++
445                         } else {
446                                 // Insert an extent between prev and cur, and advance prev/cur.
447                                 fn.extents = append(fn.extents, nil)
448                                 if cur < len(fn.extents) {
449                                         copy(fn.extents[cur+1:], fn.extents[cur:])
450                                         ptr.repacked++
451                                         fn.repacked++
452                                 } else {
453                                         // appending a new extent does
454                                         // not invalidate any ptrs
455                                 }
456                                 e := &memExtent{}
457                                 e.Truncate(len(cando))
458                                 fn.memsize += int64(len(cando))
459                                 fn.extents[cur] = e
460                                 cur++
461                                 prev++
462                         }
463                 }
464
465                 // Finally we can copy bytes from cando to the current extent.
466                 fn.extents[ptr.extentIdx].(writableExtent).WriteAt(cando, ptr.extentOff)
467                 n += len(cando)
468                 p = p[len(cando):]
469
470                 ptr.off += int64(len(cando))
471                 ptr.extentOff += len(cando)
472                 if ptr.extentOff >= maxBlockSize {
473                         fn.pruneMemExtents()
474                 }
475                 if fn.extents[ptr.extentIdx].Len() == ptr.extentOff {
476                         ptr.extentOff = 0
477                         ptr.extentIdx++
478                 }
479         }
480         return
481 }
482
483 // Write some data out to disk to reduce memory use. Caller must have
484 // write lock.
485 func (fn *filenode) pruneMemExtents() {
486         // TODO: async (don't hold Lock() while waiting for Keep)
487         // TODO: share code with (*dirnode)sync()
488         // TODO: pack/flush small blocks too, when fragmented
489         for idx, ext := range fn.extents {
490                 ext, ok := ext.(*memExtent)
491                 if !ok || ext.Len() < maxBlockSize {
492                         continue
493                 }
494                 locator, _, err := fn.parent.kc.PutB(ext.buf)
495                 if err != nil {
496                         // TODO: stall (or return errors from)
497                         // subsequent writes until flushing
498                         // starts to succeed
499                         continue
500                 }
501                 fn.memsize -= int64(ext.Len())
502                 fn.extents[idx] = storedExtent{
503                         kc:      fn.parent.kc,
504                         locator: locator,
505                         size:    ext.Len(),
506                         offset:  0,
507                         length:  ext.Len(),
508                 }
509         }
510 }
511
512 // FileSystem returns a CollectionFileSystem for the collection.
513 func (c *Collection) FileSystem(client *Client, kc keepClient) (CollectionFileSystem, error) {
514         fs := &fileSystem{dirnode: dirnode{
515                 client:   client,
516                 kc:       kc,
517                 fileinfo: fileinfo{name: ".", mode: os.ModeDir | 0755},
518                 parent:   nil,
519                 inodes:   make(map[string]inode),
520         }}
521         fs.dirnode.parent = &fs.dirnode
522         if err := fs.dirnode.loadManifest(c.ManifestText); err != nil {
523                 return nil, err
524         }
525         return fs, nil
526 }
527
528 type file struct {
529         inode
530         ptr        filenodePtr
531         append     bool
532         readable   bool
533         writable   bool
534         unreaddirs []os.FileInfo
535 }
536
537 func (f *file) Read(p []byte) (n int, err error) {
538         if !f.readable {
539                 return 0, ErrWriteOnlyMode
540         }
541         f.inode.RLock()
542         defer f.inode.RUnlock()
543         n, f.ptr, err = f.inode.Read(p, f.ptr)
544         return
545 }
546
547 func (f *file) Seek(off int64, whence int) (pos int64, err error) {
548         size := f.inode.Size()
549         ptr := f.ptr
550         switch whence {
551         case os.SEEK_SET:
552                 ptr.off = off
553         case os.SEEK_CUR:
554                 ptr.off += off
555         case os.SEEK_END:
556                 ptr.off = size + off
557         }
558         if ptr.off < 0 {
559                 return f.ptr.off, ErrNegativeOffset
560         }
561         if ptr.off != f.ptr.off {
562                 f.ptr = ptr
563                 // force filenode to recompute f.ptr fields on next
564                 // use
565                 f.ptr.repacked = -1
566         }
567         return f.ptr.off, nil
568 }
569
570 func (f *file) Truncate(size int64) error {
571         return f.inode.Truncate(size)
572 }
573
574 func (f *file) Write(p []byte) (n int, err error) {
575         if !f.writable {
576                 return 0, ErrReadOnlyFile
577         }
578         f.inode.Lock()
579         defer f.inode.Unlock()
580         if fn, ok := f.inode.(*filenode); ok && f.append {
581                 f.ptr = filenodePtr{
582                         off:       fn.fileinfo.size,
583                         extentIdx: len(fn.extents),
584                         extentOff: 0,
585                         repacked:  fn.repacked,
586                 }
587         }
588         n, f.ptr, err = f.inode.Write(p, f.ptr)
589         return
590 }
591
592 func (f *file) Readdir(count int) ([]os.FileInfo, error) {
593         if !f.inode.Stat().IsDir() {
594                 return nil, ErrInvalidOperation
595         }
596         if count <= 0 {
597                 return f.inode.Readdir(), nil
598         }
599         if f.unreaddirs == nil {
600                 f.unreaddirs = f.inode.Readdir()
601         }
602         if len(f.unreaddirs) == 0 {
603                 return nil, io.EOF
604         }
605         if count > len(f.unreaddirs) {
606                 count = len(f.unreaddirs)
607         }
608         ret := f.unreaddirs[:count]
609         f.unreaddirs = f.unreaddirs[count:]
610         return ret, nil
611 }
612
613 func (f *file) Stat() (os.FileInfo, error) {
614         return f.inode.Stat(), nil
615 }
616
617 func (f *file) Close() error {
618         // FIXME: flush
619         return nil
620 }
621
622 type dirnode struct {
623         fileinfo fileinfo
624         parent   *dirnode
625         client   *Client
626         kc       keepClient
627         inodes   map[string]inode
628         sync.RWMutex
629 }
630
631 // sync flushes in-memory data (for all files in the tree rooted at
632 // dn) to persistent storage. Caller must hold dn.Lock().
633 func (dn *dirnode) sync() error {
634         type shortBlock struct {
635                 fn  *filenode
636                 idx int
637         }
638         var pending []shortBlock
639         var pendingLen int
640
641         flush := func(sbs []shortBlock) error {
642                 if len(sbs) == 0 {
643                         return nil
644                 }
645                 block := make([]byte, 0, maxBlockSize)
646                 for _, sb := range sbs {
647                         block = append(block, sb.fn.extents[sb.idx].(*memExtent).buf...)
648                 }
649                 locator, _, err := dn.kc.PutB(block)
650                 if err != nil {
651                         return err
652                 }
653                 off := 0
654                 for _, sb := range sbs {
655                         data := sb.fn.extents[sb.idx].(*memExtent).buf
656                         sb.fn.extents[sb.idx] = storedExtent{
657                                 kc:      dn.kc,
658                                 locator: locator,
659                                 size:    len(block),
660                                 offset:  off,
661                                 length:  len(data),
662                         }
663                         off += len(data)
664                         sb.fn.memsize -= int64(len(data))
665                 }
666                 return nil
667         }
668
669         names := make([]string, 0, len(dn.inodes))
670         for name := range dn.inodes {
671                 names = append(names, name)
672         }
673         sort.Strings(names)
674
675         for _, name := range names {
676                 fn, ok := dn.inodes[name].(*filenode)
677                 if !ok {
678                         continue
679                 }
680                 fn.Lock()
681                 defer fn.Unlock()
682                 for idx, ext := range fn.extents {
683                         ext, ok := ext.(*memExtent)
684                         if !ok {
685                                 continue
686                         }
687                         if ext.Len() > maxBlockSize/2 {
688                                 if err := flush([]shortBlock{{fn, idx}}); err != nil {
689                                         return err
690                                 }
691                                 continue
692                         }
693                         if pendingLen+ext.Len() > maxBlockSize {
694                                 if err := flush(pending); err != nil {
695                                         return err
696                                 }
697                                 pending = nil
698                                 pendingLen = 0
699                         }
700                         pending = append(pending, shortBlock{fn, idx})
701                         pendingLen += ext.Len()
702                 }
703         }
704         return flush(pending)
705 }
706
707 func (dn *dirnode) MarshalManifest(prefix string) (string, error) {
708         dn.Lock()
709         defer dn.Unlock()
710         return dn.marshalManifest(prefix)
711 }
712
713 // caller must have read lock.
714 func (dn *dirnode) marshalManifest(prefix string) (string, error) {
715         var streamLen int64
716         type m1segment struct {
717                 name   string
718                 offset int64
719                 length int64
720         }
721         var segments []m1segment
722         var subdirs string
723         var blocks []string
724
725         if err := dn.sync(); err != nil {
726                 return "", err
727         }
728
729         names := make([]string, 0, len(dn.inodes))
730         for name, node := range dn.inodes {
731                 names = append(names, name)
732                 node.Lock()
733                 defer node.Unlock()
734         }
735         sort.Strings(names)
736
737         for _, name := range names {
738                 node := dn.inodes[name]
739                 switch node := node.(type) {
740                 case *dirnode:
741                         subdir, err := node.marshalManifest(prefix + "/" + name)
742                         if err != nil {
743                                 return "", err
744                         }
745                         subdirs = subdirs + subdir
746                 case *filenode:
747                         if len(node.extents) == 0 {
748                                 segments = append(segments, m1segment{name: name})
749                                 break
750                         }
751                         for _, e := range node.extents {
752                                 switch e := e.(type) {
753                                 case storedExtent:
754                                         if len(blocks) > 0 && blocks[len(blocks)-1] == e.locator {
755                                                 streamLen -= int64(e.size)
756                                         } else {
757                                                 blocks = append(blocks, e.locator)
758                                         }
759                                         next := m1segment{
760                                                 name:   name,
761                                                 offset: streamLen + int64(e.offset),
762                                                 length: int64(e.length),
763                                         }
764                                         if prev := len(segments) - 1; prev >= 0 &&
765                                                 segments[prev].name == name &&
766                                                 segments[prev].offset+segments[prev].length == next.offset {
767                                                 segments[prev].length += next.length
768                                         } else {
769                                                 segments = append(segments, next)
770                                         }
771                                         streamLen += int64(e.size)
772                                 default:
773                                         // This can't happen: we
774                                         // haven't unlocked since
775                                         // calling sync().
776                                         panic(fmt.Sprintf("can't marshal extent type %T", e))
777                                 }
778                         }
779                 default:
780                         panic(fmt.Sprintf("can't marshal inode type %T", node))
781                 }
782         }
783         var filetokens []string
784         for _, s := range segments {
785                 filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, manifestEscape(s.name)))
786         }
787         if len(filetokens) == 0 {
788                 return subdirs, nil
789         } else if len(blocks) == 0 {
790                 blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
791         }
792         return manifestEscape(prefix) + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n" + subdirs, nil
793 }
794
795 func (dn *dirnode) loadManifest(txt string) error {
796         // FIXME: faster
797         var dirname string
798         streams := strings.Split(txt, "\n")
799         if streams[len(streams)-1] != "" {
800                 return fmt.Errorf("line %d: no trailing newline", len(streams))
801         }
802         var extents []storedExtent
803         for i, stream := range streams[:len(streams)-1] {
804                 lineno := i + 1
805                 var anyFileTokens bool
806                 var pos int64
807                 var extIdx int
808                 extents = extents[:0]
809                 for i, token := range strings.Split(stream, " ") {
810                         if i == 0 {
811                                 dirname = manifestUnescape(token)
812                                 continue
813                         }
814                         if !strings.Contains(token, ":") {
815                                 if anyFileTokens {
816                                         return fmt.Errorf("line %d: bad file segment %q", lineno, token)
817                                 }
818                                 toks := strings.SplitN(token, "+", 3)
819                                 if len(toks) < 2 {
820                                         return fmt.Errorf("line %d: bad locator %q", lineno, token)
821                                 }
822                                 length, err := strconv.ParseInt(toks[1], 10, 32)
823                                 if err != nil || length < 0 {
824                                         return fmt.Errorf("line %d: bad locator %q", lineno, token)
825                                 }
826                                 extents = append(extents, storedExtent{
827                                         locator: token,
828                                         size:    int(length),
829                                         offset:  0,
830                                         length:  int(length),
831                                 })
832                                 continue
833                         } else if len(extents) == 0 {
834                                 return fmt.Errorf("line %d: bad locator %q", lineno, token)
835                         }
836
837                         toks := strings.Split(token, ":")
838                         if len(toks) != 3 {
839                                 return fmt.Errorf("line %d: bad file segment %q", lineno, token)
840                         }
841                         anyFileTokens = true
842
843                         offset, err := strconv.ParseInt(toks[0], 10, 64)
844                         if err != nil || offset < 0 {
845                                 return fmt.Errorf("line %d: bad file segment %q", lineno, token)
846                         }
847                         length, err := strconv.ParseInt(toks[1], 10, 64)
848                         if err != nil || length < 0 {
849                                 return fmt.Errorf("line %d: bad file segment %q", lineno, token)
850                         }
851                         name := dirname + "/" + manifestUnescape(toks[2])
852                         fnode, err := dn.createFileAndParents(name)
853                         if err != nil {
854                                 return fmt.Errorf("line %d: cannot use path %q: %s", lineno, name, err)
855                         }
856                         // Map the stream offset/range coordinates to
857                         // block/offset/range coordinates and add
858                         // corresponding storedExtents to the filenode
859                         if pos > offset {
860                                 // Can't continue where we left off.
861                                 // TODO: binary search instead of
862                                 // rewinding all the way (but this
863                                 // situation might be rare anyway)
864                                 extIdx, pos = 0, 0
865                         }
866                         for next := int64(0); extIdx < len(extents); extIdx, pos = extIdx+1, next {
867                                 e := extents[extIdx]
868                                 next = pos + int64(e.Len())
869                                 if next <= offset || e.Len() == 0 {
870                                         pos = next
871                                         continue
872                                 }
873                                 if pos >= offset+length {
874                                         break
875                                 }
876                                 var blkOff int
877                                 if pos < offset {
878                                         blkOff = int(offset - pos)
879                                 }
880                                 blkLen := e.Len() - blkOff
881                                 if pos+int64(blkOff+blkLen) > offset+length {
882                                         blkLen = int(offset + length - pos - int64(blkOff))
883                                 }
884                                 fnode.appendExtent(storedExtent{
885                                         kc:      dn.kc,
886                                         locator: e.locator,
887                                         size:    e.size,
888                                         offset:  blkOff,
889                                         length:  blkLen,
890                                 })
891                                 if next > offset+length {
892                                         break
893                                 }
894                         }
895                         if extIdx == len(extents) && pos < offset+length {
896                                 return fmt.Errorf("line %d: invalid segment in %d-byte stream: %q", lineno, pos, token)
897                         }
898                 }
899                 if !anyFileTokens {
900                         return fmt.Errorf("line %d: no file segments", lineno)
901                 } else if len(extents) == 0 {
902                         return fmt.Errorf("line %d: no locators", lineno)
903                 } else if dirname == "" {
904                         return fmt.Errorf("line %d: no stream name", lineno)
905                 }
906         }
907         return nil
908 }
909
910 // only safe to call from loadManifest -- no locking
911 func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
912         names := strings.Split(path, "/")
913         if basename := names[len(names)-1]; basename == "" || basename == "." || basename == ".." {
914                 err = fmt.Errorf("invalid filename")
915                 return
916         }
917         var node inode = dn
918         for i, name := range names {
919                 dn, ok := node.(*dirnode)
920                 if !ok {
921                         err = ErrFileExists
922                         return
923                 }
924                 if name == "" || name == "." {
925                         continue
926                 }
927                 if name == ".." {
928                         node = dn.parent
929                         continue
930                 }
931                 node, ok = dn.inodes[name]
932                 if !ok {
933                         if i == len(names)-1 {
934                                 fn = dn.newFilenode(name, 0755)
935                                 return
936                         }
937                         node = dn.newDirnode(name, 0755)
938                 }
939         }
940         var ok bool
941         if fn, ok = node.(*filenode); !ok {
942                 err = ErrInvalidArgument
943         }
944         return
945 }
946
947 func (dn *dirnode) mkdir(name string) (*file, error) {
948         return dn.OpenFile(name, os.O_CREATE|os.O_EXCL, os.ModeDir|0755)
949 }
950
951 func (dn *dirnode) Mkdir(name string, perm os.FileMode) error {
952         f, err := dn.mkdir(name)
953         if err == nil {
954                 err = f.Close()
955         }
956         return err
957 }
958
959 func (dn *dirnode) Remove(name string) error {
960         return dn.remove(name, false)
961 }
962
963 func (dn *dirnode) RemoveAll(name string) error {
964         return dn.remove(name, true)
965 }
966
967 func (dn *dirnode) remove(name string, recursive bool) error {
968         dirname, name := path.Split(name)
969         if name == "" || name == "." || name == ".." {
970                 return ErrInvalidArgument
971         }
972         dn, ok := dn.lookupPath(dirname).(*dirnode)
973         if !ok {
974                 return os.ErrNotExist
975         }
976         dn.Lock()
977         defer dn.Unlock()
978         switch node := dn.inodes[name].(type) {
979         case nil:
980                 return os.ErrNotExist
981         case *dirnode:
982                 node.RLock()
983                 defer node.RUnlock()
984                 if !recursive && len(node.inodes) > 0 {
985                         return ErrDirectoryNotEmpty
986                 }
987         }
988         delete(dn.inodes, name)
989         return nil
990 }
991
992 func (dn *dirnode) Rename(oldname, newname string) error {
993         olddir, oldname := path.Split(oldname)
994         if oldname == "" || oldname == "." || oldname == ".." {
995                 return ErrInvalidArgument
996         }
997         olddirf, err := dn.OpenFile(olddir+".", os.O_RDONLY, 0)
998         if err != nil {
999                 return fmt.Errorf("%q: %s", olddir, err)
1000         }
1001         defer olddirf.Close()
1002         newdir, newname := path.Split(newname)
1003         if newname == "." || newname == ".." {
1004                 return ErrInvalidArgument
1005         } else if newname == "" {
1006                 // Rename("a/b", "c/") means Rename("a/b", "c/b")
1007                 newname = oldname
1008         }
1009         newdirf, err := dn.OpenFile(newdir+".", os.O_RDONLY, 0)
1010         if err != nil {
1011                 return fmt.Errorf("%q: %s", newdir, err)
1012         }
1013         defer newdirf.Close()
1014
1015         // When acquiring locks on multiple nodes, all common
1016         // ancestors must be locked first in order to avoid
1017         // deadlock. This is assured by locking the path from root to
1018         // newdir, then locking the path from root to olddir, skipping
1019         // any already-locked nodes.
1020         needLock := []sync.Locker{}
1021         for _, f := range []*file{olddirf, newdirf} {
1022                 node := f.inode
1023                 needLock = append(needLock, node)
1024                 for node.Parent() != node {
1025                         node = node.Parent()
1026                         needLock = append(needLock, node)
1027                 }
1028         }
1029         locked := map[sync.Locker]bool{}
1030         for i := len(needLock) - 1; i >= 0; i-- {
1031                 if n := needLock[i]; !locked[n] {
1032                         n.Lock()
1033                         defer n.Unlock()
1034                         locked[n] = true
1035                 }
1036         }
1037
1038         olddn := olddirf.inode.(*dirnode)
1039         newdn := newdirf.inode.(*dirnode)
1040         oldinode, ok := olddn.inodes[oldname]
1041         if !ok {
1042                 return os.ErrNotExist
1043         }
1044         if existing, ok := newdn.inodes[newname]; ok {
1045                 // overwriting an existing file or dir
1046                 if dn, ok := existing.(*dirnode); ok {
1047                         if !oldinode.Stat().IsDir() {
1048                                 return ErrIsDirectory
1049                         }
1050                         dn.RLock()
1051                         defer dn.RUnlock()
1052                         if len(dn.inodes) > 0 {
1053                                 return ErrDirectoryNotEmpty
1054                         }
1055                 }
1056         } else {
1057                 newdn.fileinfo.size++
1058         }
1059         newdn.inodes[newname] = oldinode
1060         delete(olddn.inodes, oldname)
1061         olddn.fileinfo.size--
1062         return nil
1063 }
1064
1065 func (dn *dirnode) Parent() inode {
1066         dn.RLock()
1067         defer dn.RUnlock()
1068         return dn.parent
1069 }
1070
1071 func (dn *dirnode) Readdir() (fi []os.FileInfo) {
1072         dn.RLock()
1073         defer dn.RUnlock()
1074         fi = make([]os.FileInfo, 0, len(dn.inodes))
1075         for _, inode := range dn.inodes {
1076                 fi = append(fi, inode.Stat())
1077         }
1078         return
1079 }
1080
1081 func (dn *dirnode) Read(p []byte, ptr filenodePtr) (int, filenodePtr, error) {
1082         return 0, ptr, ErrInvalidOperation
1083 }
1084
1085 func (dn *dirnode) Write(p []byte, ptr filenodePtr) (int, filenodePtr, error) {
1086         return 0, ptr, ErrInvalidOperation
1087 }
1088
1089 func (dn *dirnode) Size() int64 {
1090         dn.RLock()
1091         defer dn.RUnlock()
1092         return dn.fileinfo.Size()
1093 }
1094
1095 func (dn *dirnode) Stat() os.FileInfo {
1096         dn.RLock()
1097         defer dn.RUnlock()
1098         return dn.fileinfo
1099 }
1100
1101 func (dn *dirnode) Truncate(int64) error {
1102         return ErrInvalidOperation
1103 }
1104
1105 // lookupPath returns the inode for the file/directory with the given
1106 // name (which may contain "/" separators), along with its parent
1107 // node. If no such file/directory exists, the returned node is nil.
1108 func (dn *dirnode) lookupPath(path string) (node inode) {
1109         node = dn
1110         for _, name := range strings.Split(path, "/") {
1111                 dn, ok := node.(*dirnode)
1112                 if !ok {
1113                         return nil
1114                 }
1115                 if name == "." || name == "" {
1116                         continue
1117                 }
1118                 if name == ".." {
1119                         node = node.Parent()
1120                         continue
1121                 }
1122                 dn.RLock()
1123                 node = dn.inodes[name]
1124                 dn.RUnlock()
1125         }
1126         return
1127 }
1128
1129 func (dn *dirnode) newDirnode(name string, perm os.FileMode) *dirnode {
1130         child := &dirnode{
1131                 parent: dn,
1132                 client: dn.client,
1133                 kc:     dn.kc,
1134                 fileinfo: fileinfo{
1135                         name: name,
1136                         mode: os.ModeDir | perm,
1137                 },
1138         }
1139         if dn.inodes == nil {
1140                 dn.inodes = make(map[string]inode)
1141         }
1142         dn.inodes[name] = child
1143         dn.fileinfo.size++
1144         return child
1145 }
1146
1147 func (dn *dirnode) newFilenode(name string, perm os.FileMode) *filenode {
1148         child := &filenode{
1149                 parent: dn,
1150                 fileinfo: fileinfo{
1151                         name: name,
1152                         mode: perm,
1153                 },
1154         }
1155         if dn.inodes == nil {
1156                 dn.inodes = make(map[string]inode)
1157         }
1158         dn.inodes[name] = child
1159         dn.fileinfo.size++
1160         return child
1161 }
1162
1163 // OpenFile is analogous to os.OpenFile().
1164 func (dn *dirnode) OpenFile(name string, flag int, perm os.FileMode) (*file, error) {
1165         if flag&os.O_SYNC != 0 {
1166                 return nil, ErrSyncNotSupported
1167         }
1168         dirname, name := path.Split(name)
1169         dn, ok := dn.lookupPath(dirname).(*dirnode)
1170         if !ok {
1171                 return nil, os.ErrNotExist
1172         }
1173         var readable, writable bool
1174         switch flag & (os.O_RDWR | os.O_RDONLY | os.O_WRONLY) {
1175         case os.O_RDWR:
1176                 readable = true
1177                 writable = true
1178         case os.O_RDONLY:
1179                 readable = true
1180         case os.O_WRONLY:
1181                 writable = true
1182         default:
1183                 return nil, fmt.Errorf("invalid flags 0x%x", flag)
1184         }
1185         if !writable {
1186                 // A directory can be opened via "foo/", "foo/.", or
1187                 // "foo/..".
1188                 switch name {
1189                 case ".", "":
1190                         return &file{inode: dn}, nil
1191                 case "..":
1192                         return &file{inode: dn.Parent()}, nil
1193                 }
1194         }
1195         createMode := flag&os.O_CREATE != 0
1196         if createMode {
1197                 dn.Lock()
1198                 defer dn.Unlock()
1199         } else {
1200                 dn.RLock()
1201                 defer dn.RUnlock()
1202         }
1203         n, ok := dn.inodes[name]
1204         if !ok {
1205                 if !createMode {
1206                         return nil, os.ErrNotExist
1207                 }
1208                 if perm.IsDir() {
1209                         n = dn.newDirnode(name, 0755)
1210                 } else {
1211                         n = dn.newFilenode(name, 0755)
1212                 }
1213         } else if flag&os.O_EXCL != 0 {
1214                 return nil, ErrFileExists
1215         } else if flag&os.O_TRUNC != 0 {
1216                 if !writable {
1217                         return nil, fmt.Errorf("invalid flag O_TRUNC in read-only mode")
1218                 } else if fn, ok := n.(*filenode); !ok {
1219                         return nil, fmt.Errorf("invalid flag O_TRUNC when opening directory")
1220                 } else {
1221                         fn.Truncate(0)
1222                 }
1223         }
1224         return &file{
1225                 inode:    n,
1226                 append:   flag&os.O_APPEND != 0,
1227                 readable: readable,
1228                 writable: writable,
1229         }, nil
1230 }
1231
1232 type extent interface {
1233         io.ReaderAt
1234         Len() int
1235         // Return a new extent with a subsection of the data from this
1236         // one. length<0 means length=Len()-off.
1237         Slice(off int, length int) extent
1238 }
1239
1240 type writableExtent interface {
1241         extent
1242         WriteAt(p []byte, off int)
1243         Truncate(n int)
1244 }
1245
1246 type memExtent struct {
1247         buf []byte
1248 }
1249
1250 func (me *memExtent) Len() int {
1251         return len(me.buf)
1252 }
1253
1254 func (me *memExtent) Slice(off, length int) extent {
1255         if length < 0 {
1256                 length = len(me.buf) - off
1257         }
1258         buf := make([]byte, length)
1259         copy(buf, me.buf[off:])
1260         return &memExtent{buf: buf}
1261 }
1262
1263 func (me *memExtent) Truncate(n int) {
1264         if n > cap(me.buf) {
1265                 newsize := 1024
1266                 for newsize < n {
1267                         newsize = newsize << 2
1268                 }
1269                 newbuf := make([]byte, n, newsize)
1270                 copy(newbuf, me.buf)
1271                 me.buf = newbuf
1272         } else {
1273                 // Zero unused part when shrinking, in case we grow
1274                 // and start using it again later.
1275                 for i := n; i < len(me.buf); i++ {
1276                         me.buf[i] = 0
1277                 }
1278         }
1279         me.buf = me.buf[:n]
1280 }
1281
1282 func (me *memExtent) WriteAt(p []byte, off int) {
1283         if off+len(p) > len(me.buf) {
1284                 panic("overflowed extent")
1285         }
1286         copy(me.buf[off:], p)
1287 }
1288
1289 func (me *memExtent) ReadAt(p []byte, off int64) (n int, err error) {
1290         if off > int64(me.Len()) {
1291                 err = io.EOF
1292                 return
1293         }
1294         n = copy(p, me.buf[int(off):])
1295         if n < len(p) {
1296                 err = io.EOF
1297         }
1298         return
1299 }
1300
1301 type storedExtent struct {
1302         kc      keepClient
1303         locator string
1304         size    int
1305         offset  int
1306         length  int
1307 }
1308
1309 func (se storedExtent) Len() int {
1310         return se.length
1311 }
1312
1313 func (se storedExtent) Slice(n, size int) extent {
1314         se.offset += n
1315         se.length -= n
1316         if size >= 0 && se.length > size {
1317                 se.length = size
1318         }
1319         return se
1320 }
1321
1322 func (se storedExtent) ReadAt(p []byte, off int64) (n int, err error) {
1323         if off > int64(se.length) {
1324                 return 0, io.EOF
1325         }
1326         maxlen := se.length - int(off)
1327         if len(p) > maxlen {
1328                 p = p[:maxlen]
1329                 n, err = se.kc.ReadAt(se.locator, p, int(off)+se.offset)
1330                 if err == nil {
1331                         err = io.EOF
1332                 }
1333                 return
1334         }
1335         return se.kc.ReadAt(se.locator, p, int(off)+se.offset)
1336 }
1337
1338 func canonicalName(name string) string {
1339         name = path.Clean("/" + name)
1340         if name == "/" || name == "./" {
1341                 name = "."
1342         } else if strings.HasPrefix(name, "/") {
1343                 name = "." + name
1344         }
1345         return name
1346 }
1347
1348 var manifestEscapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`)
1349
1350 func manifestUnescapeFunc(seq string) string {
1351         if seq == `\\` {
1352                 return `\`
1353         }
1354         i, err := strconv.ParseUint(seq[1:], 8, 8)
1355         if err != nil {
1356                 // Invalid escape sequence: can't unescape.
1357                 return seq
1358         }
1359         return string([]byte{byte(i)})
1360 }
1361
1362 func manifestUnescape(s string) string {
1363         return manifestEscapeSeq.ReplaceAllStringFunc(s, manifestUnescapeFunc)
1364 }
1365
1366 var manifestEscapedChar = regexp.MustCompile(`[^\.\w/]`)
1367
1368 func manifestEscapeFunc(seq string) string {
1369         return fmt.Sprintf("\\%03o", byte(seq[0]))
1370 }
1371
1372 func manifestEscape(s string) string {
1373         return manifestEscapedChar.ReplaceAllStringFunc(s, manifestEscapeFunc)
1374 }