12483: Add MarshalManifest().
[arvados.git] / sdk / go / arvados / collection_fs.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0
4
5 package arvados
6
7 import (
8         "crypto/md5"
9         "errors"
10         "fmt"
11         "io"
12         "net/http"
13         "os"
14         "path"
15         "regexp"
16         "sort"
17         "strconv"
18         "strings"
19         "sync"
20         "time"
21 )
22
23 var (
24         ErrReadOnlyFile     = errors.New("read-only file")
25         ErrNegativeOffset   = errors.New("cannot seek to negative offset")
26         ErrFileExists       = errors.New("file exists")
27         ErrInvalidOperation = errors.New("invalid operation")
28         ErrPermission       = os.ErrPermission
29
30         maxBlockSize = 1 << 26
31 )
32
33 type File interface {
34         io.Reader
35         io.Writer
36         io.Closer
37         io.Seeker
38         Size() int64
39         Readdir(int) ([]os.FileInfo, error)
40         Stat() (os.FileInfo, error)
41         Truncate(int64) error
42 }
43
44 type keepClient interface {
45         ReadAt(locator string, p []byte, off int) (int, error)
46 }
47
48 type fileinfo struct {
49         name    string
50         mode    os.FileMode
51         size    int64
52         modTime time.Time
53 }
54
55 // Name implements os.FileInfo.
56 func (fi fileinfo) Name() string {
57         return fi.name
58 }
59
60 // ModTime implements os.FileInfo.
61 func (fi fileinfo) ModTime() time.Time {
62         return fi.modTime
63 }
64
65 // Mode implements os.FileInfo.
66 func (fi fileinfo) Mode() os.FileMode {
67         return fi.mode
68 }
69
70 // IsDir implements os.FileInfo.
71 func (fi fileinfo) IsDir() bool {
72         return fi.mode&os.ModeDir != 0
73 }
74
75 // Size implements os.FileInfo.
76 func (fi fileinfo) Size() int64 {
77         return fi.size
78 }
79
80 // Sys implements os.FileInfo.
81 func (fi fileinfo) Sys() interface{} {
82         return nil
83 }
84
85 func (fi fileinfo) Stat() os.FileInfo {
86         return fi
87 }
88
89 // A CollectionFileSystem is an http.Filesystem plus Stat() and
90 // support for opening writable files.
91 type CollectionFileSystem interface {
92         http.FileSystem
93         Stat(name string) (os.FileInfo, error)
94         Create(name string) (File, error)
95         OpenFile(name string, flag int, perm os.FileMode) (File, error)
96         MarshalManifest(string) (string, error)
97 }
98
99 type fileSystem struct {
100         dirnode
101 }
102
103 func (fs *fileSystem) OpenFile(name string, flag int, perm os.FileMode) (File, error) {
104         return fs.dirnode.OpenFile(path.Clean(name), flag, perm)
105 }
106
107 func (fs *fileSystem) Open(name string) (http.File, error) {
108         return fs.dirnode.OpenFile(path.Clean(name), os.O_RDONLY, 0)
109 }
110
111 func (fs *fileSystem) Create(name string) (File, error) {
112         return fs.dirnode.OpenFile(path.Clean(name), os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0)
113 }
114
115 func (fs *fileSystem) Stat(name string) (os.FileInfo, error) {
116         f, err := fs.OpenFile(name, os.O_RDONLY, 0)
117         if err != nil {
118                 return nil, err
119         }
120         defer f.Close()
121         return f.Stat()
122 }
123
124 type inode interface {
125         os.FileInfo
126         OpenFile(string, int, os.FileMode) (*file, error)
127         Parent() inode
128         Read([]byte, filenodePtr) (int, filenodePtr, error)
129         Write([]byte, filenodePtr) (int, filenodePtr, error)
130         Truncate(int64) error
131         Readdir() []os.FileInfo
132         Stat() os.FileInfo
133         sync.Locker
134         RLock()
135         RUnlock()
136 }
137
138 // filenode implements inode.
139 type filenode struct {
140         fileinfo
141         parent   *dirnode
142         extents  []extent
143         repacked int64 // number of times anything in []extents has changed len
144         sync.RWMutex
145 }
146
147 // filenodePtr is an offset into a file that is (usually) efficient to
148 // seek to. Specifically, if filenode.repacked==filenodePtr.repacked
149 // then filenode.extents[filenodePtr.extentIdx][filenodePtr.extentOff]
150 // corresponds to file offset filenodePtr.off. Otherwise, it is
151 // necessary to reexamine len(filenode.extents[0]) etc. to find the
152 // correct extent and offset.
153 type filenodePtr struct {
154         off       int64
155         extentIdx int
156         extentOff int
157         repacked  int64
158 }
159
160 // seek returns a ptr that is consistent with both startPtr.off and
161 // the current state of fn. The caller must already hold fn.RLock() or
162 // fn.Lock().
163 //
164 // If startPtr points beyond the end of the file, ptr will point to
165 // exactly the end of the file.
166 //
167 // After seeking:
168 //
169 //     ptr.extentIdx == len(filenode.extents) // i.e., at EOF
170 //     ||
171 //     filenode.extents[ptr.extentIdx].Len() >= ptr.extentOff
172 func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) {
173         ptr = startPtr
174         if ptr.off < 0 {
175                 // meaningless anyway
176                 return
177         } else if ptr.off >= fn.fileinfo.size {
178                 ptr.off = fn.fileinfo.size
179                 ptr.extentIdx = len(fn.extents)
180                 ptr.extentOff = 0
181                 ptr.repacked = fn.repacked
182                 return
183         } else if ptr.repacked == fn.repacked {
184                 // extentIdx and extentOff accurately reflect ptr.off,
185                 // but might have fallen off the end of an extent
186                 if ptr.extentOff >= fn.extents[ptr.extentIdx].Len() {
187                         ptr.extentIdx++
188                         ptr.extentOff = 0
189                 }
190                 return
191         }
192         defer func() {
193                 ptr.repacked = fn.repacked
194         }()
195         if ptr.off >= fn.fileinfo.size {
196                 ptr.extentIdx, ptr.extentOff = len(fn.extents), 0
197                 return
198         }
199         // Recompute extentIdx and extentOff.  We have already
200         // established fn.fileinfo.size > ptr.off >= 0, so we don't
201         // have to deal with edge cases here.
202         var off int64
203         for ptr.extentIdx, ptr.extentOff = 0, 0; off < ptr.off; ptr.extentIdx++ {
204                 // This would panic (index out of range) if
205                 // fn.fileinfo.size were larger than
206                 // sum(fn.extents[i].Len()) -- but that can't happen
207                 // because we have ensured fn.fileinfo.size is always
208                 // accurate.
209                 extLen := int64(fn.extents[ptr.extentIdx].Len())
210                 if off+extLen > ptr.off {
211                         ptr.extentOff = int(ptr.off - off)
212                         break
213                 }
214                 off += extLen
215         }
216         return
217 }
218
219 func (fn *filenode) appendExtent(e extent) {
220         fn.Lock()
221         defer fn.Unlock()
222         fn.extents = append(fn.extents, e)
223         fn.fileinfo.size += int64(e.Len())
224 }
225
226 func (fn *filenode) OpenFile(string, int, os.FileMode) (*file, error) {
227         return nil, os.ErrNotExist
228 }
229
230 func (fn *filenode) Parent() inode {
231         return fn.parent
232 }
233
234 func (fn *filenode) Readdir() []os.FileInfo {
235         return nil
236 }
237
238 func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
239         fn.RLock()
240         defer fn.RUnlock()
241         ptr = fn.seek(startPtr)
242         if ptr.off < 0 {
243                 err = ErrNegativeOffset
244                 return
245         }
246         if ptr.extentIdx >= len(fn.extents) {
247                 err = io.EOF
248                 return
249         }
250         n, err = fn.extents[ptr.extentIdx].ReadAt(p, int64(ptr.extentOff))
251         if n > 0 {
252                 ptr.off += int64(n)
253                 ptr.extentOff += n
254                 if ptr.extentOff == fn.extents[ptr.extentIdx].Len() {
255                         ptr.extentIdx++
256                         ptr.extentOff = 0
257                         if ptr.extentIdx < len(fn.extents) && err == io.EOF {
258                                 err = nil
259                         }
260                 }
261         }
262         return
263 }
264
265 func (fn *filenode) Truncate(size int64) error {
266         fn.Lock()
267         defer fn.Unlock()
268         if size < fn.fileinfo.size {
269                 ptr := fn.seek(filenodePtr{off: size, repacked: fn.repacked - 1})
270                 if ptr.extentOff == 0 {
271                         fn.extents = fn.extents[:ptr.extentIdx]
272                 } else {
273                         fn.extents = fn.extents[:ptr.extentIdx+1]
274                         e := fn.extents[ptr.extentIdx]
275                         if e, ok := e.(writableExtent); ok {
276                                 e.Truncate(ptr.extentOff)
277                         } else {
278                                 fn.extents[ptr.extentIdx] = e.Slice(0, ptr.extentOff)
279                         }
280                 }
281                 fn.fileinfo.size = size
282                 fn.repacked++
283                 return nil
284         }
285         for size > fn.fileinfo.size {
286                 grow := size - fn.fileinfo.size
287                 var e writableExtent
288                 var ok bool
289                 if len(fn.extents) == 0 {
290                         e = &memExtent{}
291                         fn.extents = append(fn.extents, e)
292                 } else if e, ok = fn.extents[len(fn.extents)-1].(writableExtent); !ok || e.Len() >= maxBlockSize {
293                         e = &memExtent{}
294                         fn.extents = append(fn.extents, e)
295                 } else {
296                         fn.repacked++
297                 }
298                 if maxgrow := int64(maxBlockSize - e.Len()); maxgrow < grow {
299                         grow = maxgrow
300                 }
301                 e.Truncate(e.Len() + int(grow))
302                 fn.fileinfo.size += grow
303         }
304         return nil
305 }
306
307 func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
308         fn.Lock()
309         defer fn.Unlock()
310         ptr = fn.seek(startPtr)
311         if ptr.off < 0 {
312                 err = ErrNegativeOffset
313                 return
314         }
315         for len(p) > 0 && err == nil {
316                 cando := p
317                 if len(cando) > maxBlockSize {
318                         cando = cando[:maxBlockSize]
319                 }
320                 // Rearrange/grow fn.extents (and shrink cando if
321                 // needed) such that cando can be copied to
322                 // fn.extents[ptr.extentIdx] at offset ptr.extentOff.
323                 cur := ptr.extentIdx
324                 prev := ptr.extentIdx - 1
325                 var curWritable bool
326                 if cur < len(fn.extents) {
327                         _, curWritable = fn.extents[cur].(writableExtent)
328                 }
329                 var prevAppendable bool
330                 if prev >= 0 && fn.extents[prev].Len() < maxBlockSize {
331                         _, prevAppendable = fn.extents[prev].(writableExtent)
332                 }
333                 if ptr.extentOff > 0 && !curWritable {
334                         // Split a non-writable block.
335                         if max := fn.extents[cur].Len() - ptr.extentOff; max <= len(cando) {
336                                 // Truncate cur, and insert a new
337                                 // extent after it.
338                                 cando = cando[:max]
339                                 fn.extents = append(fn.extents, nil)
340                                 copy(fn.extents[cur+1:], fn.extents[cur:])
341                         } else {
342                                 // Split cur into two copies, truncate
343                                 // the one on the left, shift the one
344                                 // on the right, and insert a new
345                                 // extent between them.
346                                 fn.extents = append(fn.extents, nil, nil)
347                                 copy(fn.extents[cur+2:], fn.extents[cur:])
348                                 fn.extents[cur+2] = fn.extents[cur+2].Slice(ptr.extentOff+len(cando), -1)
349                         }
350                         cur++
351                         prev++
352                         e := &memExtent{}
353                         e.Truncate(len(cando))
354                         fn.extents[cur] = e
355                         fn.extents[prev] = fn.extents[prev].Slice(0, ptr.extentOff)
356                         ptr.extentIdx++
357                         ptr.extentOff = 0
358                         fn.repacked++
359                         ptr.repacked++
360                 } else if curWritable {
361                         if fit := int(fn.extents[cur].Len()) - ptr.extentOff; fit < len(cando) {
362                                 cando = cando[:fit]
363                         }
364                 } else {
365                         if prevAppendable {
366                                 // Shrink cando if needed to fit in prev extent.
367                                 if cangrow := maxBlockSize - fn.extents[prev].Len(); cangrow < len(cando) {
368                                         cando = cando[:cangrow]
369                                 }
370                         }
371
372                         if cur == len(fn.extents) {
373                                 // ptr is at EOF, filesize is changing.
374                                 fn.fileinfo.size += int64(len(cando))
375                         } else if el := fn.extents[cur].Len(); el <= len(cando) {
376                                 // cando is long enough that we won't
377                                 // need cur any more. shrink cando to
378                                 // be exactly as long as cur
379                                 // (otherwise we'd accidentally shift
380                                 // the effective position of all
381                                 // extents after cur).
382                                 cando = cando[:el]
383                                 copy(fn.extents[cur:], fn.extents[cur+1:])
384                                 fn.extents = fn.extents[:len(fn.extents)-1]
385                         } else {
386                                 // shrink cur by the same #bytes we're growing prev
387                                 fn.extents[cur] = fn.extents[cur].Slice(len(cando), -1)
388                         }
389
390                         if prevAppendable {
391                                 // Grow prev.
392                                 ptr.extentIdx--
393                                 ptr.extentOff = fn.extents[prev].Len()
394                                 fn.extents[prev].(writableExtent).Truncate(ptr.extentOff + len(cando))
395                                 ptr.repacked++
396                                 fn.repacked++
397                         } else {
398                                 // Insert an extent between prev and cur, and advance prev/cur.
399                                 fn.extents = append(fn.extents, nil)
400                                 if cur < len(fn.extents) {
401                                         copy(fn.extents[cur+1:], fn.extents[cur:])
402                                         ptr.repacked++
403                                         fn.repacked++
404                                 } else {
405                                         // appending a new extent does
406                                         // not invalidate any ptrs
407                                 }
408                                 e := &memExtent{}
409                                 e.Truncate(len(cando))
410                                 fn.extents[cur] = e
411                                 cur++
412                                 prev++
413                         }
414                 }
415
416                 // Finally we can copy bytes from cando to the current extent.
417                 fn.extents[ptr.extentIdx].(writableExtent).WriteAt(cando, ptr.extentOff)
418                 n += len(cando)
419                 p = p[len(cando):]
420
421                 ptr.off += int64(len(cando))
422                 ptr.extentOff += len(cando)
423                 if fn.extents[ptr.extentIdx].Len() == ptr.extentOff {
424                         ptr.extentOff = 0
425                         ptr.extentIdx++
426                 }
427         }
428         return
429 }
430
431 // FileSystem returns a CollectionFileSystem for the collection.
432 func (c *Collection) FileSystem(client *Client, kc keepClient) CollectionFileSystem {
433         fs := &fileSystem{dirnode: dirnode{
434                 cache:    &keepBlockCache{kc: kc},
435                 client:   client,
436                 kc:       kc,
437                 fileinfo: fileinfo{name: ".", mode: os.ModeDir | 0755},
438                 parent:   nil,
439                 inodes:   make(map[string]inode),
440         }}
441         fs.dirnode.parent = &fs.dirnode
442         fs.dirnode.loadManifest(c.ManifestText)
443         return fs
444 }
445
446 type file struct {
447         inode
448         ptr        filenodePtr
449         append     bool
450         writable   bool
451         unreaddirs []os.FileInfo
452 }
453
454 func (f *file) Read(p []byte) (n int, err error) {
455         n, f.ptr, err = f.inode.Read(p, f.ptr)
456         return
457 }
458
459 func (f *file) Seek(off int64, whence int) (pos int64, err error) {
460         size := f.inode.Size()
461         ptr := f.ptr
462         switch whence {
463         case os.SEEK_SET:
464                 ptr.off = off
465         case os.SEEK_CUR:
466                 ptr.off += off
467         case os.SEEK_END:
468                 ptr.off = size + off
469         }
470         if ptr.off < 0 {
471                 return f.ptr.off, ErrNegativeOffset
472         }
473         if ptr.off > size {
474                 ptr.off = size
475         }
476         if ptr.off != f.ptr.off {
477                 f.ptr = ptr
478                 // force filenode to recompute f.ptr fields on next
479                 // use
480                 f.ptr.repacked = -1
481         }
482         return f.ptr.off, nil
483 }
484
485 func (f *file) Truncate(size int64) error {
486         return f.inode.Truncate(size)
487 }
488
489 func (f *file) Write(p []byte) (n int, err error) {
490         if !f.writable {
491                 return 0, ErrReadOnlyFile
492         }
493         n, f.ptr, err = f.inode.Write(p, f.ptr)
494         return
495 }
496
497 func (f *file) Readdir(count int) ([]os.FileInfo, error) {
498         if !f.inode.IsDir() {
499                 return nil, ErrInvalidOperation
500         }
501         if count <= 0 {
502                 return f.inode.Readdir(), nil
503         }
504         if f.unreaddirs == nil {
505                 f.unreaddirs = f.inode.Readdir()
506         }
507         if len(f.unreaddirs) == 0 {
508                 return nil, io.EOF
509         }
510         if count > len(f.unreaddirs) {
511                 count = len(f.unreaddirs)
512         }
513         ret := f.unreaddirs[:count]
514         f.unreaddirs = f.unreaddirs[count:]
515         return ret, nil
516 }
517
518 func (f *file) Stat() (os.FileInfo, error) {
519         return f.inode, nil
520 }
521
522 func (f *file) Close() error {
523         // FIXME: flush
524         return nil
525 }
526
527 func (f *file) OpenFile(name string, flag int, perm os.FileMode) (*file, error) {
528         return f.inode.OpenFile(name, flag, perm)
529 }
530
531 type dirnode struct {
532         fileinfo
533         parent *dirnode
534         client *Client
535         kc     keepClient
536         cache  blockCache
537         inodes map[string]inode
538         sync.RWMutex
539 }
540
541 // caller must hold dn.Lock().
542 func (dn *dirnode) sync() error {
543         type shortBlock struct {
544                 fn  *filenode
545                 idx int
546         }
547         var pending []shortBlock
548         var pendingLen int
549
550         flush := func(sbs []shortBlock) error {
551                 if len(sbs) == 0 {
552                         return nil
553                 }
554                 hash := md5.New()
555                 size := 0
556                 for _, sb := range sbs {
557                         data := sb.fn.extents[sb.idx].(*memExtent).buf
558                         if _, err := hash.Write(data); err != nil {
559                                 return err
560                         }
561                         size += len(data)
562                 }
563                 // FIXME: write to keep
564                 locator := fmt.Sprintf("%x+%d", hash.Sum(nil), size)
565                 off := 0
566                 for _, sb := range sbs {
567                         data := sb.fn.extents[sb.idx].(*memExtent).buf
568                         sb.fn.extents[sb.idx] = storedExtent{
569                                 cache:   dn.cache,
570                                 locator: locator,
571                                 size:    size,
572                                 offset:  off,
573                                 length:  len(data),
574                         }
575                         off += len(data)
576                 }
577                 return nil
578         }
579
580         names := make([]string, 0, len(dn.inodes))
581         for name := range dn.inodes {
582                 names = append(names, name)
583         }
584         sort.Strings(names)
585
586         for _, name := range names {
587                 fn, ok := dn.inodes[name].(*filenode)
588                 if !ok {
589                         continue
590                 }
591                 fn.Lock()
592                 defer fn.Unlock()
593                 for idx, ext := range fn.extents {
594                         ext, ok := ext.(*memExtent)
595                         if !ok {
596                                 continue
597                         }
598                         if ext.Len() > maxBlockSize/2 {
599                                 if err := flush([]shortBlock{{fn, idx}}); err != nil {
600                                         return err
601                                 }
602                                 continue
603                         }
604                         if pendingLen+ext.Len() > maxBlockSize {
605                                 if err := flush(pending); err != nil {
606                                         return err
607                                 }
608                                 pending = nil
609                                 pendingLen = 0
610                         }
611                         pending = append(pending, shortBlock{fn, idx})
612                         pendingLen += ext.Len()
613                 }
614         }
615         return flush(pending)
616 }
617
618 func (dn *dirnode) MarshalManifest(prefix string) (string, error) {
619         dn.Lock()
620         defer dn.Unlock()
621         if err := dn.sync(); err != nil {
622                 return "", err
623         }
624
625         var streamLen int64
626         type m1segment struct {
627                 name   string
628                 offset int64
629                 length int64
630         }
631         var segments []m1segment
632         var subdirs string
633         var blocks []string
634
635         names := make([]string, 0, len(dn.inodes))
636         for name := range dn.inodes {
637                 names = append(names, name)
638         }
639         sort.Strings(names)
640
641         for _, name := range names {
642                 node := dn.inodes[name]
643                 switch node := node.(type) {
644                 case *dirnode:
645                         subdir, err := node.MarshalManifest(prefix + "/" + node.Name())
646                         if err != nil {
647                                 return "", err
648                         }
649                         subdirs = subdirs + subdir
650                 case *filenode:
651                         for _, e := range node.extents {
652                                 switch e := e.(type) {
653                                 case *memExtent:
654                                         blocks = append(blocks, fmt.Sprintf("FIXME+%d", e.Len()))
655                                         segments = append(segments, m1segment{
656                                                 name:   node.Name(),
657                                                 offset: streamLen,
658                                                 length: int64(e.Len()),
659                                         })
660                                         streamLen += int64(e.Len())
661                                 case storedExtent:
662                                         if len(blocks) > 0 && blocks[len(blocks)-1] == e.locator {
663                                                 streamLen -= int64(e.size)
664                                         } else {
665                                                 blocks = append(blocks, e.locator)
666                                         }
667                                         segments = append(segments, m1segment{
668                                                 name:   node.Name(),
669                                                 offset: streamLen + int64(e.offset),
670                                                 length: int64(e.length),
671                                         })
672                                         streamLen += int64(e.size)
673                                 default:
674                                         panic(fmt.Sprintf("can't marshal extent type %T", e))
675                                 }
676                         }
677                 default:
678                         panic(fmt.Sprintf("can't marshal inode type %T", node))
679                 }
680         }
681         var filetokens []string
682         for _, s := range segments {
683                 filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, s.name))
684         }
685         if len(filetokens) == 0 {
686                 return subdirs, nil
687         } else if len(blocks) == 0 {
688                 blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
689         }
690         return prefix + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n" + subdirs, nil
691 }
692
693 func (dn *dirnode) loadManifest(txt string) {
694         // FIXME: faster
695         var dirname string
696         for _, stream := range strings.Split(txt, "\n") {
697                 var extents []storedExtent
698                 for i, token := range strings.Split(stream, " ") {
699                         if i == 0 {
700                                 dirname = manifestUnescape(token)
701                                 continue
702                         }
703                         if !strings.Contains(token, ":") {
704                                 toks := strings.SplitN(token, "+", 3)
705                                 if len(toks) < 2 {
706                                         // FIXME: broken
707                                         continue
708                                 }
709                                 length, err := strconv.ParseInt(toks[1], 10, 32)
710                                 if err != nil || length < 0 {
711                                         // FIXME: broken
712                                         continue
713                                 }
714                                 extents = append(extents, storedExtent{
715                                         locator: token,
716                                         size:    int(length),
717                                         offset:  0,
718                                         length:  int(length),
719                                 })
720                                 continue
721                         }
722                         toks := strings.Split(token, ":")
723                         if len(toks) != 3 {
724                                 // FIXME: broken manifest
725                                 continue
726                         }
727                         offset, err := strconv.ParseInt(toks[0], 10, 64)
728                         if err != nil || offset < 0 {
729                                 // FIXME: broken manifest
730                                 continue
731                         }
732                         length, err := strconv.ParseInt(toks[1], 10, 64)
733                         if err != nil || length < 0 {
734                                 // FIXME: broken manifest
735                                 continue
736                         }
737                         name := path.Clean(dirname + "/" + manifestUnescape(toks[2]))
738                         dn.makeParentDirs(name)
739                         f, err := dn.OpenFile(name, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0700)
740                         if err != nil {
741                                 // FIXME: broken
742                                 continue
743                         }
744                         if f.inode.Stat().IsDir() {
745                                 f.Close()
746                                 // FIXME: broken manifest
747                                 continue
748                         }
749                         // Map the stream offset/range coordinates to
750                         // block/offset/range coordinates and add
751                         // corresponding storedExtents to the filenode
752                         var pos int64
753                         for _, e := range extents {
754                                 next := pos + int64(e.Len())
755                                 if next < offset {
756                                         pos = next
757                                         continue
758                                 }
759                                 if pos > offset+length {
760                                         break
761                                 }
762                                 var blkOff int
763                                 if pos < offset {
764                                         blkOff = int(offset - pos)
765                                 }
766                                 blkLen := e.Len() - blkOff
767                                 if pos+int64(blkOff+blkLen) > offset+length {
768                                         blkLen = int(offset + length - pos - int64(blkOff))
769                                 }
770                                 f.inode.(*filenode).appendExtent(storedExtent{
771                                         cache:   dn.cache,
772                                         locator: e.locator,
773                                         size:    e.size,
774                                         offset:  blkOff,
775                                         length:  blkLen,
776                                 })
777                                 pos = next
778                         }
779                         f.Close()
780                 }
781         }
782 }
783
784 func (dn *dirnode) makeParentDirs(name string) {
785         names := strings.Split(name, "/")
786         for _, name := range names[:len(names)-1] {
787                 dn.Lock()
788                 defer dn.Unlock()
789                 if n, ok := dn.inodes[name]; !ok {
790                         n := &dirnode{
791                                 parent: dn,
792                                 client: dn.client,
793                                 kc:     dn.kc,
794                                 fileinfo: fileinfo{
795                                         name: name,
796                                         mode: os.ModeDir | 0755,
797                                 },
798                         }
799                         if dn.inodes == nil {
800                                 dn.inodes = make(map[string]inode)
801                         }
802                         dn.inodes[name] = n
803                         dn.fileinfo.size++
804                         dn = n
805                 } else if n, ok := n.(*dirnode); ok {
806                         dn = n
807                 } else {
808                         // fail
809                         return
810                 }
811         }
812 }
813
814 func (dn *dirnode) Parent() inode {
815         return dn.parent
816 }
817
818 func (dn *dirnode) Readdir() (fi []os.FileInfo) {
819         dn.RLock()
820         defer dn.RUnlock()
821         fi = make([]os.FileInfo, 0, len(dn.inodes))
822         for _, inode := range dn.inodes {
823                 fi = append(fi, inode.Stat())
824         }
825         return
826 }
827
828 func (dn *dirnode) Read(p []byte, ptr filenodePtr) (int, filenodePtr, error) {
829         return 0, ptr, ErrInvalidOperation
830 }
831
832 func (dn *dirnode) Write(p []byte, ptr filenodePtr) (int, filenodePtr, error) {
833         return 0, ptr, ErrInvalidOperation
834 }
835
836 func (dn *dirnode) Truncate(int64) error {
837         return ErrInvalidOperation
838 }
839
840 func (dn *dirnode) OpenFile(name string, flag int, perm os.FileMode) (*file, error) {
841         name = strings.TrimSuffix(name, "/")
842         if name == "." || name == "" {
843                 return &file{inode: dn}, nil
844         }
845         if dirname, name := path.Split(name); dirname != "" {
846                 // OpenFile("foo/bar/baz") =>
847                 // OpenFile("foo/bar").OpenFile("baz") (or
848                 // ErrNotExist, if foo/bar is a file)
849                 f, err := dn.OpenFile(dirname, os.O_RDONLY, 0)
850                 if err != nil {
851                         return nil, err
852                 }
853                 defer f.Close()
854                 if dn, ok := f.inode.(*dirnode); ok {
855                         return dn.OpenFile(name, flag, perm)
856                 } else {
857                         return nil, os.ErrNotExist
858                 }
859         }
860         dn.Lock()
861         defer dn.Unlock()
862         if name == ".." {
863                 return &file{inode: dn.parent}, nil
864         }
865         n, ok := dn.inodes[name]
866         if !ok {
867                 if flag&os.O_CREATE == 0 {
868                         return nil, os.ErrNotExist
869                 }
870                 n = &filenode{
871                         parent: dn,
872                         fileinfo: fileinfo{
873                                 name: name,
874                                 mode: 0755,
875                         },
876                 }
877                 if dn.inodes == nil {
878                         dn.inodes = make(map[string]inode)
879                 }
880                 dn.inodes[name] = n
881                 dn.fileinfo.size++
882         } else if flag&os.O_EXCL != 0 {
883                 return nil, ErrFileExists
884         }
885         return &file{
886                 inode:    n,
887                 append:   flag&os.O_APPEND != 0,
888                 writable: flag&(os.O_WRONLY|os.O_RDWR) != 0,
889         }, nil
890 }
891
892 type extent interface {
893         io.ReaderAt
894         Len() int
895         // Return a new extent with a subsection of the data from this
896         // one. length<0 means length=Len()-off.
897         Slice(off int, length int) extent
898 }
899
900 type writableExtent interface {
901         extent
902         WriteAt(p []byte, off int)
903         Truncate(n int)
904 }
905
906 type memExtent struct {
907         buf []byte
908 }
909
910 func (me *memExtent) Len() int {
911         return len(me.buf)
912 }
913
914 func (me *memExtent) Slice(off, length int) extent {
915         if length < 0 {
916                 length = len(me.buf) - off
917         }
918         buf := make([]byte, length)
919         copy(buf, me.buf[off:])
920         return &memExtent{buf: buf}
921 }
922
923 func (me *memExtent) Truncate(n int) {
924         if n > cap(me.buf) {
925                 newsize := 1024
926                 for newsize < n {
927                         newsize = newsize << 2
928                 }
929                 newbuf := make([]byte, n, newsize)
930                 copy(newbuf, me.buf)
931                 me.buf = newbuf
932         } else {
933                 // Zero unused part when shrinking, in case we grow
934                 // and start using it again later.
935                 for i := n; i < len(me.buf); i++ {
936                         me.buf[i] = 0
937                 }
938         }
939         me.buf = me.buf[:n]
940 }
941
942 func (me *memExtent) WriteAt(p []byte, off int) {
943         if off+len(p) > len(me.buf) {
944                 panic("overflowed extent")
945         }
946         copy(me.buf[off:], p)
947 }
948
949 func (me *memExtent) ReadAt(p []byte, off int64) (n int, err error) {
950         if off > int64(me.Len()) {
951                 err = io.EOF
952                 return
953         }
954         n = copy(p, me.buf[int(off):])
955         if n < len(p) {
956                 err = io.EOF
957         }
958         return
959 }
960
961 type storedExtent struct {
962         cache   blockCache
963         locator string
964         size    int
965         offset  int
966         length  int
967 }
968
969 func (se storedExtent) Len() int {
970         return se.length
971 }
972
973 func (se storedExtent) Slice(n, size int) extent {
974         se.offset += n
975         se.length -= n
976         if size >= 0 && se.length > size {
977                 se.length = size
978         }
979         return se
980 }
981
982 func (se storedExtent) ReadAt(p []byte, off int64) (n int, err error) {
983         if off > int64(se.length) {
984                 return 0, io.EOF
985         }
986         maxlen := se.length - int(off)
987         if len(p) > maxlen {
988                 p = p[:maxlen]
989                 n, err = se.cache.ReadAt(se.locator, p, int(off)+se.offset)
990                 if err == nil {
991                         err = io.EOF
992                 }
993                 return
994         }
995         return se.cache.ReadAt(se.locator, p, int(off)+se.offset)
996 }
997
998 type blockCache interface {
999         ReadAt(locator string, p []byte, off int) (n int, err error)
1000 }
1001
1002 type keepBlockCache struct {
1003         kc keepClient
1004 }
1005
1006 var scratch = make([]byte, 2<<26)
1007
1008 func (kbc *keepBlockCache) ReadAt(locator string, p []byte, off int) (int, error) {
1009         return kbc.kc.ReadAt(locator, p, off)
1010 }
1011
1012 func canonicalName(name string) string {
1013         name = path.Clean("/" + name)
1014         if name == "/" || name == "./" {
1015                 name = "."
1016         } else if strings.HasPrefix(name, "/") {
1017                 name = "." + name
1018         }
1019         return name
1020 }
1021
1022 var manifestEscapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`)
1023
1024 func manifestUnescapeSeq(seq string) string {
1025         if seq == `\\` {
1026                 return `\`
1027         }
1028         i, err := strconv.ParseUint(seq[1:], 8, 8)
1029         if err != nil {
1030                 // Invalid escape sequence: can't unescape.
1031                 return seq
1032         }
1033         return string([]byte{byte(i)})
1034 }
1035
1036 func manifestUnescape(s string) string {
1037         return manifestEscapeSeq.ReplaceAllStringFunc(s, manifestUnescapeSeq)
1038 }