12483: Fix makeParentDirs bug when higher levels already exist.
[arvados.git] / sdk / go / arvados / collection_fs.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "errors"
9         "fmt"
10         "io"
11         "net/http"
12         "os"
13         "path"
14         "regexp"
15         "sort"
16         "strconv"
17         "strings"
18         "sync"
19         "time"
20 )
21
22 var (
23         ErrReadOnlyFile      = errors.New("read-only file")
24         ErrNegativeOffset    = errors.New("cannot seek to negative offset")
25         ErrFileExists        = errors.New("file exists")
26         ErrInvalidOperation  = errors.New("invalid operation")
27         ErrDirectoryNotEmpty = errors.New("directory not empty")
28         ErrPermission        = os.ErrPermission
29
30         maxBlockSize = 1 << 26
31 )
32
33 type File interface {
34         io.Reader
35         io.Writer
36         io.Closer
37         io.Seeker
38         Size() int64
39         Readdir(int) ([]os.FileInfo, error)
40         Stat() (os.FileInfo, error)
41         Truncate(int64) error
42 }
43
44 type keepClient interface {
45         ReadAt(locator string, p []byte, off int) (int, error)
46         PutB(p []byte) (string, int, error)
47 }
48
49 type fileinfo struct {
50         name    string
51         mode    os.FileMode
52         size    int64
53         modTime time.Time
54 }
55
56 // Name implements os.FileInfo.
57 func (fi fileinfo) Name() string {
58         return fi.name
59 }
60
61 // ModTime implements os.FileInfo.
62 func (fi fileinfo) ModTime() time.Time {
63         return fi.modTime
64 }
65
66 // Mode implements os.FileInfo.
67 func (fi fileinfo) Mode() os.FileMode {
68         return fi.mode
69 }
70
71 // IsDir implements os.FileInfo.
72 func (fi fileinfo) IsDir() bool {
73         return fi.mode&os.ModeDir != 0
74 }
75
76 // Size implements os.FileInfo.
77 func (fi fileinfo) Size() int64 {
78         return fi.size
79 }
80
81 // Sys implements os.FileInfo.
82 func (fi fileinfo) Sys() interface{} {
83         return nil
84 }
85
86 // A CollectionFileSystem is an http.Filesystem plus Stat() and
87 // support for opening writable files.
88 type CollectionFileSystem interface {
89         http.FileSystem
90         Stat(name string) (os.FileInfo, error)
91         Create(name string) (File, error)
92         OpenFile(name string, flag int, perm os.FileMode) (File, error)
93         Mkdir(name string, perm os.FileMode) error
94         Remove(name string) error
95         MarshalManifest(string) (string, error)
96 }
97
98 type fileSystem struct {
99         dirnode
100 }
101
102 func (fs *fileSystem) OpenFile(name string, flag int, perm os.FileMode) (File, error) {
103         return fs.dirnode.OpenFile(path.Clean(name), flag, perm)
104 }
105
106 func (fs *fileSystem) Open(name string) (http.File, error) {
107         return fs.dirnode.OpenFile(path.Clean(name), os.O_RDONLY, 0)
108 }
109
110 func (fs *fileSystem) Create(name string) (File, error) {
111         return fs.dirnode.OpenFile(path.Clean(name), os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0)
112 }
113
114 func (fs *fileSystem) Stat(name string) (os.FileInfo, error) {
115         f, err := fs.OpenFile(name, os.O_RDONLY, 0)
116         if err != nil {
117                 return nil, err
118         }
119         defer f.Close()
120         return f.Stat()
121 }
122
123 type inode interface {
124         Parent() inode
125         Read([]byte, filenodePtr) (int, filenodePtr, error)
126         Write([]byte, filenodePtr) (int, filenodePtr, error)
127         Truncate(int64) error
128         Readdir() []os.FileInfo
129         Size() int64
130         Stat() os.FileInfo
131         sync.Locker
132         RLock()
133         RUnlock()
134 }
135
136 // filenode implements inode.
137 type filenode struct {
138         fileinfo fileinfo
139         parent   *dirnode
140         extents  []extent
141         repacked int64 // number of times anything in []extents has changed len
142         memsize  int64 // bytes in memExtents
143         sync.RWMutex
144 }
145
146 // filenodePtr is an offset into a file that is (usually) efficient to
147 // seek to. Specifically, if filenode.repacked==filenodePtr.repacked
148 // then filenode.extents[filenodePtr.extentIdx][filenodePtr.extentOff]
149 // corresponds to file offset filenodePtr.off. Otherwise, it is
150 // necessary to reexamine len(filenode.extents[0]) etc. to find the
151 // correct extent and offset.
152 type filenodePtr struct {
153         off       int64
154         extentIdx int
155         extentOff int
156         repacked  int64
157 }
158
159 // seek returns a ptr that is consistent with both startPtr.off and
160 // the current state of fn. The caller must already hold fn.RLock() or
161 // fn.Lock().
162 //
163 // If startPtr points beyond the end of the file, ptr will point to
164 // exactly the end of the file.
165 //
166 // After seeking:
167 //
168 //     ptr.extentIdx == len(filenode.extents) // i.e., at EOF
169 //     ||
170 //     filenode.extents[ptr.extentIdx].Len() >= ptr.extentOff
171 func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) {
172         ptr = startPtr
173         if ptr.off < 0 {
174                 // meaningless anyway
175                 return
176         } else if ptr.off >= fn.fileinfo.size {
177                 ptr.off = fn.fileinfo.size
178                 ptr.extentIdx = len(fn.extents)
179                 ptr.extentOff = 0
180                 ptr.repacked = fn.repacked
181                 return
182         } else if ptr.repacked == fn.repacked {
183                 // extentIdx and extentOff accurately reflect ptr.off,
184                 // but might have fallen off the end of an extent
185                 if ptr.extentOff >= fn.extents[ptr.extentIdx].Len() {
186                         ptr.extentIdx++
187                         ptr.extentOff = 0
188                 }
189                 return
190         }
191         defer func() {
192                 ptr.repacked = fn.repacked
193         }()
194         if ptr.off >= fn.fileinfo.size {
195                 ptr.extentIdx, ptr.extentOff = len(fn.extents), 0
196                 return
197         }
198         // Recompute extentIdx and extentOff.  We have already
199         // established fn.fileinfo.size > ptr.off >= 0, so we don't
200         // have to deal with edge cases here.
201         var off int64
202         for ptr.extentIdx, ptr.extentOff = 0, 0; off < ptr.off; ptr.extentIdx++ {
203                 // This would panic (index out of range) if
204                 // fn.fileinfo.size were larger than
205                 // sum(fn.extents[i].Len()) -- but that can't happen
206                 // because we have ensured fn.fileinfo.size is always
207                 // accurate.
208                 extLen := int64(fn.extents[ptr.extentIdx].Len())
209                 if off+extLen > ptr.off {
210                         ptr.extentOff = int(ptr.off - off)
211                         break
212                 }
213                 off += extLen
214         }
215         return
216 }
217
218 func (fn *filenode) appendExtent(e extent) {
219         fn.Lock()
220         defer fn.Unlock()
221         fn.extents = append(fn.extents, e)
222         fn.fileinfo.size += int64(e.Len())
223 }
224
225 func (fn *filenode) Parent() inode {
226         return fn.parent
227 }
228
229 func (fn *filenode) Readdir() []os.FileInfo {
230         return nil
231 }
232
233 func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
234         fn.RLock()
235         defer fn.RUnlock()
236         ptr = fn.seek(startPtr)
237         if ptr.off < 0 {
238                 err = ErrNegativeOffset
239                 return
240         }
241         if ptr.extentIdx >= len(fn.extents) {
242                 err = io.EOF
243                 return
244         }
245         n, err = fn.extents[ptr.extentIdx].ReadAt(p, int64(ptr.extentOff))
246         if n > 0 {
247                 ptr.off += int64(n)
248                 ptr.extentOff += n
249                 if ptr.extentOff == fn.extents[ptr.extentIdx].Len() {
250                         ptr.extentIdx++
251                         ptr.extentOff = 0
252                         if ptr.extentIdx < len(fn.extents) && err == io.EOF {
253                                 err = nil
254                         }
255                 }
256         }
257         return
258 }
259
260 func (fn *filenode) Size() int64 {
261         fn.RLock()
262         defer fn.RUnlock()
263         return fn.fileinfo.Size()
264 }
265
266 func (fn *filenode) Stat() os.FileInfo {
267         fn.RLock()
268         defer fn.RUnlock()
269         return fn.fileinfo
270 }
271
272 func (fn *filenode) Truncate(size int64) error {
273         fn.Lock()
274         defer fn.Unlock()
275         if size < fn.fileinfo.size {
276                 ptr := fn.seek(filenodePtr{off: size, repacked: fn.repacked - 1})
277                 for i := ptr.extentIdx; i < len(fn.extents); i++ {
278                         if ext, ok := fn.extents[i].(*memExtent); ok {
279                                 fn.memsize -= int64(ext.Len())
280                         }
281                 }
282                 if ptr.extentOff == 0 {
283                         fn.extents = fn.extents[:ptr.extentIdx]
284                 } else {
285                         fn.extents = fn.extents[:ptr.extentIdx+1]
286                         switch ext := fn.extents[ptr.extentIdx].(type) {
287                         case *memExtent:
288                                 ext.Truncate(ptr.extentOff)
289                                 fn.memsize += int64(ext.Len())
290                         default:
291                                 fn.extents[ptr.extentIdx] = ext.Slice(0, ptr.extentOff)
292                         }
293                 }
294                 fn.fileinfo.size = size
295                 fn.repacked++
296                 return nil
297         }
298         for size > fn.fileinfo.size {
299                 grow := size - fn.fileinfo.size
300                 var e writableExtent
301                 var ok bool
302                 if len(fn.extents) == 0 {
303                         e = &memExtent{}
304                         fn.extents = append(fn.extents, e)
305                 } else if e, ok = fn.extents[len(fn.extents)-1].(writableExtent); !ok || e.Len() >= maxBlockSize {
306                         e = &memExtent{}
307                         fn.extents = append(fn.extents, e)
308                 } else {
309                         fn.repacked++
310                 }
311                 if maxgrow := int64(maxBlockSize - e.Len()); maxgrow < grow {
312                         grow = maxgrow
313                 }
314                 e.Truncate(e.Len() + int(grow))
315                 fn.fileinfo.size += grow
316                 fn.memsize += grow
317         }
318         return nil
319 }
320
321 func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
322         fn.Lock()
323         defer fn.Unlock()
324         ptr = fn.seek(startPtr)
325         if ptr.off < 0 {
326                 err = ErrNegativeOffset
327                 return
328         }
329         for len(p) > 0 && err == nil {
330                 cando := p
331                 if len(cando) > maxBlockSize {
332                         cando = cando[:maxBlockSize]
333                 }
334                 // Rearrange/grow fn.extents (and shrink cando if
335                 // needed) such that cando can be copied to
336                 // fn.extents[ptr.extentIdx] at offset ptr.extentOff.
337                 cur := ptr.extentIdx
338                 prev := ptr.extentIdx - 1
339                 var curWritable bool
340                 if cur < len(fn.extents) {
341                         _, curWritable = fn.extents[cur].(writableExtent)
342                 }
343                 var prevAppendable bool
344                 if prev >= 0 && fn.extents[prev].Len() < maxBlockSize {
345                         _, prevAppendable = fn.extents[prev].(writableExtent)
346                 }
347                 if ptr.extentOff > 0 && !curWritable {
348                         // Split a non-writable block.
349                         if max := fn.extents[cur].Len() - ptr.extentOff; max <= len(cando) {
350                                 // Truncate cur, and insert a new
351                                 // extent after it.
352                                 cando = cando[:max]
353                                 fn.extents = append(fn.extents, nil)
354                                 copy(fn.extents[cur+1:], fn.extents[cur:])
355                         } else {
356                                 // Split cur into two copies, truncate
357                                 // the one on the left, shift the one
358                                 // on the right, and insert a new
359                                 // extent between them.
360                                 fn.extents = append(fn.extents, nil, nil)
361                                 copy(fn.extents[cur+2:], fn.extents[cur:])
362                                 fn.extents[cur+2] = fn.extents[cur+2].Slice(ptr.extentOff+len(cando), -1)
363                         }
364                         cur++
365                         prev++
366                         e := &memExtent{}
367                         e.Truncate(len(cando))
368                         fn.memsize += int64(len(cando))
369                         fn.extents[cur] = e
370                         fn.extents[prev] = fn.extents[prev].Slice(0, ptr.extentOff)
371                         ptr.extentIdx++
372                         ptr.extentOff = 0
373                         fn.repacked++
374                         ptr.repacked++
375                 } else if curWritable {
376                         if fit := int(fn.extents[cur].Len()) - ptr.extentOff; fit < len(cando) {
377                                 cando = cando[:fit]
378                         }
379                 } else {
380                         if prevAppendable {
381                                 // Shrink cando if needed to fit in prev extent.
382                                 if cangrow := maxBlockSize - fn.extents[prev].Len(); cangrow < len(cando) {
383                                         cando = cando[:cangrow]
384                                 }
385                         }
386
387                         if cur == len(fn.extents) {
388                                 // ptr is at EOF, filesize is changing.
389                                 fn.fileinfo.size += int64(len(cando))
390                         } else if el := fn.extents[cur].Len(); el <= len(cando) {
391                                 // cando is long enough that we won't
392                                 // need cur any more. shrink cando to
393                                 // be exactly as long as cur
394                                 // (otherwise we'd accidentally shift
395                                 // the effective position of all
396                                 // extents after cur).
397                                 cando = cando[:el]
398                                 copy(fn.extents[cur:], fn.extents[cur+1:])
399                                 fn.extents = fn.extents[:len(fn.extents)-1]
400                         } else {
401                                 // shrink cur by the same #bytes we're growing prev
402                                 fn.extents[cur] = fn.extents[cur].Slice(len(cando), -1)
403                         }
404
405                         if prevAppendable {
406                                 // Grow prev.
407                                 ptr.extentIdx--
408                                 ptr.extentOff = fn.extents[prev].Len()
409                                 fn.extents[prev].(writableExtent).Truncate(ptr.extentOff + len(cando))
410                                 fn.memsize += int64(len(cando))
411                                 ptr.repacked++
412                                 fn.repacked++
413                         } else {
414                                 // Insert an extent between prev and cur, and advance prev/cur.
415                                 fn.extents = append(fn.extents, nil)
416                                 if cur < len(fn.extents) {
417                                         copy(fn.extents[cur+1:], fn.extents[cur:])
418                                         ptr.repacked++
419                                         fn.repacked++
420                                 } else {
421                                         // appending a new extent does
422                                         // not invalidate any ptrs
423                                 }
424                                 e := &memExtent{}
425                                 e.Truncate(len(cando))
426                                 fn.memsize += int64(len(cando))
427                                 fn.extents[cur] = e
428                                 cur++
429                                 prev++
430                         }
431                 }
432
433                 // Finally we can copy bytes from cando to the current extent.
434                 fn.extents[ptr.extentIdx].(writableExtent).WriteAt(cando, ptr.extentOff)
435                 n += len(cando)
436                 p = p[len(cando):]
437
438                 ptr.off += int64(len(cando))
439                 ptr.extentOff += len(cando)
440                 if ptr.extentOff >= maxBlockSize {
441                         fn.pruneMemExtents()
442                 }
443                 if fn.extents[ptr.extentIdx].Len() == ptr.extentOff {
444                         ptr.extentOff = 0
445                         ptr.extentIdx++
446                 }
447         }
448         return
449 }
450
451 // Write some data out to disk to reduce memory use. Caller must have
452 // write lock.
453 func (fn *filenode) pruneMemExtents() {
454         // TODO: async (don't hold Lock() while waiting for Keep)
455         // TODO: share code with (*dirnode)sync()
456         // TODO: pack/flush small blocks too, when fragmented
457         for idx, ext := range fn.extents {
458                 ext, ok := ext.(*memExtent)
459                 if !ok || ext.Len() < maxBlockSize {
460                         continue
461                 }
462                 locator, _, err := fn.parent.kc.PutB(ext.buf)
463                 if err != nil {
464                         // TODO: stall (or return errors from)
465                         // subsequent writes until flushing
466                         // starts to succeed
467                         continue
468                 }
469                 fn.memsize -= int64(ext.Len())
470                 fn.extents[idx] = storedExtent{
471                         kc:      fn.parent.kc,
472                         locator: locator,
473                         size:    ext.Len(),
474                         offset:  0,
475                         length:  ext.Len(),
476                 }
477         }
478 }
479
480 // FileSystem returns a CollectionFileSystem for the collection.
481 func (c *Collection) FileSystem(client *Client, kc keepClient) (CollectionFileSystem, error) {
482         fs := &fileSystem{dirnode: dirnode{
483                 client:   client,
484                 kc:       kc,
485                 fileinfo: fileinfo{name: ".", mode: os.ModeDir | 0755},
486                 parent:   nil,
487                 inodes:   make(map[string]inode),
488         }}
489         fs.dirnode.parent = &fs.dirnode
490         if err := fs.dirnode.loadManifest(c.ManifestText); err != nil {
491                 return nil, err
492         }
493         return fs, nil
494 }
495
496 type file struct {
497         inode
498         ptr        filenodePtr
499         append     bool
500         writable   bool
501         unreaddirs []os.FileInfo
502 }
503
504 func (f *file) Read(p []byte) (n int, err error) {
505         n, f.ptr, err = f.inode.Read(p, f.ptr)
506         return
507 }
508
509 func (f *file) Seek(off int64, whence int) (pos int64, err error) {
510         size := f.inode.Size()
511         ptr := f.ptr
512         switch whence {
513         case os.SEEK_SET:
514                 ptr.off = off
515         case os.SEEK_CUR:
516                 ptr.off += off
517         case os.SEEK_END:
518                 ptr.off = size + off
519         }
520         if ptr.off < 0 {
521                 return f.ptr.off, ErrNegativeOffset
522         }
523         if ptr.off > size {
524                 ptr.off = size
525         }
526         if ptr.off != f.ptr.off {
527                 f.ptr = ptr
528                 // force filenode to recompute f.ptr fields on next
529                 // use
530                 f.ptr.repacked = -1
531         }
532         return f.ptr.off, nil
533 }
534
535 func (f *file) Truncate(size int64) error {
536         return f.inode.Truncate(size)
537 }
538
539 func (f *file) Write(p []byte) (n int, err error) {
540         if !f.writable {
541                 return 0, ErrReadOnlyFile
542         }
543         n, f.ptr, err = f.inode.Write(p, f.ptr)
544         return
545 }
546
547 func (f *file) Readdir(count int) ([]os.FileInfo, error) {
548         if !f.inode.Stat().IsDir() {
549                 return nil, ErrInvalidOperation
550         }
551         if count <= 0 {
552                 return f.inode.Readdir(), nil
553         }
554         if f.unreaddirs == nil {
555                 f.unreaddirs = f.inode.Readdir()
556         }
557         if len(f.unreaddirs) == 0 {
558                 return nil, io.EOF
559         }
560         if count > len(f.unreaddirs) {
561                 count = len(f.unreaddirs)
562         }
563         ret := f.unreaddirs[:count]
564         f.unreaddirs = f.unreaddirs[count:]
565         return ret, nil
566 }
567
568 func (f *file) Stat() (os.FileInfo, error) {
569         return f.inode.Stat(), nil
570 }
571
572 func (f *file) Close() error {
573         // FIXME: flush
574         return nil
575 }
576
577 type dirnode struct {
578         fileinfo fileinfo
579         parent   *dirnode
580         client   *Client
581         kc       keepClient
582         inodes   map[string]inode
583         sync.RWMutex
584 }
585
586 // sync flushes in-memory data (for all files in the tree rooted at
587 // dn) to persistent storage. Caller must hold dn.Lock().
588 func (dn *dirnode) sync() error {
589         type shortBlock struct {
590                 fn  *filenode
591                 idx int
592         }
593         var pending []shortBlock
594         var pendingLen int
595
596         flush := func(sbs []shortBlock) error {
597                 if len(sbs) == 0 {
598                         return nil
599                 }
600                 block := make([]byte, 0, maxBlockSize)
601                 for _, sb := range sbs {
602                         block = append(block, sb.fn.extents[sb.idx].(*memExtent).buf...)
603                 }
604                 locator, _, err := dn.kc.PutB(block)
605                 if err != nil {
606                         return err
607                 }
608                 off := 0
609                 for _, sb := range sbs {
610                         data := sb.fn.extents[sb.idx].(*memExtent).buf
611                         sb.fn.extents[sb.idx] = storedExtent{
612                                 kc:      dn.kc,
613                                 locator: locator,
614                                 size:    len(block),
615                                 offset:  off,
616                                 length:  len(data),
617                         }
618                         off += len(data)
619                         sb.fn.memsize -= int64(len(data))
620                 }
621                 return nil
622         }
623
624         names := make([]string, 0, len(dn.inodes))
625         for name := range dn.inodes {
626                 names = append(names, name)
627         }
628         sort.Strings(names)
629
630         for _, name := range names {
631                 fn, ok := dn.inodes[name].(*filenode)
632                 if !ok {
633                         continue
634                 }
635                 fn.Lock()
636                 defer fn.Unlock()
637                 for idx, ext := range fn.extents {
638                         ext, ok := ext.(*memExtent)
639                         if !ok {
640                                 continue
641                         }
642                         if ext.Len() > maxBlockSize/2 {
643                                 if err := flush([]shortBlock{{fn, idx}}); err != nil {
644                                         return err
645                                 }
646                                 continue
647                         }
648                         if pendingLen+ext.Len() > maxBlockSize {
649                                 if err := flush(pending); err != nil {
650                                         return err
651                                 }
652                                 pending = nil
653                                 pendingLen = 0
654                         }
655                         pending = append(pending, shortBlock{fn, idx})
656                         pendingLen += ext.Len()
657                 }
658         }
659         return flush(pending)
660 }
661
662 func (dn *dirnode) MarshalManifest(prefix string) (string, error) {
663         dn.Lock()
664         defer dn.Unlock()
665         return dn.marshalManifest(prefix)
666 }
667
668 // caller must have read lock.
669 func (dn *dirnode) marshalManifest(prefix string) (string, error) {
670         var streamLen int64
671         type m1segment struct {
672                 name   string
673                 offset int64
674                 length int64
675         }
676         var segments []m1segment
677         var subdirs string
678         var blocks []string
679
680         if err := dn.sync(); err != nil {
681                 return "", err
682         }
683
684         names := make([]string, 0, len(dn.inodes))
685         for name, node := range dn.inodes {
686                 names = append(names, name)
687                 node.Lock()
688                 defer node.Unlock()
689         }
690         sort.Strings(names)
691
692         for _, name := range names {
693                 node := dn.inodes[name]
694                 switch node := node.(type) {
695                 case *dirnode:
696                         subdir, err := node.marshalManifest(prefix + "/" + name)
697                         if err != nil {
698                                 return "", err
699                         }
700                         subdirs = subdirs + subdir
701                 case *filenode:
702                         if len(node.extents) == 0 {
703                                 segments = append(segments, m1segment{name: name})
704                                 break
705                         }
706                         for _, e := range node.extents {
707                                 switch e := e.(type) {
708                                 case storedExtent:
709                                         if len(blocks) > 0 && blocks[len(blocks)-1] == e.locator {
710                                                 streamLen -= int64(e.size)
711                                         } else {
712                                                 blocks = append(blocks, e.locator)
713                                         }
714                                         segments = append(segments, m1segment{
715                                                 name:   name,
716                                                 offset: streamLen + int64(e.offset),
717                                                 length: int64(e.length),
718                                         })
719                                         streamLen += int64(e.size)
720                                 default:
721                                         // This can't happen: we
722                                         // haven't unlocked since
723                                         // calling sync().
724                                         panic(fmt.Sprintf("can't marshal extent type %T", e))
725                                 }
726                         }
727                 default:
728                         panic(fmt.Sprintf("can't marshal inode type %T", node))
729                 }
730         }
731         var filetokens []string
732         for _, s := range segments {
733                 filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, manifestEscape(s.name)))
734         }
735         if len(filetokens) == 0 {
736                 return subdirs, nil
737         } else if len(blocks) == 0 {
738                 blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
739         }
740         return manifestEscape(prefix) + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n" + subdirs, nil
741 }
742
743 func (dn *dirnode) loadManifest(txt string) error {
744         // FIXME: faster
745         var dirname string
746         streams := strings.Split(txt, "\n")
747         if streams[len(streams)-1] != "" {
748                 return fmt.Errorf("line %d: no trailing newline", len(streams))
749         }
750         for i, stream := range streams[:len(streams)-1] {
751                 lineno := i + 1
752                 var extents []storedExtent
753                 var anyFileTokens bool
754                 for i, token := range strings.Split(stream, " ") {
755                         if i == 0 {
756                                 dirname = manifestUnescape(token)
757                                 continue
758                         }
759                         if !strings.Contains(token, ":") {
760                                 if anyFileTokens {
761                                         return fmt.Errorf("line %d: bad file segment %q", lineno, token)
762                                 }
763                                 toks := strings.SplitN(token, "+", 3)
764                                 if len(toks) < 2 {
765                                         return fmt.Errorf("line %d: bad locator %q", lineno, token)
766                                 }
767                                 length, err := strconv.ParseInt(toks[1], 10, 32)
768                                 if err != nil || length < 0 {
769                                         return fmt.Errorf("line %d: bad locator %q", lineno, token)
770                                 }
771                                 extents = append(extents, storedExtent{
772                                         locator: token,
773                                         size:    int(length),
774                                         offset:  0,
775                                         length:  int(length),
776                                 })
777                                 continue
778                         } else if len(extents) == 0 {
779                                 return fmt.Errorf("line %d: bad locator %q", lineno, token)
780                         }
781
782                         toks := strings.Split(token, ":")
783                         if len(toks) != 3 {
784                                 return fmt.Errorf("line %d: bad file segment %q", lineno, token)
785                         }
786                         anyFileTokens = true
787
788                         offset, err := strconv.ParseInt(toks[0], 10, 64)
789                         if err != nil || offset < 0 {
790                                 return fmt.Errorf("line %d: bad file segment %q", lineno, token)
791                         }
792                         length, err := strconv.ParseInt(toks[1], 10, 64)
793                         if err != nil || length < 0 {
794                                 return fmt.Errorf("line %d: bad file segment %q", lineno, token)
795                         }
796                         name := path.Clean(dirname + "/" + manifestUnescape(toks[2]))
797                         err = dn.makeParentDirs(name)
798                         if err != nil {
799                                 return fmt.Errorf("line %d: cannot use path %q: %s", lineno, name, err)
800                         }
801                         f, err := dn.OpenFile(name, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0700)
802                         if err != nil {
803                                 return fmt.Errorf("line %d: cannot append to %q: %s", lineno, name, err)
804                         }
805                         if f.inode.Stat().IsDir() {
806                                 f.Close()
807                                 return fmt.Errorf("line %d: cannot append to %q: is a directory", lineno, name)
808                         }
809                         // Map the stream offset/range coordinates to
810                         // block/offset/range coordinates and add
811                         // corresponding storedExtents to the filenode
812                         var pos int64
813                         for _, e := range extents {
814                                 next := pos + int64(e.Len())
815                                 if next < offset {
816                                         pos = next
817                                         continue
818                                 }
819                                 if pos > offset+length {
820                                         break
821                                 }
822                                 var blkOff int
823                                 if pos < offset {
824                                         blkOff = int(offset - pos)
825                                 }
826                                 blkLen := e.Len() - blkOff
827                                 if pos+int64(blkOff+blkLen) > offset+length {
828                                         blkLen = int(offset + length - pos - int64(blkOff))
829                                 }
830                                 f.inode.(*filenode).appendExtent(storedExtent{
831                                         kc:      dn.kc,
832                                         locator: e.locator,
833                                         size:    e.size,
834                                         offset:  blkOff,
835                                         length:  blkLen,
836                                 })
837                                 pos = next
838                         }
839                         f.Close()
840                         if pos < offset+length {
841                                 return fmt.Errorf("line %d: invalid segment in %d-byte stream: %q", lineno, pos, token)
842                         }
843                 }
844                 if !anyFileTokens {
845                         return fmt.Errorf("line %d: no file segments", lineno)
846                 } else if len(extents) == 0 {
847                         return fmt.Errorf("line %d: no locators", lineno)
848                 } else if dirname == "" {
849                         return fmt.Errorf("line %d: no stream name", lineno)
850                 }
851         }
852         return nil
853 }
854
855 func (dn *dirnode) makeParentDirs(name string) (err error) {
856         names := strings.Split(name, "/")
857         for _, name := range names[:len(names)-1] {
858                 f, err := dn.OpenFile(name, os.O_CREATE, os.ModeDir|0755)
859                 if err != nil {
860                         return err
861                 }
862                 defer f.Close()
863                 var ok bool
864                 dn, ok = f.inode.(*dirnode)
865                 if !ok {
866                         return ErrFileExists
867                 }
868         }
869         return nil
870 }
871
872 func (dn *dirnode) mkdir(name string) (*file, error) {
873         return dn.OpenFile(name, os.O_CREATE|os.O_EXCL, os.ModeDir|0755)
874 }
875
876 func (dn *dirnode) Mkdir(name string, perm os.FileMode) error {
877         f, err := dn.mkdir(name)
878         if err == nil {
879                 err = f.Close()
880         }
881         return err
882 }
883
884 func (dn *dirnode) Remove(name string) error {
885         dirname, name := path.Split(name)
886         if name == "" || name == "." || name == ".." {
887                 return ErrInvalidOperation
888         }
889         dn, ok := dn.lookupPath(dirname).(*dirnode)
890         if !ok {
891                 return os.ErrNotExist
892         }
893         dn.Lock()
894         defer dn.Unlock()
895         switch node := dn.inodes[name].(type) {
896         case nil:
897                 return os.ErrNotExist
898         case *dirnode:
899                 node.RLock()
900                 defer node.RUnlock()
901                 if len(node.inodes) > 0 {
902                         return ErrDirectoryNotEmpty
903                 }
904         }
905         delete(dn.inodes, name)
906         return nil
907 }
908
909 func (dn *dirnode) Parent() inode {
910         dn.RLock()
911         defer dn.RUnlock()
912         return dn.parent
913 }
914
915 func (dn *dirnode) Readdir() (fi []os.FileInfo) {
916         dn.RLock()
917         defer dn.RUnlock()
918         fi = make([]os.FileInfo, 0, len(dn.inodes))
919         for _, inode := range dn.inodes {
920                 fi = append(fi, inode.Stat())
921         }
922         return
923 }
924
925 func (dn *dirnode) Read(p []byte, ptr filenodePtr) (int, filenodePtr, error) {
926         return 0, ptr, ErrInvalidOperation
927 }
928
929 func (dn *dirnode) Write(p []byte, ptr filenodePtr) (int, filenodePtr, error) {
930         return 0, ptr, ErrInvalidOperation
931 }
932
933 func (dn *dirnode) Size() int64 {
934         dn.RLock()
935         defer dn.RUnlock()
936         return dn.fileinfo.Size()
937 }
938
939 func (dn *dirnode) Stat() os.FileInfo {
940         dn.RLock()
941         defer dn.RUnlock()
942         return dn.fileinfo
943 }
944
945 func (dn *dirnode) Truncate(int64) error {
946         return ErrInvalidOperation
947 }
948
949 // lookupPath returns the inode for the file/directory with the given
950 // name (which may contain "/" separators), along with its parent
951 // node. If no such file/directory exists, the returned node is nil.
952 func (dn *dirnode) lookupPath(path string) (node inode) {
953         node = dn
954         for _, name := range strings.Split(path, "/") {
955                 dn, ok := node.(*dirnode)
956                 if !ok {
957                         return nil
958                 }
959                 if name == "." || name == "" {
960                         continue
961                 }
962                 if name == ".." {
963                         node = node.Parent()
964                         continue
965                 }
966                 dn.RLock()
967                 node = dn.inodes[name]
968                 dn.RUnlock()
969         }
970         return
971 }
972
973 func (dn *dirnode) OpenFile(name string, flag int, perm os.FileMode) (*file, error) {
974         dirname, name := path.Split(name)
975         dn, ok := dn.lookupPath(dirname).(*dirnode)
976         if !ok {
977                 return nil, os.ErrNotExist
978         }
979         writeMode := flag&(os.O_RDWR|os.O_WRONLY|os.O_CREATE) != 0
980         if !writeMode {
981                 // A directory can be opened via "foo/", "foo/.", or
982                 // "foo/..".
983                 switch name {
984                 case ".", "":
985                         return &file{inode: dn}, nil
986                 case "..":
987                         return &file{inode: dn.Parent()}, nil
988                 }
989         }
990         createMode := flag&os.O_CREATE != 0
991         if createMode {
992                 dn.Lock()
993                 defer dn.Unlock()
994         } else {
995                 dn.RLock()
996                 defer dn.RUnlock()
997         }
998         n, ok := dn.inodes[name]
999         if !ok {
1000                 if !createMode {
1001                         return nil, os.ErrNotExist
1002                 }
1003                 if perm.IsDir() {
1004                         n = &dirnode{
1005                                 parent: dn,
1006                                 client: dn.client,
1007                                 kc:     dn.kc,
1008                                 fileinfo: fileinfo{
1009                                         name: name,
1010                                         mode: os.ModeDir | 0755,
1011                                 },
1012                         }
1013                 } else {
1014                         n = &filenode{
1015                                 parent: dn,
1016                                 fileinfo: fileinfo{
1017                                         name: name,
1018                                         mode: 0755,
1019                                 },
1020                         }
1021                 }
1022                 if dn.inodes == nil {
1023                         dn.inodes = make(map[string]inode)
1024                 }
1025                 dn.inodes[name] = n
1026                 dn.fileinfo.size++
1027         } else if flag&os.O_EXCL != 0 {
1028                 return nil, ErrFileExists
1029         }
1030         return &file{
1031                 inode:    n,
1032                 append:   flag&os.O_APPEND != 0,
1033                 writable: flag&(os.O_WRONLY|os.O_RDWR) != 0,
1034         }, nil
1035 }
1036
1037 type extent interface {
1038         io.ReaderAt
1039         Len() int
1040         // Return a new extent with a subsection of the data from this
1041         // one. length<0 means length=Len()-off.
1042         Slice(off int, length int) extent
1043 }
1044
1045 type writableExtent interface {
1046         extent
1047         WriteAt(p []byte, off int)
1048         Truncate(n int)
1049 }
1050
1051 type memExtent struct {
1052         buf []byte
1053 }
1054
1055 func (me *memExtent) Len() int {
1056         return len(me.buf)
1057 }
1058
1059 func (me *memExtent) Slice(off, length int) extent {
1060         if length < 0 {
1061                 length = len(me.buf) - off
1062         }
1063         buf := make([]byte, length)
1064         copy(buf, me.buf[off:])
1065         return &memExtent{buf: buf}
1066 }
1067
1068 func (me *memExtent) Truncate(n int) {
1069         if n > cap(me.buf) {
1070                 newsize := 1024
1071                 for newsize < n {
1072                         newsize = newsize << 2
1073                 }
1074                 newbuf := make([]byte, n, newsize)
1075                 copy(newbuf, me.buf)
1076                 me.buf = newbuf
1077         } else {
1078                 // Zero unused part when shrinking, in case we grow
1079                 // and start using it again later.
1080                 for i := n; i < len(me.buf); i++ {
1081                         me.buf[i] = 0
1082                 }
1083         }
1084         me.buf = me.buf[:n]
1085 }
1086
1087 func (me *memExtent) WriteAt(p []byte, off int) {
1088         if off+len(p) > len(me.buf) {
1089                 panic("overflowed extent")
1090         }
1091         copy(me.buf[off:], p)
1092 }
1093
1094 func (me *memExtent) ReadAt(p []byte, off int64) (n int, err error) {
1095         if off > int64(me.Len()) {
1096                 err = io.EOF
1097                 return
1098         }
1099         n = copy(p, me.buf[int(off):])
1100         if n < len(p) {
1101                 err = io.EOF
1102         }
1103         return
1104 }
1105
1106 type storedExtent struct {
1107         kc      keepClient
1108         locator string
1109         size    int
1110         offset  int
1111         length  int
1112 }
1113
1114 func (se storedExtent) Len() int {
1115         return se.length
1116 }
1117
1118 func (se storedExtent) Slice(n, size int) extent {
1119         se.offset += n
1120         se.length -= n
1121         if size >= 0 && se.length > size {
1122                 se.length = size
1123         }
1124         return se
1125 }
1126
1127 func (se storedExtent) ReadAt(p []byte, off int64) (n int, err error) {
1128         if off > int64(se.length) {
1129                 return 0, io.EOF
1130         }
1131         maxlen := se.length - int(off)
1132         if len(p) > maxlen {
1133                 p = p[:maxlen]
1134                 n, err = se.kc.ReadAt(se.locator, p, int(off)+se.offset)
1135                 if err == nil {
1136                         err = io.EOF
1137                 }
1138                 return
1139         }
1140         return se.kc.ReadAt(se.locator, p, int(off)+se.offset)
1141 }
1142
1143 func canonicalName(name string) string {
1144         name = path.Clean("/" + name)
1145         if name == "/" || name == "./" {
1146                 name = "."
1147         } else if strings.HasPrefix(name, "/") {
1148                 name = "." + name
1149         }
1150         return name
1151 }
1152
1153 var manifestEscapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`)
1154
1155 func manifestUnescapeFunc(seq string) string {
1156         if seq == `\\` {
1157                 return `\`
1158         }
1159         i, err := strconv.ParseUint(seq[1:], 8, 8)
1160         if err != nil {
1161                 // Invalid escape sequence: can't unescape.
1162                 return seq
1163         }
1164         return string([]byte{byte(i)})
1165 }
1166
1167 func manifestUnescape(s string) string {
1168         return manifestEscapeSeq.ReplaceAllStringFunc(s, manifestUnescapeFunc)
1169 }
1170
1171 var manifestEscapedChar = regexp.MustCompile(`[^\.\w/]`)
1172
1173 func manifestEscapeFunc(seq string) string {
1174         return fmt.Sprintf("\\%03o", byte(seq[0]))
1175 }
1176
1177 func manifestEscape(s string) string {
1178         return manifestEscapedChar.ReplaceAllStringFunc(s, manifestEscapeFunc)
1179 }