Merge branch 'master' into 15577-ownership-transfer
[arvados.git] / sdk / go / arvados / fs_collection.go
index 72ef49feec8a3a6e3fa0d85c67c9b81d2763d621..b3e6aa96e46473a11a6a839aa1a3e35693b25cdb 100644
@@ -5,11 +5,10 @@
 package arvados
 
 import (
+       "context"
        "encoding/json"
-       "errors"
        "fmt"
        "io"
-       "net/http"
        "os"
        "path"
        "regexp"
@@ -21,110 +20,10 @@ import (
 )
 
 var (
-       ErrReadOnlyFile      = errors.New("read-only file")
-       ErrNegativeOffset    = errors.New("cannot seek to negative offset")
-       ErrFileExists        = errors.New("file exists")
-       ErrInvalidOperation  = errors.New("invalid operation")
-       ErrInvalidArgument   = errors.New("invalid argument")
-       ErrDirectoryNotEmpty = errors.New("directory not empty")
-       ErrWriteOnlyMode     = errors.New("file is O_WRONLY")
-       ErrSyncNotSupported  = errors.New("O_SYNC flag is not supported")
-       ErrIsDirectory       = errors.New("cannot rename file to overwrite existing directory")
-       ErrPermission        = os.ErrPermission
-
-       maxBlockSize = 1 << 26
+       maxBlockSize      = 1 << 26
+       concurrentWriters = 4 // max goroutines writing to Keep in background and during flush()
 )
 
-// A File is an *os.File-like interface for reading and writing files
-// in a CollectionFileSystem.
-type File interface {
-       io.Reader
-       io.Writer
-       io.Closer
-       io.Seeker
-       Size() int64
-       Readdir(int) ([]os.FileInfo, error)
-       Stat() (os.FileInfo, error)
-       Truncate(int64) error
-}
-
-type keepClient interface {
-       ReadAt(locator string, p []byte, off int) (int, error)
-       PutB(p []byte) (string, int, error)
-}
-
-type fileinfo struct {
-       name    string
-       mode    os.FileMode
-       size    int64
-       modTime time.Time
-}
-
-// Name implements os.FileInfo.
-func (fi fileinfo) Name() string {
-       return fi.name
-}
-
-// ModTime implements os.FileInfo.
-func (fi fileinfo) ModTime() time.Time {
-       return fi.modTime
-}
-
-// Mode implements os.FileInfo.
-func (fi fileinfo) Mode() os.FileMode {
-       return fi.mode
-}
-
-// IsDir implements os.FileInfo.
-func (fi fileinfo) IsDir() bool {
-       return fi.mode&os.ModeDir != 0
-}
-
-// Size implements os.FileInfo.
-func (fi fileinfo) Size() int64 {
-       return fi.size
-}
-
-// Sys implements os.FileInfo.
-func (fi fileinfo) Sys() interface{} {
-       return nil
-}
-
-// A FileSystem is an http.Filesystem plus Stat() and support for
-// opening writable files. All methods are safe to call from multiple
-// goroutines.
-type FileSystem interface {
-       http.FileSystem
-
-       inode
-
-       // analogous to os.Stat()
-       Stat(name string) (os.FileInfo, error)
-
-       // analogous to os.Create(): create/truncate a file and open it O_RDWR.
-       Create(name string) (File, error)
-
-       // Like os.OpenFile(): create or open a file or directory.
-       //
-       // If flag&os.O_EXCL==0, it opens an existing file or
-       // directory if one exists. If flag&os.O_CREATE!=0, it creates
-       // a new empty file or directory if one does not already
-       // exist.
-       //
-       // When creating a new item, perm&os.ModeDir determines
-       // whether it is a file or a directory.
-       //
-       // A file can be opened multiple times and used concurrently
-       // from multiple goroutines. However, each File object should
-       // be used by only one goroutine at a time.
-       OpenFile(name string, flag int, perm os.FileMode) (File, error)
-
-       Mkdir(name string, perm os.FileMode) error
-       Remove(name string) error
-       RemoveAll(name string) error
-       Rename(oldname, newname string) error
-}
-
 // A CollectionFileSystem is a FileSystem that can be serialized as a
 // manifest and stored as a collection.
 type CollectionFileSystem interface {
@@ -135,334 +34,164 @@ type CollectionFileSystem interface {
        // Prefix (normally ".") is a top level directory, effectively
        // prepended to all paths in the returned manifest.
        MarshalManifest(prefix string) (string, error)
-}
 
-type fileSystem struct {
-       inode
+       // Total data bytes in all files.
+       Size() int64
+
+       // Memory consumed by buffered file data.
+       memorySize() int64
 }
 
 type collectionFileSystem struct {
        fileSystem
+       uuid string
 }
 
-func (fs collectionFileSystem) Child(name string, replace func(inode) inode) inode {
-       if name == ".arvados#collection" {
-               return &getternode{Getter: func() ([]byte, error) {
-                       var coll Collection
-                       var err error
-                       coll.ManifestText, err = fs.MarshalManifest(".")
-                       if err != nil {
-                               return nil, err
-                       }
-                       data, err := json.Marshal(&coll)
-                       if err == nil {
-                               data = append(data, 10)
-                       }
-                       return data, err
-               }}
-       }
-       return fs.fileSystem.Child(name, replace)
-}
-
-func (fs collectionFileSystem) MarshalManifest(prefix string) (string, error) {
-       fs.fileSystem.inode.Lock()
-       defer fs.fileSystem.inode.Unlock()
-       return fs.fileSystem.inode.(*dirnode).marshalManifest(prefix)
-}
-
-// OpenFile is analogous to os.OpenFile().
-func (fs *fileSystem) OpenFile(name string, flag int, perm os.FileMode) (File, error) {
-       return fs.openFile(name, flag, perm)
-}
-
-func (fs *fileSystem) openFile(name string, flag int, perm os.FileMode) (*filehandle, error) {
-       var dn inode = fs.inode
-       if flag&os.O_SYNC != 0 {
-               return nil, ErrSyncNotSupported
-       }
-       dirname, name := path.Split(name)
-       parent := rlookup(dn, dirname)
-       if parent == nil {
-               return nil, os.ErrNotExist
-       }
-       var readable, writable bool
-       switch flag & (os.O_RDWR | os.O_RDONLY | os.O_WRONLY) {
-       case os.O_RDWR:
-               readable = true
-               writable = true
-       case os.O_RDONLY:
-               readable = true
-       case os.O_WRONLY:
-               writable = true
-       default:
-               return nil, fmt.Errorf("invalid flags 0x%x", flag)
+// FileSystem returns a CollectionFileSystem for the collection.
+func (c *Collection) FileSystem(client apiClient, kc keepClient) (CollectionFileSystem, error) {
+       var modTime time.Time
+       if c.ModifiedAt == nil {
+               modTime = time.Now()
+       } else {
+               modTime = *c.ModifiedAt
        }
-       if !writable && parent.IsDir() {
-               // A directory can be opened via "foo/", "foo/.", or
-               // "foo/..".
-               switch name {
-               case ".", "":
-                       return &filehandle{inode: parent}, nil
-               case "..":
-                       return &filehandle{inode: parent.Parent()}, nil
-               }
+       fs := &collectionFileSystem{
+               uuid: c.UUID,
+               fileSystem: fileSystem{
+                       fsBackend: keepBackend{apiClient: client, keepClient: kc},
+                       thr:       newThrottle(concurrentWriters),
+               },
        }
-       createMode := flag&os.O_CREATE != 0
-       if createMode {
-               parent.Lock()
-               defer parent.Unlock()
-       } else {
-               parent.RLock()
-               defer parent.RUnlock()
+       root := &dirnode{
+               fs: fs,
+               treenode: treenode{
+                       fileinfo: fileinfo{
+                               name:    ".",
+                               mode:    os.ModeDir | 0755,
+                               modTime: modTime,
+                       },
+                       inodes: make(map[string]inode),
+               },
        }
-       n := parent.Child(name, nil)
-       if n == nil {
-               if !createMode {
-                       return nil, os.ErrNotExist
-               }
-               var err error
-               n = parent.Child(name, func(inode) inode {
-                       var dn *dirnode
-                       switch parent := parent.(type) {
-                       case *dirnode:
-                               dn = parent
-                       case *collectionFileSystem:
-                               dn = parent.inode.(*dirnode)
-                       default:
-                               err = ErrInvalidArgument
-                               return nil
-                       }
-                       if perm.IsDir() {
-                               n, err = dn.newDirnode(dn, name, perm|0755, time.Now())
-                       } else {
-                               n, err = dn.newFilenode(dn, name, perm|0755, time.Now())
-                       }
-                       return n
-               })
-               if err != nil {
-                       return nil, err
-               } else if n == nil {
-                       // parent rejected new child
-                       return nil, ErrInvalidOperation
-               }
-       } else if flag&os.O_EXCL != 0 {
-               return nil, ErrFileExists
-       } else if flag&os.O_TRUNC != 0 {
-               if !writable {
-                       return nil, fmt.Errorf("invalid flag O_TRUNC in read-only mode")
-               } else if fn, ok := n.(*filenode); !ok {
-                       return nil, fmt.Errorf("invalid flag O_TRUNC when opening directory")
-               } else {
-                       fn.Truncate(0)
-               }
+       root.SetParent(root, ".")
+       if err := root.loadManifest(c.ManifestText); err != nil {
+               return nil, err
        }
-       return &filehandle{
-               inode:    n,
-               append:   flag&os.O_APPEND != 0,
-               readable: readable,
-               writable: writable,
-       }, nil
-}
-
-func (fs *fileSystem) Open(name string) (http.File, error) {
-       return fs.OpenFile(name, os.O_RDONLY, 0)
-}
-
-func (fs *fileSystem) Create(name string) (File, error) {
-       return fs.OpenFile(name, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0)
+       backdateTree(root, modTime)
+       fs.root = root
+       return fs, nil
 }
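
For orientation, a rough usage sketch of the constructor above, written as if it lived inside this package so the unexported apiClient/keepClient interfaces can be named directly; the helper itself is hypothetical and not part of this change:

// writeHello is a hypothetical example: open a collection as a
// filesystem, write one small file, and return the updated manifest.
func writeHello(coll *Collection, client apiClient, kc keepClient) (string, error) {
	fs, err := coll.FileSystem(client, kc)
	if err != nil {
		return "", err
	}
	f, err := fs.OpenFile("hello.txt", os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
	if err != nil {
		return "", err
	}
	if _, err := f.Write([]byte("hello world\n")); err != nil {
		f.Close()
		return "", err
	}
	if err := f.Close(); err != nil {
		return "", err
	}
	// Prefix "." marshals the entire tree as a single manifest.
	return fs.MarshalManifest(".")
}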
 
-func (fs *fileSystem) Mkdir(name string, perm os.FileMode) (err error) {
-       dirname, name := path.Split(name)
-       n := rlookup(fs.inode, dirname)
-       if n == nil {
-               return os.ErrNotExist
-       }
-       n.Lock()
-       defer n.Unlock()
-       if n.Child(name, nil) != nil {
-               return os.ErrExist
-       }
-       dn, ok := n.(*dirnode)
-       if !ok {
-               return ErrInvalidArgument
-       }
-       child := n.Child(name, func(inode) (child inode) {
-               child, err = dn.newDirnode(dn, name, perm, time.Now())
-               return
-       })
-       if err != nil {
-               return err
-       } else if child == nil {
-               return ErrInvalidArgument
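+// backdateTree sets the given modification time on every file and
+// directory node in the tree rooted at n.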
+func backdateTree(n inode, modTime time.Time) {
+       switch n := n.(type) {
+       case *filenode:
+               n.fileinfo.modTime = modTime
+       case *dirnode:
+               n.fileinfo.modTime = modTime
+               for _, n := range n.inodes {
+                       backdateTree(n, modTime)
+               }
        }
-       return nil
 }
 
-func (fs *fileSystem) Stat(name string) (fi os.FileInfo, err error) {
-       node := rlookup(fs.inode, name)
-       if node == nil {
-               err = os.ErrNotExist
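+// newNode returns a new file or directory node with the given name,
+// permissions, and modification time. The node is not yet attached
+// anywhere; callers attach it with SetParent.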
+func (fs *collectionFileSystem) newNode(name string, perm os.FileMode, modTime time.Time) (node inode, err error) {
+       if name == "" || name == "." || name == ".." {
+               return nil, ErrInvalidArgument
+       }
+       if perm.IsDir() {
+               return &dirnode{
+                       fs: fs,
+                       treenode: treenode{
+                               fileinfo: fileinfo{
+                                       name:    name,
+                                       mode:    perm | os.ModeDir,
+                                       modTime: modTime,
+                               },
+                               inodes: make(map[string]inode),
+                       },
+               }, nil
        } else {
-               fi = node.FileInfo()
+               return &filenode{
+                       fs: fs,
+                       fileinfo: fileinfo{
+                               name:    name,
+                               mode:    perm & ^os.ModeDir,
+                               modTime: modTime,
+                       },
+               }, nil
        }
-       return
 }
 
-func (fs *fileSystem) Rename(oldname, newname string) error {
-       olddir, oldname := path.Split(oldname)
-       if oldname == "" || oldname == "." || oldname == ".." {
-               return ErrInvalidArgument
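+// Sync saves the current in-memory state by writing a freshly
+// marshaled manifest to the collection record identified by fs.uuid.
+// It is a no-op for a filesystem that is not backed by a stored
+// collection.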
+func (fs *collectionFileSystem) Sync() error {
+       if fs.uuid == "" {
+               return nil
        }
-       olddirf, err := fs.openFile(olddir+".", os.O_RDONLY, 0)
+       txt, err := fs.MarshalManifest(".")
        if err != nil {
-               return fmt.Errorf("%q: %s", olddir, err)
+               return fmt.Errorf("sync failed: %s", err)
        }
-       defer olddirf.Close()
-
-       newdir, newname := path.Split(newname)
-       if newname == "." || newname == ".." {
-               return ErrInvalidArgument
-       } else if newname == "" {
-               // Rename("a/b", "c/") means Rename("a/b", "c/b")
-               newname = oldname
+       coll := &Collection{
+               UUID:         fs.uuid,
+               ManifestText: txt,
        }
-       newdirf, err := fs.openFile(newdir+".", os.O_RDONLY, 0)
+       err = fs.RequestAndDecode(nil, "PUT", "arvados/v1/collections/"+fs.uuid, nil, map[string]interface{}{
+               "collection": map[string]string{
+                       "manifest_text": coll.ManifestText,
+               },
+               "select": []string{"uuid"},
+       })
        if err != nil {
-               return fmt.Errorf("%q: %s", newdir, err)
-       }
-       defer newdirf.Close()
-
-       // When acquiring locks on multiple nodes, all common
-       // ancestors must be locked first in order to avoid
-       // deadlock. This is assured by locking the path from root to
-       // newdir, then locking the path from root to olddir, skipping
-       // any already-locked nodes.
-       needLock := []sync.Locker{}
-       for _, f := range []*filehandle{olddirf, newdirf} {
-               node := f.inode
-               needLock = append(needLock, node)
-               for node.Parent() != node {
-                       node = node.Parent()
-                       needLock = append(needLock, node)
-               }
-       }
-       locked := map[sync.Locker]bool{}
-       for i := len(needLock) - 1; i >= 0; i-- {
-               if n := needLock[i]; !locked[n] {
-                       n.Lock()
-                       defer n.Unlock()
-                       locked[n] = true
-               }
+               return fmt.Errorf("sync failed: update %s: %s", fs.uuid, err)
        }
+       return nil
+}
 
-       if _, ok := newdirf.inode.(*dirnode); !ok {
-               return ErrInvalidOperation
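+// Flush writes buffered file data to Keep. If path names a
+// directory, only the files directly inside it are flushed
+// (non-recursively); an empty path flushes the entire tree. If
+// shortBlocks is true, even partially filled blocks are committed.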
+func (fs *collectionFileSystem) Flush(path string, shortBlocks bool) error {
+       node, err := rlookup(fs.fileSystem.root, path)
+       if err != nil {
+               return err
        }
-
-       err = nil
-       olddirf.inode.Child(oldname, func(oldinode inode) inode {
-               if oldinode == nil {
-                       err = os.ErrNotExist
-                       return nil
-               }
-               newdirf.inode.Child(newname, func(existing inode) inode {
-                       if existing != nil && existing.IsDir() {
-                               err = ErrIsDirectory
-                               return existing
+       dn, ok := node.(*dirnode)
+       if !ok {
+               return ErrNotADirectory
+       }
+       dn.Lock()
+       defer dn.Unlock()
+       names := dn.sortedNames()
+       if path != "" {
+               // Caller only wants to flush the specified dir,
+               // non-recursively.  Drop subdirs from the list of
+               // names.
+               var filenames []string
+               for _, name := range names {
+                       if _, ok := dn.inodes[name].(*filenode); ok {
+                               filenames = append(filenames, name)
                        }
-                       return oldinode
-               })
-               if err != nil {
-                       return oldinode
-               }
-               oldinode.Lock()
-               defer oldinode.Unlock()
-               olddn := olddirf.inode.(*dirnode)
-               newdn := newdirf.inode.(*dirnode)
-               switch n := oldinode.(type) {
-               case *dirnode:
-                       n.parent = newdirf.inode
-                       n.treenode.fileinfo.name = newname
-               case *filenode:
-                       n.parent = newdn
-                       n.fileinfo.name = newname
-               default:
-                       panic(fmt.Sprintf("bad inode type %T", n))
                }
-               olddn.treenode.fileinfo.modTime = time.Now()
-               newdn.treenode.fileinfo.modTime = time.Now()
-               return nil
-       })
-       return err
-}
-
-func (fs *fileSystem) Remove(name string) error {
-       return fs.remove(strings.TrimRight(name, "/"), false)
-}
-
-func (fs *fileSystem) RemoveAll(name string) error {
-       err := fs.remove(strings.TrimRight(name, "/"), true)
-       if os.IsNotExist(err) {
-               // "If the path does not exist, RemoveAll returns
-               // nil." (see "os" pkg)
-               err = nil
+               names = filenames
+       }
+       for _, name := range names {
+               child := dn.inodes[name]
+               child.Lock()
+               defer child.Unlock()
        }
-       return err
+       return dn.flush(context.TODO(), names, flushOpts{sync: false, shortBlocks: shortBlocks})
 }
 
-func (fs *fileSystem) remove(name string, recursive bool) (err error) {
-       dirname, name := path.Split(name)
-       if name == "" || name == "." || name == ".." {
-               return ErrInvalidArgument
-       }
-       dir := rlookup(fs, dirname)
-       if dir == nil {
-               return os.ErrNotExist
-       }
-       dir.Lock()
-       defer dir.Unlock()
-       dir.Child(name, func(node inode) inode {
-               if node == nil {
-                       err = os.ErrNotExist
-                       return nil
-               }
-               if !recursive && node.IsDir() && node.Size() > 0 {
-                       err = ErrDirectoryNotEmpty
-                       return node
-               }
-               return nil
-       })
-       return err
+func (fs *collectionFileSystem) memorySize() int64 {
+       fs.fileSystem.root.Lock()
+       defer fs.fileSystem.root.Unlock()
+       return fs.fileSystem.root.(*dirnode).memorySize()
 }
 
-type inode interface {
-       Parent() inode
-       Read([]byte, filenodePtr) (int, filenodePtr, error)
-       Write([]byte, filenodePtr) (int, filenodePtr, error)
-       Truncate(int64) error
-       IsDir() bool
-       Readdir() []os.FileInfo
-       Size() int64
-       FileInfo() os.FileInfo
-       // Caller must have lock (or rlock if func is nil)
-       Child(string, func(inode) inode) inode
-       sync.Locker
-       RLock()
-       RUnlock()
+func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
+       fs.fileSystem.root.Lock()
+       defer fs.fileSystem.root.Unlock()
+       return fs.fileSystem.root.(*dirnode).marshalManifest(context.TODO(), prefix)
 }
 
-// filenode implements inode.
-type filenode struct {
-       fileinfo fileinfo
-       parent   *dirnode
-       segments []segment
-       // number of times `segments` has changed in a
-       // way that might invalidate a filenodePtr
-       repacked int64
-       memsize  int64 // bytes in memSegments
-       sync.RWMutex
-       nullnode
+func (fs *collectionFileSystem) Size() int64 {
+       return fs.fileSystem.root.(*dirnode).TreeSize()
 }
 
 // filenodePtr is an offset into a file that is (usually) efficient to
@@ -538,18 +267,43 @@ func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) {
        return
 }
 
+// filenode implements inode.
+type filenode struct {
+       parent   inode
+       fs       FileSystem
+       fileinfo fileinfo
+       segments []segment
+       // number of times `segments` has changed in a
+       // way that might invalidate a filenodePtr
+       repacked int64
+       memsize  int64 // bytes in memSegments
+       sync.RWMutex
+       nullnode
+}
+
 // caller must have lock
 func (fn *filenode) appendSegment(e segment) {
        fn.segments = append(fn.segments, e)
        fn.fileinfo.size += int64(e.Len())
 }
 
+func (fn *filenode) SetParent(p inode, name string) {
+       fn.Lock()
+       defer fn.Unlock()
+       fn.parent = p
+       fn.fileinfo.name = name
+}
+
 func (fn *filenode) Parent() inode {
        fn.RLock()
        defer fn.RUnlock()
        return fn.parent
 }
 
+func (fn *filenode) FS() FileSystem {
+       return fn.fs
+}
+
 // Read reads file data from a single segment, starting at startPtr,
 // into p. startPtr is assumed not to be up-to-date. Caller must have
 // RLock or Lock.
@@ -787,272 +541,407 @@ func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePt
 // Write some data out to disk to reduce memory use. Caller must have
 // write lock.
 func (fn *filenode) pruneMemSegments() {
-       // TODO: async (don't hold Lock() while waiting for Keep)
-       // TODO: share code with (*dirnode)sync()
+       // TODO: share code with (*dirnode)flush()
        // TODO: pack/flush small blocks too, when fragmented
        for idx, seg := range fn.segments {
                seg, ok := seg.(*memSegment)
-               if !ok || seg.Len() < maxBlockSize {
+               if !ok || seg.Len() < maxBlockSize || seg.flushing != nil {
                        continue
                }
-               locator, _, err := fn.parent.kc.PutB(seg.buf)
-               if err != nil {
-                       // TODO: stall (or return errors from)
-                       // subsequent writes until flushing
-                       // starts to succeed
-                       continue
-               }
-               fn.memsize -= int64(seg.Len())
-               fn.segments[idx] = storedSegment{
-                       kc:      fn.parent.kc,
-                       locator: locator,
-                       size:    seg.Len(),
-                       offset:  0,
-                       length:  seg.Len(),
-               }
+               // Setting seg.flushing guarantees seg.buf will not be
+               // modified in place: WriteAt and Truncate will
+               // allocate a new buf instead, if necessary.
+               idx, buf := idx, seg.buf
+               done := make(chan struct{})
+               seg.flushing = done
+               // If lots of background writes are already in
+               // progress, block here until one finishes, rather
+               // than pile up an unlimited number of buffered writes
+               // and network flush operations.
+               fn.fs.throttle().Acquire()
+               go func() {
+                       defer close(done)
+                       locator, _, err := fn.FS().PutB(buf)
+                       fn.fs.throttle().Release()
+                       fn.Lock()
+                       defer fn.Unlock()
+                       if seg.flushing != done {
+                               // A new seg.buf has been allocated.
+                               return
+                       }
+                       seg.flushing = nil
+                       if err != nil {
+                               // TODO: stall (or return errors from)
+                               // subsequent writes until flushing
+                               // starts to succeed.
+                               return
+                       }
+                       if len(fn.segments) <= idx || fn.segments[idx] != seg || len(seg.buf) != len(buf) {
+                               // Segment has been dropped/moved/resized.
+                               return
+                       }
+                       fn.memsize -= int64(len(buf))
+                       fn.segments[idx] = storedSegment{
+                               kc:      fn.FS(),
+                               locator: locator,
+                               size:    len(buf),
+                               offset:  0,
+                               length:  len(buf),
+                       }
+               }()
        }
 }
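
The throttle() used above comes from the embedded fileSystem (see newThrottle(concurrentWriters) in the constructor) and is not shown in this diff. Conceptually it is a counting semaphore; a sketch under that assumption, not the actual implementation:

// throttleSketch illustrates the Acquire/Release contract assumed above:
// at most n goroutines proceed at once; Acquire blocks while all n
// slots are taken, and Release frees a slot.
type throttleSketch struct {
	slots chan struct{}
}

func newThrottleSketch(n int) *throttleSketch {
	return &throttleSketch{slots: make(chan struct{}, n)}
}

func (t *throttleSketch) Acquire() { t.slots <- struct{}{} }
func (t *throttleSketch) Release() { <-t.slots }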
 
-// FileSystem returns a CollectionFileSystem for the collection.
-func (c *Collection) FileSystem(client *Client, kc keepClient) (CollectionFileSystem, error) {
-       var modTime time.Time
-       if c.ModifiedAt == nil {
-               modTime = time.Now()
-       } else {
-               modTime = *c.ModifiedAt
-       }
-       dn := &dirnode{
-               client: client,
-               kc:     kc,
-               treenode: treenode{
-                       fileinfo: fileinfo{
-                               name:    ".",
-                               mode:    os.ModeDir | 0755,
-                               modTime: modTime,
-                       },
-                       parent: nil,
-                       inodes: make(map[string]inode),
-               },
+// waitPrune blocks until all pending pruneMemSegments/flush work is
+// finished. Caller must NOT have lock.
+func (fn *filenode) waitPrune() {
+       var pending []<-chan struct{}
+       fn.Lock()
+       for _, seg := range fn.segments {
+               if seg, ok := seg.(*memSegment); ok && seg.flushing != nil {
+                       pending = append(pending, seg.flushing)
+               }
        }
-       dn.parent = dn
-       fs := &collectionFileSystem{fileSystem: fileSystem{inode: dn}}
-       if err := dn.loadManifest(c.ManifestText); err != nil {
-               return nil, err
+       fn.Unlock()
+       for _, p := range pending {
+               <-p
        }
-       return fs, nil
 }
 
-type filehandle struct {
-       inode
-       ptr        filenodePtr
-       append     bool
-       readable   bool
-       writable   bool
-       unreaddirs []os.FileInfo
+type dirnode struct {
+       fs *collectionFileSystem
+       treenode
 }
 
-func (f *filehandle) Read(p []byte) (n int, err error) {
-       if !f.readable {
-               return 0, ErrWriteOnlyMode
-       }
-       f.inode.RLock()
-       defer f.inode.RUnlock()
-       n, f.ptr, err = f.inode.Read(p, f.ptr)
-       return
+func (dn *dirnode) FS() FileSystem {
+       return dn.fs
 }
 
-func (f *filehandle) Seek(off int64, whence int) (pos int64, err error) {
-       size := f.inode.Size()
-       ptr := f.ptr
-       switch whence {
-       case io.SeekStart:
-               ptr.off = off
-       case io.SeekCurrent:
-               ptr.off += off
-       case io.SeekEnd:
-               ptr.off = size + off
-       }
-       if ptr.off < 0 {
-               return f.ptr.off, ErrNegativeOffset
-       }
-       if ptr.off != f.ptr.off {
-               f.ptr = ptr
-               // force filenode to recompute f.ptr fields on next
-               // use
-               f.ptr.repacked = -1
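+// Child looks up or replaces the named child node. At the collection
+// root, the special name ".arvados#collection" resolves to a virtual
+// file containing the collection record (with its current manifest)
+// as JSON.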
+func (dn *dirnode) Child(name string, replace func(inode) (inode, error)) (inode, error) {
+       if dn == dn.fs.rootnode() && name == ".arvados#collection" {
+               gn := &getternode{Getter: func() ([]byte, error) {
+                       var coll Collection
+                       var err error
+                       coll.ManifestText, err = dn.fs.MarshalManifest(".")
+                       if err != nil {
+                               return nil, err
+                       }
+                       data, err := json.Marshal(&coll)
+                       if err == nil {
+                               data = append(data, '\n')
+                       }
+                       return data, err
+               }}
+               gn.SetParent(dn, name)
+               return gn, nil
        }
-       return f.ptr.off, nil
-}
-
-func (f *filehandle) Truncate(size int64) error {
-       return f.inode.Truncate(size)
+       return dn.treenode.Child(name, replace)
 }
 
-func (f *filehandle) Write(p []byte) (n int, err error) {
-       if !f.writable {
-               return 0, ErrReadOnlyFile
-       }
-       f.inode.Lock()
-       defer f.inode.Unlock()
-       if fn, ok := f.inode.(*filenode); ok && f.append {
-               f.ptr = filenodePtr{
-                       off:        fn.fileinfo.size,
-                       segmentIdx: len(fn.segments),
-                       segmentOff: 0,
-                       repacked:   fn.repacked,
-               }
-       }
-       n, f.ptr, err = f.inode.Write(p, f.ptr)
-       return
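+// fnSegmentRef identifies a single segment (by index) within a
+// filenode.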
+type fnSegmentRef struct {
+       fn  *filenode
+       idx int
 }
 
-func (f *filehandle) Readdir(count int) ([]os.FileInfo, error) {
-       if !f.inode.IsDir() {
-               return nil, ErrInvalidOperation
-       }
-       if count <= 0 {
-               return f.inode.Readdir(), nil
-       }
-       if f.unreaddirs == nil {
-               f.unreaddirs = f.inode.Readdir()
-       }
-       if len(f.unreaddirs) == 0 {
-               return nil, io.EOF
-       }
-       if count > len(f.unreaddirs) {
-               count = len(f.unreaddirs)
+// commitBlock concatenates the data from the given filenode segments
+// (which must be *memSegments), writes the data out to Keep as a
+// single block, and replaces the filenodes' *memSegments with
+// storedSegments that reference the relevant portions of the new
+// block.
+//
+// bufsize is the total data size in refs. It is used to preallocate
+// the correct amount of memory when len(refs)>1.
+//
+// If sync is false, commitBlock returns right away, after starting a
+// goroutine that does the writes, reacquires the filenodes' locks,
+// and swaps out the *memSegments. Some filenodes' segments might get
+// modified/rearranged in the meantime, in which case commitBlock
+// won't replace them.
+//
+// Caller must have write lock.
+func (dn *dirnode) commitBlock(ctx context.Context, refs []fnSegmentRef, bufsize int, sync bool) error {
+       if len(refs) == 0 {
+               return nil
        }
-       ret := f.unreaddirs[:count]
-       f.unreaddirs = f.unreaddirs[count:]
-       return ret, nil
-}
-
-func (f *filehandle) Stat() (os.FileInfo, error) {
-       return f.inode.FileInfo(), nil
-}
-
-func (f *filehandle) Close() error {
-       return nil
-}
-
-type dirnode struct {
-       treenode
-       client *Client
-       kc     keepClient
-}
-
-// sync flushes in-memory data (for all files in the tree rooted at
-// dn) to persistent storage. Caller must hold dn.Lock().
-func (dn *dirnode) sync() error {
-       type shortBlock struct {
-               fn  *filenode
-               idx int
+       if err := ctx.Err(); err != nil {
+               return err
        }
-       var pending []shortBlock
-       var pendingLen int
-
-       flush := func(sbs []shortBlock) error {
-               if len(sbs) == 0 {
+       done := make(chan struct{})
+       var block []byte
+       segs := make([]*memSegment, 0, len(refs))
+       offsets := make([]int, 0, len(refs)) // location of segment's data within block
+       for _, ref := range refs {
+               seg := ref.fn.segments[ref.idx].(*memSegment)
+               if seg.flushing != nil && !sync {
+                       // Let the other flushing goroutine finish. If
+                       // it fails, we'll try again next time.
                        return nil
+               } else {
+                       // In sync mode, we proceed regardless of
+                       // whether another flush is in progress: It
+                       // can't finish before we do, because we hold
+                       // fn's lock until we finish our own writes.
+               }
+               seg.flushing = done
+               offsets = append(offsets, len(block))
+               if len(refs) == 1 {
+                       block = seg.buf
+               } else if block == nil {
+                       block = append(make([]byte, 0, bufsize), seg.buf...)
+               } else {
+                       block = append(block, seg.buf...)
                }
-               block := make([]byte, 0, maxBlockSize)
-               for _, sb := range sbs {
-                       block = append(block, sb.fn.segments[sb.idx].(*memSegment).buf...)
+               segs = append(segs, seg)
+       }
+       dn.fs.throttle().Acquire()
+       errs := make(chan error, 1)
+       go func() {
+               defer close(done)
+               defer close(errs)
+               locked := map[*filenode]bool{}
+               locator, _, err := dn.fs.PutB(block)
+               dn.fs.throttle().Release()
+               {
+                       if !sync {
+                               for _, name := range dn.sortedNames() {
+                                       if fn, ok := dn.inodes[name].(*filenode); ok {
+                                               fn.Lock()
+                                               defer fn.Unlock()
+                                               locked[fn] = true
+                                       }
+                               }
+                       }
+                       defer func() {
+                               for _, seg := range segs {
+                                       if seg.flushing == done {
+                                               seg.flushing = nil
+                                       }
+                               }
+                       }()
                }
-               locator, _, err := dn.kc.PutB(block)
                if err != nil {
-                       return err
+                       errs <- err
+                       return
                }
-               off := 0
-               for _, sb := range sbs {
-                       data := sb.fn.segments[sb.idx].(*memSegment).buf
-                       sb.fn.segments[sb.idx] = storedSegment{
-                               kc:      dn.kc,
+               for idx, ref := range refs {
+                       if !sync {
+                               // In async mode, fn's lock was
+                               // released while we were waiting for
+                               // PutB(); lots of things might have
+                               // changed.
+                               if len(ref.fn.segments) <= ref.idx {
+                                       // file segments have
+                                       // rearranged or changed in
+                                       // some way
+                                       continue
+                               } else if seg, ok := ref.fn.segments[ref.idx].(*memSegment); !ok || seg != segs[idx] {
+                                       // segment has been replaced
+                                       continue
+                               } else if seg.flushing != done {
+                                       // seg.buf has been replaced
+                                       continue
+                               } else if !locked[ref.fn] {
+                                       // file was renamed, moved, or
+                                       // deleted since we called
+                                       // PutB
+                                       continue
+                               }
+                       }
+                       data := ref.fn.segments[ref.idx].(*memSegment).buf
+                       ref.fn.segments[ref.idx] = storedSegment{
+                               kc:      dn.fs,
                                locator: locator,
                                size:    len(block),
-                               offset:  off,
+                               offset:  offsets[idx],
                                length:  len(data),
                        }
-                       off += len(data)
-                       sb.fn.memsize -= int64(len(data))
+                       ref.fn.memsize -= int64(len(data))
                }
+       }()
+       if sync {
+               return <-errs
+       } else {
                return nil
        }
+}
 
-       names := make([]string, 0, len(dn.inodes))
-       for name := range dn.inodes {
-               names = append(names, name)
+type flushOpts struct {
+       sync        bool
+       shortBlocks bool
+}
+
+// flush in-memory data and remote-cluster block references, for the
+// named children of dn, to local-cluster persistent storage.
+//
+// Caller must have write lock on dn and the named children.
+//
+// If any children are dirs, they will be flushed recursively.
+func (dn *dirnode) flush(ctx context.Context, names []string, opts flushOpts) error {
+       cg := newContextGroup(ctx)
+       defer cg.Cancel()
+
+       goCommit := func(refs []fnSegmentRef, bufsize int) {
+               cg.Go(func() error {
+                       return dn.commitBlock(cg.Context(), refs, bufsize, opts.sync)
+               })
        }
-       sort.Strings(names)
 
+       var pending []fnSegmentRef
+       var pendingLen int
+       localLocator := map[string]string{}
        for _, name := range names {
-               fn, ok := dn.inodes[name].(*filenode)
-               if !ok {
-                       continue
-               }
-               fn.Lock()
-               defer fn.Unlock()
-               for idx, seg := range fn.segments {
-                       seg, ok := seg.(*memSegment)
-                       if !ok {
-                               continue
+               switch node := dn.inodes[name].(type) {
+               case *dirnode:
+                       grandchildNames := node.sortedNames()
+                       for _, grandchildName := range grandchildNames {
+                               grandchild := node.inodes[grandchildName]
+                               grandchild.Lock()
+                               defer grandchild.Unlock()
                        }
-                       if seg.Len() > maxBlockSize/2 {
-                               if err := flush([]shortBlock{{fn, idx}}); err != nil {
-                                       return err
+                       cg.Go(func() error { return node.flush(cg.Context(), grandchildNames, opts) })
+               case *filenode:
+                       for idx, seg := range node.segments {
+                               switch seg := seg.(type) {
+                               case storedSegment:
+                                       loc, ok := localLocator[seg.locator]
+                                       if !ok {
+                                               var err error
+                                               loc, err = dn.fs.LocalLocator(seg.locator)
+                                               if err != nil {
+                                                       return err
+                                               }
+                                               localLocator[seg.locator] = loc
+                                       }
+                                       seg.locator = loc
+                                       node.segments[idx] = seg
+                               case *memSegment:
+                                       if seg.Len() > maxBlockSize/2 {
+                                               goCommit([]fnSegmentRef{{node, idx}}, seg.Len())
+                                               continue
+                                       }
+                                       if pendingLen+seg.Len() > maxBlockSize {
+                                               goCommit(pending, pendingLen)
+                                               pending = nil
+                                               pendingLen = 0
+                                       }
+                                       pending = append(pending, fnSegmentRef{node, idx})
+                                       pendingLen += seg.Len()
+                               default:
+                                       panic(fmt.Sprintf("can't sync segment type %T", seg))
                                }
-                               continue
                        }
-                       if pendingLen+seg.Len() > maxBlockSize {
-                               if err := flush(pending); err != nil {
-                                       return err
+               }
+       }
+       if opts.shortBlocks {
+               goCommit(pending, pendingLen)
+       }
+       return cg.Wait()
+}
+
+// caller must have write lock.
+func (dn *dirnode) memorySize() (size int64) {
+       for _, name := range dn.sortedNames() {
+               node := dn.inodes[name]
+               node.Lock()
+               defer node.Unlock()
+               switch node := node.(type) {
+               case *dirnode:
+                       size += node.memorySize()
+               case *filenode:
+                       for _, seg := range node.segments {
+                               switch seg := seg.(type) {
+                               case *memSegment:
+                                       size += int64(seg.Len())
                                }
-                               pending = nil
-                               pendingLen = 0
                        }
-                       pending = append(pending, shortBlock{fn, idx})
-                       pendingLen += seg.Len()
                }
        }
-       return flush(pending)
+       return
 }
 
-// caller must have read lock.
-func (dn *dirnode) marshalManifest(prefix string) (string, error) {
-       var streamLen int64
-       type filepart struct {
-               name   string
-               offset int64
-               length int64
+// caller must have write lock.
+func (dn *dirnode) sortedNames() []string {
+       names := make([]string, 0, len(dn.inodes))
+       for name := range dn.inodes {
+               names = append(names, name)
        }
-       var fileparts []filepart
-       var subdirs string
-       var blocks []string
+       sort.Strings(names)
+       return names
+}
 
-       if err := dn.sync(); err != nil {
-               return "", err
+// caller must have write lock.
+func (dn *dirnode) marshalManifest(ctx context.Context, prefix string) (string, error) {
+       cg := newContextGroup(ctx)
+       defer cg.Cancel()
+
+       if len(dn.inodes) == 0 {
+               if prefix == "." {
+                       return "", nil
+               }
+               // Express the existence of an empty directory by
+               // adding an empty file named `\056`, which (unlike
+               // the more obvious spelling `.`) is accepted by the
+               // API's manifest validator.
+               return manifestEscape(prefix) + " d41d8cd98f00b204e9800998ecf8427e+0 0:0:\\056\n", nil
        }
 
-       names := make([]string, 0, len(dn.inodes))
-       for name, node := range dn.inodes {
-               names = append(names, name)
-               node.Lock()
-               defer node.Unlock()
+       names := dn.sortedNames()
+
+       // Wait for children to finish any pending write operations
+       // before locking them.
+       for _, name := range names {
+               node := dn.inodes[name]
+               if fn, ok := node.(*filenode); ok {
+                       fn.waitPrune()
+               }
        }
-       sort.Strings(names)
 
+       var dirnames []string
+       var filenames []string
        for _, name := range names {
-               switch node := dn.inodes[name].(type) {
+               node := dn.inodes[name]
+               node.Lock()
+               defer node.Unlock()
+               switch node := node.(type) {
                case *dirnode:
-                       subdir, err := node.marshalManifest(prefix + "/" + name)
-                       if err != nil {
-                               return "", err
-                       }
-                       subdirs = subdirs + subdir
+                       dirnames = append(dirnames, name)
                case *filenode:
+                       filenames = append(filenames, name)
+               default:
+                       panic(fmt.Sprintf("can't marshal inode type %T", node))
+               }
+       }
+
+       subdirs := make([]string, len(dirnames))
+       rootdir := ""
+       for i, name := range dirnames {
+               i, name := i, name
+               cg.Go(func() error {
+                       txt, err := dn.inodes[name].(*dirnode).marshalManifest(cg.Context(), prefix+"/"+name)
+                       subdirs[i] = txt
+                       return err
+               })
+       }
+
+       cg.Go(func() error {
+               var streamLen int64
+               type filepart struct {
+                       name   string
+                       offset int64
+                       length int64
+               }
+
+               var fileparts []filepart
+               var blocks []string
+               if err := dn.flush(cg.Context(), filenames, flushOpts{sync: true, shortBlocks: true}); err != nil {
+                       return err
+               }
+               for _, name := range filenames {
+                       node := dn.inodes[name].(*filenode)
                        if len(node.segments) == 0 {
                                fileparts = append(fileparts, filepart{name: name})
-                               break
+                               continue
                        }
                        for _, seg := range node.segments {
                                switch seg := seg.(type) {
@@ -1078,24 +967,25 @@ func (dn *dirnode) marshalManifest(prefix string) (string, error) {
                                default:
                                        // This can't happen: we
                                        // haven't unlocked since
-                                       // calling sync().
+                                       // calling flush(sync=true).
                                        panic(fmt.Sprintf("can't marshal segment type %T", seg))
                                }
                        }
-               default:
-                       panic(fmt.Sprintf("can't marshal inode type %T", node))
                }
-       }
-       var filetokens []string
-       for _, s := range fileparts {
-               filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, manifestEscape(s.name)))
-       }
-       if len(filetokens) == 0 {
-               return subdirs, nil
-       } else if len(blocks) == 0 {
-               blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
-       }
-       return manifestEscape(prefix) + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n" + subdirs, nil
+               var filetokens []string
+               for _, s := range fileparts {
+                       filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, manifestEscape(s.name)))
+               }
+               if len(filetokens) == 0 {
+                       return nil
+               } else if len(blocks) == 0 {
+                       blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
+               }
+               rootdir = manifestEscape(prefix) + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n"
+               return nil
+       })
+       err := cg.Wait()
+       return rootdir + strings.Join(subdirs, ""), err
 }
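
As a concrete illustration of the output format (the block hashes below are the MD5 of "foo" and of the empty string, the latter also used above as the empty-block locator): a tree with a 3-byte file foo.txt at the top level and an empty subdirectory bar marshals to something like:

. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt
./bar d41d8cd98f00b204e9800998ecf8427e+0 0:0:\056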
 
 func (dn *dirnode) loadManifest(txt string) error {
@@ -1156,8 +1046,14 @@ func (dn *dirnode) loadManifest(txt string) error {
                        }
                        name := dirname + "/" + manifestUnescape(toks[2])
                        fnode, err := dn.createFileAndParents(name)
-                       if err != nil {
-                               return fmt.Errorf("line %d: cannot use path %q: %s", lineno, name, err)
+                       if fnode == nil && err == nil && length == 0 {
+                               // Special case: an empty file used as
+                               // a marker to preserve an otherwise
+                               // empty directory in a manifest.
+                               continue
+                       }
+                       if err != nil || (fnode == nil && length != 0) {
+                               return fmt.Errorf("line %d: cannot use path %q with length %d: %s", lineno, name, length, err)
                        }
                        // Map the stream offset/range coordinates to
                        // block/offset/range coordinates and add
@@ -1188,7 +1084,7 @@ func (dn *dirnode) loadManifest(txt string) error {
                                        blkLen = int(offset + length - pos - int64(blkOff))
                                }
                                fnode.appendSegment(storedSegment{
-                                       kc:      dn.kc,
+                                       kc:      dn.fs,
                                        locator: seg.locator,
                                        size:    seg.size,
                                        offset:  blkOff,
@@ -1215,15 +1111,14 @@ func (dn *dirnode) loadManifest(txt string) error {
        return nil
 }
 
-// only safe to call from loadManifest -- no locking
+// only safe to call from loadManifest -- no locking.
+//
+// If path is a "parent directory exists" marker (the last path
+// component is "."), the returned values are both nil.
 func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
-       node := dn
+       var node inode = dn
        names := strings.Split(path, "/")
        basename := names[len(names)-1]
-       if basename == "" || basename == "." || basename == ".." {
-               err = fmt.Errorf("invalid filename")
-               return
-       }
        for _, name := range names[:len(names)-1] {
                switch name {
                case "", ".":
@@ -1233,108 +1128,69 @@ func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
                                // can't be sure parent will be a *dirnode
                                return nil, ErrInvalidArgument
                        }
-                       node = node.Parent().(*dirnode)
+                       node = node.Parent()
                        continue
                }
-               node.Child(name, func(child inode) inode {
-                       switch child.(type) {
-                       case nil:
-                               node, err = dn.newDirnode(node, name, 0755|os.ModeDir, node.Parent().FileInfo().ModTime())
-                               child = node
-                       case *dirnode:
-                               node = child.(*dirnode)
-                       case *filenode:
-                               err = ErrFileExists
-                       default:
-                               err = ErrInvalidOperation
+               node, err = node.Child(name, func(child inode) (inode, error) {
+                       if child == nil {
+                               child, err := node.FS().newNode(name, 0755|os.ModeDir, node.Parent().FileInfo().ModTime())
+                               if err != nil {
+                                       return nil, err
+                               }
+                               child.SetParent(node, name)
+                               return child, nil
+                       } else if !child.IsDir() {
+                               return child, ErrFileExists
+                       } else {
+                               return child, nil
                        }
-                       return child
                })
                if err != nil {
                        return
                }
        }
-       node.Child(basename, func(child inode) inode {
+       if basename == "." {
+               return
+       } else if !permittedName(basename) {
+               err = fmt.Errorf("invalid file part %q in path %q", basename, path)
+               return
+       }
+       _, err = node.Child(basename, func(child inode) (inode, error) {
                switch child := child.(type) {
                case nil:
-                       fn, err = dn.newFilenode(node, basename, 0755, node.FileInfo().ModTime())
-                       return fn
+                       child, err = node.FS().newNode(basename, 0755, node.FileInfo().ModTime())
+                       if err != nil {
+                               return nil, err
+                       }
+                       child.SetParent(node, basename)
+                       fn = child.(*filenode)
+                       return child, nil
                case *filenode:
                        fn = child
-                       return child
+                       return child, nil
                case *dirnode:
-                       err = ErrIsDirectory
-                       return child
+                       return child, ErrIsDirectory
                default:
-                       err = ErrInvalidOperation
-                       return child
+                       return child, ErrInvalidArgument
                }
        })
        return
 }
 
-// rlookup (recursive lookup) returns the inode for the file/directory
-// with the given name (which may contain "/" separators). If no such
-// file/directory exists, the returned node is nil.
-func rlookup(start inode, path string) (node inode) {
-       node = start
-       for _, name := range strings.Split(path, "/") {
-               if node == nil {
-                       break
-               }
-               if node.IsDir() {
-                       if name == "." || name == "" {
-                               continue
-                       }
-                       if name == ".." {
-                               node = node.Parent()
-                               continue
-                       }
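+// TreeSize returns the total data size of all files in the tree
+// rooted at dn.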
+func (dn *dirnode) TreeSize() (bytes int64) {
+       dn.RLock()
+       defer dn.RUnlock()
+       for _, i := range dn.inodes {
+               switch i := i.(type) {
+               case *filenode:
+                       bytes += i.Size()
+               case *dirnode:
+                       bytes += i.TreeSize()
                }
-               node = func() inode {
-                       node.RLock()
-                       defer node.RUnlock()
-                       return node.Child(name, nil)
-               }()
        }
        return
 }
 
-// Caller must have lock, and must have already ensured
-// Children(name,nil) is nil.
-func (dn *dirnode) newDirnode(parent *dirnode, name string, perm os.FileMode, modTime time.Time) (node *dirnode, err error) {
-       if name == "" || name == "." || name == ".." {
-               return nil, ErrInvalidArgument
-       }
-       return &dirnode{
-               client: dn.client,
-               kc:     dn.kc,
-               treenode: treenode{
-                       parent: parent,
-                       fileinfo: fileinfo{
-                               name:    name,
-                               mode:    perm | os.ModeDir,
-                               modTime: modTime,
-                       },
-                       inodes: make(map[string]inode),
-               },
-       }, nil
-}
-
-func (dn *dirnode) newFilenode(parent *dirnode, name string, perm os.FileMode, modTime time.Time) (node *filenode, err error) {
-       if name == "" || name == "." || name == ".." {
-               return nil, ErrInvalidArgument
-       }
-       return &filenode{
-               parent: parent,
-               fileinfo: fileinfo{
-                       name:    name,
-                       mode:    perm & ^os.ModeDir,
-                       modTime: modTime,
-               },
-       }, nil
-}
-
 type segment interface {
        io.ReaderAt
        Len() int
@@ -1345,6 +1201,11 @@ type segment interface {
 
 type memSegment struct {
        buf []byte
+       // If flushing is not nil, then a) buf is being shared by a
+       // pruneMemSegments goroutine, and must be copied on write;
+       // and b) the flushing channel will close when the goroutine
+       // finishes, whether it succeeds or not.
+       flushing <-chan struct{}
 }
 
 func (me *memSegment) Len() int {
@@ -1361,28 +1222,31 @@ func (me *memSegment) Slice(off, length int) segment {
 }
 
 func (me *memSegment) Truncate(n int) {
-       if n > cap(me.buf) {
+       if n > cap(me.buf) || (me.flushing != nil && n > len(me.buf)) {
                newsize := 1024
                for newsize < n {
                        newsize = newsize << 2
                }
                newbuf := make([]byte, n, newsize)
                copy(newbuf, me.buf)
-               me.buf = newbuf
+               me.buf, me.flushing = newbuf, nil
        } else {
-               // Zero unused part when shrinking, in case we grow
-               // and start using it again later.
-               for i := n; i < len(me.buf); i++ {
+               // reclaim existing capacity, and zero reclaimed part
+               oldlen := len(me.buf)
+               me.buf = me.buf[:n]
+               for i := oldlen; i < n; i++ {
                        me.buf[i] = 0
                }
        }
-       me.buf = me.buf[:n]
 }
 
 func (me *memSegment) WriteAt(p []byte, off int) {
        if off+len(p) > len(me.buf) {
                panic("overflowed segment")
        }
+       if me.flushing != nil {
+               me.buf, me.flushing = append([]byte(nil), me.buf...), nil
+       }
        copy(me.buf[off:], p)
 }
 
@@ -1399,7 +1263,7 @@ func (me *memSegment) ReadAt(p []byte, off int64) (n int, err error) {
 }
 
 type storedSegment struct {
-       kc      keepClient
+       kc      fsBackend
        locator string
        size    int // size of stored block (also encoded in locator)
        offset  int // position of segment within the stored block