12483: Add Mkdir(), Remove().
[arvados.git] / sdk / go / arvados / collection_fs.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "crypto/md5"
9         "errors"
10         "fmt"
11         "io"
12         "net/http"
13         "os"
14         "path"
15         "regexp"
16         "sort"
17         "strconv"
18         "strings"
19         "sync"
20         "time"
21 )
22
23 var (
24         ErrReadOnlyFile      = errors.New("read-only file")
25         ErrNegativeOffset    = errors.New("cannot seek to negative offset")
26         ErrFileExists        = errors.New("file exists")
27         ErrInvalidOperation  = errors.New("invalid operation")
28         ErrDirectoryNotEmpty = errors.New("directory not empty")
29         ErrPermission        = os.ErrPermission
30
31         maxBlockSize = 1 << 26
32 )
33
34 type File interface {
35         io.Reader
36         io.Writer
37         io.Closer
38         io.Seeker
39         Size() int64
40         Readdir(int) ([]os.FileInfo, error)
41         Stat() (os.FileInfo, error)
42         Truncate(int64) error
43 }
44
45 type keepClient interface {
46         ReadAt(locator string, p []byte, off int) (int, error)
47 }
48
49 type fileinfo struct {
50         name    string
51         mode    os.FileMode
52         size    int64
53         modTime time.Time
54 }
55
56 // Name implements os.FileInfo.
57 func (fi fileinfo) Name() string {
58         return fi.name
59 }
60
61 // ModTime implements os.FileInfo.
62 func (fi fileinfo) ModTime() time.Time {
63         return fi.modTime
64 }
65
66 // Mode implements os.FileInfo.
67 func (fi fileinfo) Mode() os.FileMode {
68         return fi.mode
69 }
70
71 // IsDir implements os.FileInfo.
72 func (fi fileinfo) IsDir() bool {
73         return fi.mode&os.ModeDir != 0
74 }
75
76 // Size implements os.FileInfo.
77 func (fi fileinfo) Size() int64 {
78         return fi.size
79 }
80
81 // Sys implements os.FileInfo.
82 func (fi fileinfo) Sys() interface{} {
83         return nil
84 }
85
86 func (fi fileinfo) Stat() os.FileInfo {
87         return fi
88 }
89
90 // A CollectionFileSystem is an http.Filesystem plus Stat() and
91 // support for opening writable files.
92 type CollectionFileSystem interface {
93         http.FileSystem
94         Stat(name string) (os.FileInfo, error)
95         Create(name string) (File, error)
96         OpenFile(name string, flag int, perm os.FileMode) (File, error)
97         Mkdir(name string, perm os.FileMode) error
98         Remove(name string) error
99         MarshalManifest(string) (string, error)
100 }
101
102 type fileSystem struct {
103         dirnode
104 }
105
106 func (fs *fileSystem) OpenFile(name string, flag int, perm os.FileMode) (File, error) {
107         return fs.dirnode.OpenFile(path.Clean(name), flag, perm)
108 }
109
110 func (fs *fileSystem) Open(name string) (http.File, error) {
111         return fs.dirnode.OpenFile(path.Clean(name), os.O_RDONLY, 0)
112 }
113
114 func (fs *fileSystem) Create(name string) (File, error) {
115         return fs.dirnode.OpenFile(path.Clean(name), os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0)
116 }
117
118 func (fs *fileSystem) Stat(name string) (os.FileInfo, error) {
119         f, err := fs.OpenFile(name, os.O_RDONLY, 0)
120         if err != nil {
121                 return nil, err
122         }
123         defer f.Close()
124         return f.Stat()
125 }
126
127 type inode interface {
128         os.FileInfo
129         OpenFile(string, int, os.FileMode) (*file, error)
130         Parent() inode
131         Read([]byte, filenodePtr) (int, filenodePtr, error)
132         Write([]byte, filenodePtr) (int, filenodePtr, error)
133         Truncate(int64) error
134         Readdir() []os.FileInfo
135         Stat() os.FileInfo
136         sync.Locker
137         RLock()
138         RUnlock()
139 }
140
141 // filenode implements inode.
142 type filenode struct {
143         fileinfo
144         parent   *dirnode
145         extents  []extent
146         repacked int64 // number of times anything in []extents has changed len
147         sync.RWMutex
148 }
149
150 // filenodePtr is an offset into a file that is (usually) efficient to
151 // seek to. Specifically, if filenode.repacked==filenodePtr.repacked
152 // then filenode.extents[filenodePtr.extentIdx][filenodePtr.extentOff]
153 // corresponds to file offset filenodePtr.off. Otherwise, it is
154 // necessary to reexamine len(filenode.extents[0]) etc. to find the
155 // correct extent and offset.
156 type filenodePtr struct {
157         off       int64
158         extentIdx int
159         extentOff int
160         repacked  int64
161 }
162
163 // seek returns a ptr that is consistent with both startPtr.off and
164 // the current state of fn. The caller must already hold fn.RLock() or
165 // fn.Lock().
166 //
167 // If startPtr points beyond the end of the file, ptr will point to
168 // exactly the end of the file.
169 //
170 // After seeking:
171 //
172 //     ptr.extentIdx == len(filenode.extents) // i.e., at EOF
173 //     ||
174 //     filenode.extents[ptr.extentIdx].Len() >= ptr.extentOff
175 func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) {
176         ptr = startPtr
177         if ptr.off < 0 {
178                 // meaningless anyway
179                 return
180         } else if ptr.off >= fn.fileinfo.size {
181                 ptr.off = fn.fileinfo.size
182                 ptr.extentIdx = len(fn.extents)
183                 ptr.extentOff = 0
184                 ptr.repacked = fn.repacked
185                 return
186         } else if ptr.repacked == fn.repacked {
187                 // extentIdx and extentOff accurately reflect ptr.off,
188                 // but might have fallen off the end of an extent
189                 if ptr.extentOff >= fn.extents[ptr.extentIdx].Len() {
190                         ptr.extentIdx++
191                         ptr.extentOff = 0
192                 }
193                 return
194         }
195         defer func() {
196                 ptr.repacked = fn.repacked
197         }()
198         if ptr.off >= fn.fileinfo.size {
199                 ptr.extentIdx, ptr.extentOff = len(fn.extents), 0
200                 return
201         }
202         // Recompute extentIdx and extentOff.  We have already
203         // established fn.fileinfo.size > ptr.off >= 0, so we don't
204         // have to deal with edge cases here.
205         var off int64
206         for ptr.extentIdx, ptr.extentOff = 0, 0; off < ptr.off; ptr.extentIdx++ {
207                 // This would panic (index out of range) if
208                 // fn.fileinfo.size were larger than
209                 // sum(fn.extents[i].Len()) -- but that can't happen
210                 // because we have ensured fn.fileinfo.size is always
211                 // accurate.
212                 extLen := int64(fn.extents[ptr.extentIdx].Len())
213                 if off+extLen > ptr.off {
214                         ptr.extentOff = int(ptr.off - off)
215                         break
216                 }
217                 off += extLen
218         }
219         return
220 }
221
222 func (fn *filenode) appendExtent(e extent) {
223         fn.Lock()
224         defer fn.Unlock()
225         fn.extents = append(fn.extents, e)
226         fn.fileinfo.size += int64(e.Len())
227 }
228
229 func (fn *filenode) OpenFile(string, int, os.FileMode) (*file, error) {
230         return nil, os.ErrNotExist
231 }
232
233 func (fn *filenode) Parent() inode {
234         return fn.parent
235 }
236
237 func (fn *filenode) Readdir() []os.FileInfo {
238         return nil
239 }
240
241 func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
242         fn.RLock()
243         defer fn.RUnlock()
244         ptr = fn.seek(startPtr)
245         if ptr.off < 0 {
246                 err = ErrNegativeOffset
247                 return
248         }
249         if ptr.extentIdx >= len(fn.extents) {
250                 err = io.EOF
251                 return
252         }
253         n, err = fn.extents[ptr.extentIdx].ReadAt(p, int64(ptr.extentOff))
254         if n > 0 {
255                 ptr.off += int64(n)
256                 ptr.extentOff += n
257                 if ptr.extentOff == fn.extents[ptr.extentIdx].Len() {
258                         ptr.extentIdx++
259                         ptr.extentOff = 0
260                         if ptr.extentIdx < len(fn.extents) && err == io.EOF {
261                                 err = nil
262                         }
263                 }
264         }
265         return
266 }
267
268 func (fn *filenode) Truncate(size int64) error {
269         fn.Lock()
270         defer fn.Unlock()
271         if size < fn.fileinfo.size {
272                 ptr := fn.seek(filenodePtr{off: size, repacked: fn.repacked - 1})
273                 if ptr.extentOff == 0 {
274                         fn.extents = fn.extents[:ptr.extentIdx]
275                 } else {
276                         fn.extents = fn.extents[:ptr.extentIdx+1]
277                         e := fn.extents[ptr.extentIdx]
278                         if e, ok := e.(writableExtent); ok {
279                                 e.Truncate(ptr.extentOff)
280                         } else {
281                                 fn.extents[ptr.extentIdx] = e.Slice(0, ptr.extentOff)
282                         }
283                 }
284                 fn.fileinfo.size = size
285                 fn.repacked++
286                 return nil
287         }
288         for size > fn.fileinfo.size {
289                 grow := size - fn.fileinfo.size
290                 var e writableExtent
291                 var ok bool
292                 if len(fn.extents) == 0 {
293                         e = &memExtent{}
294                         fn.extents = append(fn.extents, e)
295                 } else if e, ok = fn.extents[len(fn.extents)-1].(writableExtent); !ok || e.Len() >= maxBlockSize {
296                         e = &memExtent{}
297                         fn.extents = append(fn.extents, e)
298                 } else {
299                         fn.repacked++
300                 }
301                 if maxgrow := int64(maxBlockSize - e.Len()); maxgrow < grow {
302                         grow = maxgrow
303                 }
304                 e.Truncate(e.Len() + int(grow))
305                 fn.fileinfo.size += grow
306         }
307         return nil
308 }
309
310 func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
311         fn.Lock()
312         defer fn.Unlock()
313         ptr = fn.seek(startPtr)
314         if ptr.off < 0 {
315                 err = ErrNegativeOffset
316                 return
317         }
318         for len(p) > 0 && err == nil {
319                 cando := p
320                 if len(cando) > maxBlockSize {
321                         cando = cando[:maxBlockSize]
322                 }
323                 // Rearrange/grow fn.extents (and shrink cando if
324                 // needed) such that cando can be copied to
325                 // fn.extents[ptr.extentIdx] at offset ptr.extentOff.
326                 cur := ptr.extentIdx
327                 prev := ptr.extentIdx - 1
328                 var curWritable bool
329                 if cur < len(fn.extents) {
330                         _, curWritable = fn.extents[cur].(writableExtent)
331                 }
332                 var prevAppendable bool
333                 if prev >= 0 && fn.extents[prev].Len() < maxBlockSize {
334                         _, prevAppendable = fn.extents[prev].(writableExtent)
335                 }
336                 if ptr.extentOff > 0 && !curWritable {
337                         // Split a non-writable block.
338                         if max := fn.extents[cur].Len() - ptr.extentOff; max <= len(cando) {
339                                 // Truncate cur, and insert a new
340                                 // extent after it.
341                                 cando = cando[:max]
342                                 fn.extents = append(fn.extents, nil)
343                                 copy(fn.extents[cur+1:], fn.extents[cur:])
344                         } else {
345                                 // Split cur into two copies, truncate
346                                 // the one on the left, shift the one
347                                 // on the right, and insert a new
348                                 // extent between them.
349                                 fn.extents = append(fn.extents, nil, nil)
350                                 copy(fn.extents[cur+2:], fn.extents[cur:])
351                                 fn.extents[cur+2] = fn.extents[cur+2].Slice(ptr.extentOff+len(cando), -1)
352                         }
353                         cur++
354                         prev++
355                         e := &memExtent{}
356                         e.Truncate(len(cando))
357                         fn.extents[cur] = e
358                         fn.extents[prev] = fn.extents[prev].Slice(0, ptr.extentOff)
359                         ptr.extentIdx++
360                         ptr.extentOff = 0
361                         fn.repacked++
362                         ptr.repacked++
363                 } else if curWritable {
364                         if fit := int(fn.extents[cur].Len()) - ptr.extentOff; fit < len(cando) {
365                                 cando = cando[:fit]
366                         }
367                 } else {
368                         if prevAppendable {
369                                 // Shrink cando if needed to fit in prev extent.
370                                 if cangrow := maxBlockSize - fn.extents[prev].Len(); cangrow < len(cando) {
371                                         cando = cando[:cangrow]
372                                 }
373                         }
374
375                         if cur == len(fn.extents) {
376                                 // ptr is at EOF, filesize is changing.
377                                 fn.fileinfo.size += int64(len(cando))
378                         } else if el := fn.extents[cur].Len(); el <= len(cando) {
379                                 // cando is long enough that we won't
380                                 // need cur any more. shrink cando to
381                                 // be exactly as long as cur
382                                 // (otherwise we'd accidentally shift
383                                 // the effective position of all
384                                 // extents after cur).
385                                 cando = cando[:el]
386                                 copy(fn.extents[cur:], fn.extents[cur+1:])
387                                 fn.extents = fn.extents[:len(fn.extents)-1]
388                         } else {
389                                 // shrink cur by the same #bytes we're growing prev
390                                 fn.extents[cur] = fn.extents[cur].Slice(len(cando), -1)
391                         }
392
393                         if prevAppendable {
394                                 // Grow prev.
395                                 ptr.extentIdx--
396                                 ptr.extentOff = fn.extents[prev].Len()
397                                 fn.extents[prev].(writableExtent).Truncate(ptr.extentOff + len(cando))
398                                 ptr.repacked++
399                                 fn.repacked++
400                         } else {
401                                 // Insert an extent between prev and cur, and advance prev/cur.
402                                 fn.extents = append(fn.extents, nil)
403                                 if cur < len(fn.extents) {
404                                         copy(fn.extents[cur+1:], fn.extents[cur:])
405                                         ptr.repacked++
406                                         fn.repacked++
407                                 } else {
408                                         // appending a new extent does
409                                         // not invalidate any ptrs
410                                 }
411                                 e := &memExtent{}
412                                 e.Truncate(len(cando))
413                                 fn.extents[cur] = e
414                                 cur++
415                                 prev++
416                         }
417                 }
418
419                 // Finally we can copy bytes from cando to the current extent.
420                 fn.extents[ptr.extentIdx].(writableExtent).WriteAt(cando, ptr.extentOff)
421                 n += len(cando)
422                 p = p[len(cando):]
423
424                 ptr.off += int64(len(cando))
425                 ptr.extentOff += len(cando)
426                 if fn.extents[ptr.extentIdx].Len() == ptr.extentOff {
427                         ptr.extentOff = 0
428                         ptr.extentIdx++
429                 }
430         }
431         return
432 }
433
434 // FileSystem returns a CollectionFileSystem for the collection.
435 func (c *Collection) FileSystem(client *Client, kc keepClient) CollectionFileSystem {
436         fs := &fileSystem{dirnode: dirnode{
437                 cache:    &keepBlockCache{kc: kc},
438                 client:   client,
439                 kc:       kc,
440                 fileinfo: fileinfo{name: ".", mode: os.ModeDir | 0755},
441                 parent:   nil,
442                 inodes:   make(map[string]inode),
443         }}
444         fs.dirnode.parent = &fs.dirnode
445         fs.dirnode.loadManifest(c.ManifestText)
446         return fs
447 }
448
449 type file struct {
450         inode
451         ptr        filenodePtr
452         append     bool
453         writable   bool
454         unreaddirs []os.FileInfo
455 }
456
457 func (f *file) Read(p []byte) (n int, err error) {
458         n, f.ptr, err = f.inode.Read(p, f.ptr)
459         return
460 }
461
462 func (f *file) Seek(off int64, whence int) (pos int64, err error) {
463         size := f.inode.Size()
464         ptr := f.ptr
465         switch whence {
466         case os.SEEK_SET:
467                 ptr.off = off
468         case os.SEEK_CUR:
469                 ptr.off += off
470         case os.SEEK_END:
471                 ptr.off = size + off
472         }
473         if ptr.off < 0 {
474                 return f.ptr.off, ErrNegativeOffset
475         }
476         if ptr.off > size {
477                 ptr.off = size
478         }
479         if ptr.off != f.ptr.off {
480                 f.ptr = ptr
481                 // force filenode to recompute f.ptr fields on next
482                 // use
483                 f.ptr.repacked = -1
484         }
485         return f.ptr.off, nil
486 }
487
488 func (f *file) Truncate(size int64) error {
489         return f.inode.Truncate(size)
490 }
491
492 func (f *file) Write(p []byte) (n int, err error) {
493         if !f.writable {
494                 return 0, ErrReadOnlyFile
495         }
496         n, f.ptr, err = f.inode.Write(p, f.ptr)
497         return
498 }
499
500 func (f *file) Readdir(count int) ([]os.FileInfo, error) {
501         if !f.inode.IsDir() {
502                 return nil, ErrInvalidOperation
503         }
504         if count <= 0 {
505                 return f.inode.Readdir(), nil
506         }
507         if f.unreaddirs == nil {
508                 f.unreaddirs = f.inode.Readdir()
509         }
510         if len(f.unreaddirs) == 0 {
511                 return nil, io.EOF
512         }
513         if count > len(f.unreaddirs) {
514                 count = len(f.unreaddirs)
515         }
516         ret := f.unreaddirs[:count]
517         f.unreaddirs = f.unreaddirs[count:]
518         return ret, nil
519 }
520
521 func (f *file) Stat() (os.FileInfo, error) {
522         return f.inode, nil
523 }
524
525 func (f *file) Close() error {
526         // FIXME: flush
527         return nil
528 }
529
530 func (f *file) OpenFile(name string, flag int, perm os.FileMode) (*file, error) {
531         return f.inode.OpenFile(name, flag, perm)
532 }
533
534 type dirnode struct {
535         fileinfo
536         parent *dirnode
537         client *Client
538         kc     keepClient
539         cache  blockCache
540         inodes map[string]inode
541         sync.RWMutex
542 }
543
544 // caller must hold dn.Lock().
545 func (dn *dirnode) sync() error {
546         type shortBlock struct {
547                 fn  *filenode
548                 idx int
549         }
550         var pending []shortBlock
551         var pendingLen int
552
553         flush := func(sbs []shortBlock) error {
554                 if len(sbs) == 0 {
555                         return nil
556                 }
557                 hash := md5.New()
558                 size := 0
559                 for _, sb := range sbs {
560                         data := sb.fn.extents[sb.idx].(*memExtent).buf
561                         if _, err := hash.Write(data); err != nil {
562                                 return err
563                         }
564                         size += len(data)
565                 }
566                 // FIXME: write to keep
567                 locator := fmt.Sprintf("%x+%d", hash.Sum(nil), size)
568                 off := 0
569                 for _, sb := range sbs {
570                         data := sb.fn.extents[sb.idx].(*memExtent).buf
571                         sb.fn.extents[sb.idx] = storedExtent{
572                                 cache:   dn.cache,
573                                 locator: locator,
574                                 size:    size,
575                                 offset:  off,
576                                 length:  len(data),
577                         }
578                         off += len(data)
579                 }
580                 return nil
581         }
582
583         names := make([]string, 0, len(dn.inodes))
584         for name := range dn.inodes {
585                 names = append(names, name)
586         }
587         sort.Strings(names)
588
589         for _, name := range names {
590                 fn, ok := dn.inodes[name].(*filenode)
591                 if !ok {
592                         continue
593                 }
594                 fn.Lock()
595                 defer fn.Unlock()
596                 for idx, ext := range fn.extents {
597                         ext, ok := ext.(*memExtent)
598                         if !ok {
599                                 continue
600                         }
601                         if ext.Len() > maxBlockSize/2 {
602                                 if err := flush([]shortBlock{{fn, idx}}); err != nil {
603                                         return err
604                                 }
605                                 continue
606                         }
607                         if pendingLen+ext.Len() > maxBlockSize {
608                                 if err := flush(pending); err != nil {
609                                         return err
610                                 }
611                                 pending = nil
612                                 pendingLen = 0
613                         }
614                         pending = append(pending, shortBlock{fn, idx})
615                         pendingLen += ext.Len()
616                 }
617         }
618         return flush(pending)
619 }
620
621 func (dn *dirnode) MarshalManifest(prefix string) (string, error) {
622         dn.Lock()
623         defer dn.Unlock()
624         if err := dn.sync(); err != nil {
625                 return "", err
626         }
627
628         var streamLen int64
629         type m1segment struct {
630                 name   string
631                 offset int64
632                 length int64
633         }
634         var segments []m1segment
635         var subdirs string
636         var blocks []string
637
638         names := make([]string, 0, len(dn.inodes))
639         for name := range dn.inodes {
640                 names = append(names, name)
641         }
642         sort.Strings(names)
643
644         for _, name := range names {
645                 node := dn.inodes[name]
646                 switch node := node.(type) {
647                 case *dirnode:
648                         subdir, err := node.MarshalManifest(prefix + "/" + node.Name())
649                         if err != nil {
650                                 return "", err
651                         }
652                         subdirs = subdirs + subdir
653                 case *filenode:
654                         for _, e := range node.extents {
655                                 switch e := e.(type) {
656                                 case *memExtent:
657                                         blocks = append(blocks, fmt.Sprintf("FIXME+%d", e.Len()))
658                                         segments = append(segments, m1segment{
659                                                 name:   node.Name(),
660                                                 offset: streamLen,
661                                                 length: int64(e.Len()),
662                                         })
663                                         streamLen += int64(e.Len())
664                                 case storedExtent:
665                                         if len(blocks) > 0 && blocks[len(blocks)-1] == e.locator {
666                                                 streamLen -= int64(e.size)
667                                         } else {
668                                                 blocks = append(blocks, e.locator)
669                                         }
670                                         segments = append(segments, m1segment{
671                                                 name:   node.Name(),
672                                                 offset: streamLen + int64(e.offset),
673                                                 length: int64(e.length),
674                                         })
675                                         streamLen += int64(e.size)
676                                 default:
677                                         panic(fmt.Sprintf("can't marshal extent type %T", e))
678                                 }
679                         }
680                 default:
681                         panic(fmt.Sprintf("can't marshal inode type %T", node))
682                 }
683         }
684         var filetokens []string
685         for _, s := range segments {
686                 filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, s.name))
687         }
688         if len(filetokens) == 0 {
689                 return subdirs, nil
690         } else if len(blocks) == 0 {
691                 blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
692         }
693         return prefix + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n" + subdirs, nil
694 }
695
696 func (dn *dirnode) loadManifest(txt string) {
697         // FIXME: faster
698         var dirname string
699         for _, stream := range strings.Split(txt, "\n") {
700                 var extents []storedExtent
701                 for i, token := range strings.Split(stream, " ") {
702                         if i == 0 {
703                                 dirname = manifestUnescape(token)
704                                 continue
705                         }
706                         if !strings.Contains(token, ":") {
707                                 toks := strings.SplitN(token, "+", 3)
708                                 if len(toks) < 2 {
709                                         // FIXME: broken
710                                         continue
711                                 }
712                                 length, err := strconv.ParseInt(toks[1], 10, 32)
713                                 if err != nil || length < 0 {
714                                         // FIXME: broken
715                                         continue
716                                 }
717                                 extents = append(extents, storedExtent{
718                                         locator: token,
719                                         size:    int(length),
720                                         offset:  0,
721                                         length:  int(length),
722                                 })
723                                 continue
724                         }
725                         toks := strings.Split(token, ":")
726                         if len(toks) != 3 {
727                                 // FIXME: broken manifest
728                                 continue
729                         }
730                         offset, err := strconv.ParseInt(toks[0], 10, 64)
731                         if err != nil || offset < 0 {
732                                 // FIXME: broken manifest
733                                 continue
734                         }
735                         length, err := strconv.ParseInt(toks[1], 10, 64)
736                         if err != nil || length < 0 {
737                                 // FIXME: broken manifest
738                                 continue
739                         }
740                         name := path.Clean(dirname + "/" + manifestUnescape(toks[2]))
741                         dn.makeParentDirs(name)
742                         f, err := dn.OpenFile(name, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0700)
743                         if err != nil {
744                                 // FIXME: broken
745                                 continue
746                         }
747                         if f.inode.Stat().IsDir() {
748                                 f.Close()
749                                 // FIXME: broken manifest
750                                 continue
751                         }
752                         // Map the stream offset/range coordinates to
753                         // block/offset/range coordinates and add
754                         // corresponding storedExtents to the filenode
755                         var pos int64
756                         for _, e := range extents {
757                                 next := pos + int64(e.Len())
758                                 if next < offset {
759                                         pos = next
760                                         continue
761                                 }
762                                 if pos > offset+length {
763                                         break
764                                 }
765                                 var blkOff int
766                                 if pos < offset {
767                                         blkOff = int(offset - pos)
768                                 }
769                                 blkLen := e.Len() - blkOff
770                                 if pos+int64(blkOff+blkLen) > offset+length {
771                                         blkLen = int(offset + length - pos - int64(blkOff))
772                                 }
773                                 f.inode.(*filenode).appendExtent(storedExtent{
774                                         cache:   dn.cache,
775                                         locator: e.locator,
776                                         size:    e.size,
777                                         offset:  blkOff,
778                                         length:  blkLen,
779                                 })
780                                 pos = next
781                         }
782                         f.Close()
783                 }
784         }
785 }
786
787 func (dn *dirnode) makeParentDirs(name string) (err error) {
788         names := strings.Split(name, "/")
789         for _, name := range names[:len(names)-1] {
790                 f, err := dn.mkdir(name)
791                 if err != nil {
792                         return err
793                 }
794                 defer f.Close()
795                 var ok bool
796                 dn, ok = f.inode.(*dirnode)
797                 if !ok {
798                         return ErrFileExists
799                 }
800         }
801         return nil
802 }
803
804 func (dn *dirnode) mkdir(name string) (*file, error) {
805         return dn.OpenFile(name, os.O_CREATE|os.O_EXCL, os.ModeDir|0755)
806 }
807
808 func (dn *dirnode) Mkdir(name string, perm os.FileMode) error {
809         f, err := dn.mkdir(name)
810         if err != nil {
811                 f.Close()
812         }
813         return err
814 }
815
816 func (dn *dirnode) Remove(name string) error {
817         dirname, name := path.Split(name)
818         if name == "" || name == "." || name == ".." {
819                 return ErrInvalidOperation
820         }
821         dn, ok := dn.lookupPath(dirname).(*dirnode)
822         if !ok {
823                 return os.ErrNotExist
824         }
825         dn.Lock()
826         defer dn.Unlock()
827         switch node := dn.inodes[name].(type) {
828         case nil:
829                 return os.ErrNotExist
830         case *dirnode:
831                 node.RLock()
832                 defer node.RUnlock()
833                 if len(node.inodes) > 0 {
834                         return ErrDirectoryNotEmpty
835                 }
836         }
837         delete(dn.inodes, name)
838         return nil
839 }
840
841 func (dn *dirnode) Parent() inode {
842         dn.RLock()
843         defer dn.RUnlock()
844         return dn.parent
845 }
846
847 func (dn *dirnode) Readdir() (fi []os.FileInfo) {
848         dn.RLock()
849         defer dn.RUnlock()
850         fi = make([]os.FileInfo, 0, len(dn.inodes))
851         for _, inode := range dn.inodes {
852                 fi = append(fi, inode.Stat())
853         }
854         return
855 }
856
857 func (dn *dirnode) Read(p []byte, ptr filenodePtr) (int, filenodePtr, error) {
858         return 0, ptr, ErrInvalidOperation
859 }
860
861 func (dn *dirnode) Write(p []byte, ptr filenodePtr) (int, filenodePtr, error) {
862         return 0, ptr, ErrInvalidOperation
863 }
864
865 func (dn *dirnode) Truncate(int64) error {
866         return ErrInvalidOperation
867 }
868
869 // lookupPath returns the inode for the file/directory with the given
870 // name (which may contain "/" separators), along with its parent
871 // node. If no such file/directory exists, the returned node is nil.
872 func (dn *dirnode) lookupPath(path string) (node inode) {
873         node = dn
874         for _, name := range strings.Split(path, "/") {
875                 dn, ok := node.(*dirnode)
876                 if !ok {
877                         return nil
878                 }
879                 if name == "." || name == "" {
880                         continue
881                 }
882                 if name == ".." {
883                         node = node.Parent()
884                         continue
885                 }
886                 dn.RLock()
887                 node = dn.inodes[name]
888                 dn.RUnlock()
889         }
890         return
891 }
892
893 func (dn *dirnode) OpenFile(name string, flag int, perm os.FileMode) (*file, error) {
894         dirname, name := path.Split(name)
895         dn, ok := dn.lookupPath(dirname).(*dirnode)
896         if !ok {
897                 return nil, os.ErrNotExist
898         }
899         writeMode := flag&(os.O_RDWR|os.O_WRONLY|os.O_CREATE) != 0
900         if !writeMode {
901                 // A directory can be opened via "foo/", "foo/.", or
902                 // "foo/..".
903                 switch name {
904                 case ".", "":
905                         return &file{inode: dn}, nil
906                 case "..":
907                         return &file{inode: dn.Parent()}, nil
908                 }
909         }
910         createMode := flag&os.O_CREATE != 0
911         if createMode {
912                 dn.Lock()
913                 defer dn.Unlock()
914         } else {
915                 dn.RLock()
916                 defer dn.RUnlock()
917         }
918         n, ok := dn.inodes[name]
919         if !ok {
920                 if !createMode {
921                         return nil, os.ErrNotExist
922                 }
923                 if perm.IsDir() {
924                         n = &dirnode{
925                                 parent: dn,
926                                 client: dn.client,
927                                 kc:     dn.kc,
928                                 fileinfo: fileinfo{
929                                         name: name,
930                                         mode: os.ModeDir | 0755,
931                                 },
932                         }
933                 } else {
934                         n = &filenode{
935                                 parent: dn,
936                                 fileinfo: fileinfo{
937                                         name: name,
938                                         mode: 0755,
939                                 },
940                         }
941                 }
942                 if dn.inodes == nil {
943                         dn.inodes = make(map[string]inode)
944                 }
945                 dn.inodes[name] = n
946                 dn.fileinfo.size++
947         } else if flag&os.O_EXCL != 0 {
948                 return nil, ErrFileExists
949         }
950         return &file{
951                 inode:    n,
952                 append:   flag&os.O_APPEND != 0,
953                 writable: flag&(os.O_WRONLY|os.O_RDWR) != 0,
954         }, nil
955 }
956
957 type extent interface {
958         io.ReaderAt
959         Len() int
960         // Return a new extent with a subsection of the data from this
961         // one. length<0 means length=Len()-off.
962         Slice(off int, length int) extent
963 }
964
965 type writableExtent interface {
966         extent
967         WriteAt(p []byte, off int)
968         Truncate(n int)
969 }
970
971 type memExtent struct {
972         buf []byte
973 }
974
975 func (me *memExtent) Len() int {
976         return len(me.buf)
977 }
978
979 func (me *memExtent) Slice(off, length int) extent {
980         if length < 0 {
981                 length = len(me.buf) - off
982         }
983         buf := make([]byte, length)
984         copy(buf, me.buf[off:])
985         return &memExtent{buf: buf}
986 }
987
988 func (me *memExtent) Truncate(n int) {
989         if n > cap(me.buf) {
990                 newsize := 1024
991                 for newsize < n {
992                         newsize = newsize << 2
993                 }
994                 newbuf := make([]byte, n, newsize)
995                 copy(newbuf, me.buf)
996                 me.buf = newbuf
997         } else {
998                 // Zero unused part when shrinking, in case we grow
999                 // and start using it again later.
1000                 for i := n; i < len(me.buf); i++ {
1001                         me.buf[i] = 0
1002                 }
1003         }
1004         me.buf = me.buf[:n]
1005 }
1006
1007 func (me *memExtent) WriteAt(p []byte, off int) {
1008         if off+len(p) > len(me.buf) {
1009                 panic("overflowed extent")
1010         }
1011         copy(me.buf[off:], p)
1012 }
1013
1014 func (me *memExtent) ReadAt(p []byte, off int64) (n int, err error) {
1015         if off > int64(me.Len()) {
1016                 err = io.EOF
1017                 return
1018         }
1019         n = copy(p, me.buf[int(off):])
1020         if n < len(p) {
1021                 err = io.EOF
1022         }
1023         return
1024 }
1025
1026 type storedExtent struct {
1027         cache   blockCache
1028         locator string
1029         size    int
1030         offset  int
1031         length  int
1032 }
1033
1034 func (se storedExtent) Len() int {
1035         return se.length
1036 }
1037
1038 func (se storedExtent) Slice(n, size int) extent {
1039         se.offset += n
1040         se.length -= n
1041         if size >= 0 && se.length > size {
1042                 se.length = size
1043         }
1044         return se
1045 }
1046
1047 func (se storedExtent) ReadAt(p []byte, off int64) (n int, err error) {
1048         if off > int64(se.length) {
1049                 return 0, io.EOF
1050         }
1051         maxlen := se.length - int(off)
1052         if len(p) > maxlen {
1053                 p = p[:maxlen]
1054                 n, err = se.cache.ReadAt(se.locator, p, int(off)+se.offset)
1055                 if err == nil {
1056                         err = io.EOF
1057                 }
1058                 return
1059         }
1060         return se.cache.ReadAt(se.locator, p, int(off)+se.offset)
1061 }
1062
1063 type blockCache interface {
1064         ReadAt(locator string, p []byte, off int) (n int, err error)
1065 }
1066
1067 type keepBlockCache struct {
1068         kc keepClient
1069 }
1070
1071 var scratch = make([]byte, 2<<26)
1072
1073 func (kbc *keepBlockCache) ReadAt(locator string, p []byte, off int) (int, error) {
1074         return kbc.kc.ReadAt(locator, p, off)
1075 }
1076
1077 func canonicalName(name string) string {
1078         name = path.Clean("/" + name)
1079         if name == "/" || name == "./" {
1080                 name = "."
1081         } else if strings.HasPrefix(name, "/") {
1082                 name = "." + name
1083         }
1084         return name
1085 }
1086
1087 var manifestEscapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`)
1088
1089 func manifestUnescapeSeq(seq string) string {
1090         if seq == `\\` {
1091                 return `\`
1092         }
1093         i, err := strconv.ParseUint(seq[1:], 8, 8)
1094         if err != nil {
1095                 // Invalid escape sequence: can't unescape.
1096                 return seq
1097         }
1098         return string([]byte{byte(i)})
1099 }
1100
1101 func manifestUnescape(s string) string {
1102         return manifestEscapeSeq.ReplaceAllStringFunc(s, manifestUnescapeSeq)
1103 }