Merge branch 'master' of git.curoverse.com:arvados into 11876-r-sdk
author Fuad Muhic <fmuhic@capeannenterprises.com>
	Fri, 1 Dec 2017 15:21:32 +0000 (16:21 +0100)
committer Fuad Muhic <fmuhic@capeannenterprises.com>
	Fri, 1 Dec 2017 15:21:32 +0000 (16:21 +0100)
Arvados-DCO-1.1-Signed-off-by: Fuad Muhic <fmuhic@capeannenterprises.com>

32 files changed:
build/run-tests.sh
doc/install/install-keep-web.html.textile.liquid
sdk/go/arvados/client.go
sdk/go/arvados/collection.go
sdk/go/arvados/collection_fs.go
sdk/go/arvados/collection_fs_test.go
sdk/go/arvados/error.go
sdk/go/httpserver/id_generator.go
sdk/go/httpserver/logger.go [new file with mode: 0644]
sdk/go/httpserver/logger_test.go [new file with mode: 0644]
sdk/go/httpserver/responsewriter.go
sdk/go/keepclient/block_cache.go
sdk/go/keepclient/collectionreader.go
sdk/go/keepclient/collectionreader_test.go
sdk/go/keepclient/keepclient.go
services/arv-git-httpd/auth_handler.go
services/crunch-run/crunchrun_test.go
services/keep-web/cache.go
services/keep-web/cadaver_test.go
services/keep-web/doc.go
services/keep-web/handler.go
services/keep-web/handler_test.go
services/keep-web/webdav.go
services/keep-web/webdav_test.go [new file with mode: 0644]
services/keepproxy/keepproxy.go
services/keepproxy/keepproxy_test.go
services/keepproxy/proxy_client.go
services/keepstore/handler_test.go
services/keepstore/keepstore.go
services/keepstore/logging_router.go [deleted file]
services/keepstore/logging_router_test.go [deleted file]
services/keepstore/s3_volume.go

index 3cfc692aaec5bc1f595b9333fa6be9179164f200..365931d33281d533e88442d2beb7b9f7f454878f 100755 (executable)
@@ -487,6 +487,7 @@ export PERLLIB="$PERLINSTALLBASE/lib/perl5:${PERLLIB:+$PERLLIB}"
 
 export GOPATH
 mkdir -p "$GOPATH/src/git.curoverse.com"
+rmdir --parents "$GOPATH/src/git.curoverse.com/arvados.git/tmp/GOPATH"
 ln -sfT "$WORKSPACE" "$GOPATH/src/git.curoverse.com/arvados.git" \
     || fatal "symlink failed"
 go get -v github.com/kardianos/govendor \
index ea2ebd161b9257291c44d5edd4a10fa955631671..4def77e063c879c53b9e0ea2529483360157636e 100644 (file)
@@ -9,7 +9,7 @@ Copyright (C) The Arvados Authors. All rights reserved.
 SPDX-License-Identifier: CC-BY-SA-3.0
 {% endcomment %}
 
-The Keep-web server provides read-only HTTP access to files stored in Keep. It serves public data to unauthenticated clients, and serves private data to clients that supply Arvados API tokens. It can be installed anywhere with access to Keep services, typically behind a web proxy that provides SSL support. See the "godoc page":http://godoc.org/github.com/curoverse/arvados/services/keep-web for more detail.
+The Keep-web server provides read/write HTTP (WebDAV) access to files stored in Keep. It serves public data to unauthenticated clients, and serves private data to clients that supply Arvados API tokens. It can be installed anywhere with access to Keep services, typically behind a web proxy that provides TLS support. See the "godoc page":http://godoc.org/github.com/curoverse/arvados/services/keep-web for more detail.
 
 By convention, we use the following hostnames for the Keep-web service:
 
@@ -48,7 +48,7 @@ Usage of keep-web:
   -allow-anonymous
         Serve public data to anonymous clients. Try the token supplied in the ARVADOS_API_TOKEN environment variable when none of the tokens provided in an HTTP request succeed in reading the desired collection. (default false)
   -attachment-only-host string
-        Accept credentials, and add "Content-Disposition: attachment" response headers, for requests at this hostname:port. Prohibiting inline display makes it possible to serve untrusted and non-public content from a single origin, i.e., without wildcard DNS or SSL.
+        Accept credentials, and add "Content-Disposition: attachment" response headers, for requests at this hostname:port. Prohibiting inline display makes it possible to serve untrusted and non-public content from a single origin, i.e., without wildcard DNS or TLS.
   -listen string
         Address to listen on: "host:port", or ":port" to listen on all interfaces. (default ":80")
   -trust-all-content
@@ -77,15 +77,15 @@ exec sudo -u nobody keep-web \
 
 Omit the @-allow-anonymous@ argument if you do not want to serve public data.
 
-Set @ARVADOS_API_HOST_INSECURE=1@ if your API server's SSL certificate is not signed by a recognized CA.
+Set @ARVADOS_API_HOST_INSECURE=1@ if your API server's TLS certificate is not signed by a recognized CA.
 
-h3. Set up a reverse proxy with SSL support
+h3. Set up a reverse proxy with TLS support
 
-The Keep-web service will be accessible from anywhere on the internet, so we recommend using SSL for transport encryption.
+The Keep-web service will be accessible from anywhere on the internet, so we recommend using TLS for transport encryption.
 
-This is best achieved by putting a reverse proxy with SSL support in front of Keep-web, running on port 443 and passing requests to Keep-web on port 9002 (or whatever port you chose in your run script).
+This is best achieved by putting a reverse proxy with TLS support in front of Keep-web, running on port 443 and passing requests to Keep-web on port 9002 (or whatever port you chose in your run script).
 
-Note: A wildcard SSL certificate is required in order to support a full-featured secure Keep-web service. Without it, Keep-web can offer file downloads for all Keep data; however, in order to avoid cross-site scripting vulnerabilities, Keep-web refuses to serve private data as web content except when it is accessed using a "secret link" share. With a wildcard SSL certificate and DNS configured appropriately, all data can be served as web content.
+Note: A wildcard TLS certificate is required in order to support a full-featured secure Keep-web service. Without it, Keep-web can offer file downloads for all Keep data; however, in order to avoid cross-site scripting vulnerabilities, Keep-web refuses to serve private data as web content except when it is accessed using a "secret link" share. With a wildcard TLS certificate and DNS configured appropriately, all data can be served as web content.
 
 For example, using Nginx:
 
@@ -125,8 +125,8 @@ h3. Configure DNS
 Configure your DNS servers so the following names resolve to your Nginx proxy's public IP address.
 * @download.uuid_prefix.your.domain@
 * @collections.uuid_prefix.your.domain@
-* @*--collections.uuid_prefix.your.domain@, if you have a wildcard SSL certificate valid for @*.uuid_prefix.your.domain@ and your DNS server allows this without interfering with other DNS names.
-* @*.collections.uuid_prefix.your.domain@, if you have a wildcard SSL certificate valid for these names.
+* @*--collections.uuid_prefix.your.domain@, if you have a wildcard TLS certificate valid for @*.uuid_prefix.your.domain@ and your DNS server allows this without interfering with other DNS names.
+* @*.collections.uuid_prefix.your.domain@, if you have a wildcard TLS certificate valid for these names.
 
 If neither of the above wildcard options is feasible, you have two choices:
 # Serve web content at @collections.uuid_prefix.your.domain@, but only for unauthenticated requests (public data and collection sharing links). Authenticated requests will always result in file downloads, using the @download@ name. For example, the Workbench "preview" button and the "view entire log file" link will invoke file downloads instead of displaying content in the browser window.
index 47a953ac2c02ab4422eeec34e5c18c27fbf61947..a38d95c2e68ee90e1d9f0d41bdef2b341127fd27 100644 (file)
@@ -5,6 +5,7 @@
 package arvados
 
 import (
+       "bytes"
        "crypto/tls"
        "encoding/json"
        "fmt"
@@ -180,6 +181,10 @@ func anythingToValues(params interface{}) (url.Values, error) {
 //
 // path must not contain a query string.
 func (c *Client) RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error {
+       if body, ok := body.(io.Closer); ok {
+               // Ensure body is closed even if we error out early
+               defer body.Close()
+       }
        urlString := c.apiURL(path)
        urlValues, err := anythingToValues(params)
        if err != nil {
@@ -202,6 +207,24 @@ func (c *Client) RequestAndDecode(dst interface{}, method, path string, body io.
        return c.DoAndDecode(dst, req)
 }
 
+type resource interface {
+       resourceName() string
+}
+
+// UpdateBody returns an io.Reader suitable for use as an http.Request
+// Body for a create or update API call.
+func (c *Client) UpdateBody(rsc resource) io.Reader {
+       j, err := json.Marshal(rsc)
+       if err != nil {
+               // Return a reader that returns errors.
+               r, w := io.Pipe()
+               w.CloseWithError(err)
+               return r
+       }
+       v := url.Values{rsc.resourceName(): {string(j)}}
+       return bytes.NewBufferString(v.Encode())
+}
+
 func (c *Client) httpClient() *http.Client {
        switch {
        case c.Client != nil:
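
[Editor's note] To see how the two additions above fit together, here is a hedged usage sketch (not part of this commit): UpdateBody form-encodes the resource under its resourceName, and RequestAndDecode sends the result as a create/update call and decodes the response. The UUID and manifest text below are placeholders.

    package main

    import (
        "fmt"
        "log"

        "git.curoverse.com/arvados.git/sdk/go/arvados"
    )

    func main() {
        client := arvados.NewClientFromEnv()
        // Placeholder UUID for an existing collection.
        uuid := "zzzzz-4zz18-xxxxxxxxxxxxxxx"
        update := arvados.Collection{
            ManifestText: ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:empty.txt\n",
        }
        var saved arvados.Collection
        err := client.RequestAndDecode(&saved, "PUT",
            "arvados/v1/collections/"+uuid,
            client.UpdateBody(update), nil)
        if err != nil {
            log.Fatal(err)
        }
        fmt.Println(saved.UUID)
    }
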
index 61bcd7fe8f367f2ca3d3d48017213bbd5335d33e..999b4e9d483454ace177cad829e90f85ddccc44c 100644 (file)
@@ -30,6 +30,10 @@ type Collection struct {
        IsTrashed              bool       `json:"is_trashed,omitempty"`
 }
 
+func (c Collection) resourceName() string {
+       return "collection"
+}
+
 // SizedDigests returns the hash+size part of each data block
 // referenced by the collection.
 func (c *Collection) SizedDigests() ([]SizedDigest, error) {
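
[Editor's note] resourceName is the hook UpdateBody (in client.go above) uses to pick the form field: marshalling a Collection yields a URL-encoded request body of the shape collection=%7B...json...%7D, i.e. the JSON document nested under the parameter name the API server expects for create/update calls.
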
index 1acf27442c3dc514abe1c3b2538549cba9bbcd9f..28629e33b20f31189ac460d02ab2868fdd85db96 100644 (file)
 package arvados
 
 import (
+       "errors"
+       "fmt"
        "io"
        "net/http"
        "os"
        "path"
+       "regexp"
+       "sort"
+       "strconv"
        "strings"
        "sync"
        "time"
+)
+
+var (
+       ErrReadOnlyFile      = errors.New("read-only file")
+       ErrNegativeOffset    = errors.New("cannot seek to negative offset")
+       ErrFileExists        = errors.New("file exists")
+       ErrInvalidOperation  = errors.New("invalid operation")
+       ErrInvalidArgument   = errors.New("invalid argument")
+       ErrDirectoryNotEmpty = errors.New("directory not empty")
+       ErrWriteOnlyMode     = errors.New("file is O_WRONLY")
+       ErrSyncNotSupported  = errors.New("O_SYNC flag is not supported")
+       ErrIsDirectory       = errors.New("cannot rename file to overwrite existing directory")
+       ErrPermission        = os.ErrPermission
 
-       "git.curoverse.com/arvados.git/sdk/go/manifest"
+       maxBlockSize = 1 << 26
 )
 
+// A File is an *os.File-like interface for reading and writing files
+// in a CollectionFileSystem.
 type File interface {
        io.Reader
+       io.Writer
        io.Closer
        io.Seeker
        Size() int64
+       Readdir(int) ([]os.FileInfo, error)
+       Stat() (os.FileInfo, error)
+       Truncate(int64) error
 }
 
 type keepClient interface {
-       ManifestFileReader(manifest.Manifest, string) (File, error)
+       ReadAt(locator string, p []byte, off int) (int, error)
+       PutB(p []byte) (string, int, error)
 }
 
-type collectionFile struct {
-       File
-       collection *Collection
-       name       string
-       size       int64
+type fileinfo struct {
+       name    string
+       mode    os.FileMode
+       size    int64
+       modTime time.Time
 }
 
-func (cf *collectionFile) Size() int64 {
-       return cf.size
+// Name implements os.FileInfo.
+func (fi fileinfo) Name() string {
+       return fi.name
 }
 
-func (cf *collectionFile) Readdir(count int) ([]os.FileInfo, error) {
-       return nil, io.EOF
+// ModTime implements os.FileInfo.
+func (fi fileinfo) ModTime() time.Time {
+       return fi.modTime
 }
 
-func (cf *collectionFile) Stat() (os.FileInfo, error) {
-       return collectionDirent{
-               collection: cf.collection,
-               name:       cf.name,
-               size:       cf.size,
-               isDir:      false,
-       }, nil
+// Mode implements os.FileInfo.
+func (fi fileinfo) Mode() os.FileMode {
+       return fi.mode
 }
 
-type collectionDir struct {
-       collection *Collection
-       stream     string
-       dirents    []os.FileInfo
+// IsDir implements os.FileInfo.
+func (fi fileinfo) IsDir() bool {
+       return fi.mode&os.ModeDir != 0
 }
 
-// Readdir implements os.File.
-func (cd *collectionDir) Readdir(count int) ([]os.FileInfo, error) {
-       ret := cd.dirents
-       if count <= 0 {
-               cd.dirents = nil
-               return ret, nil
-       } else if len(ret) == 0 {
-               return nil, io.EOF
+// Size implements os.FileInfo.
+func (fi fileinfo) Size() int64 {
+       return fi.size
+}
+
+// Sys implements os.FileInfo.
+func (fi fileinfo) Sys() interface{} {
+       return nil
+}
+
+// A CollectionFileSystem is an http.FileSystem plus Stat() and
+// support for opening writable files. All methods are safe to call
+// from multiple goroutines.
+type CollectionFileSystem interface {
+       http.FileSystem
+
+       // analogous to os.Stat()
+       Stat(name string) (os.FileInfo, error)
+
+       // analogous to os.Create(): create/truncate a file and open it O_RDWR.
+       Create(name string) (File, error)
+
+       // Like os.OpenFile(): create or open a file or directory.
+       //
+       // If flag&os.O_EXCL==0, it opens an existing file or
+       // directory if one exists. If flag&os.O_CREATE!=0, it creates
+       // a new empty file or directory if one does not already
+       // exist.
+       //
+       // When creating a new item, perm&os.ModeDir determines
+       // whether it is a file or a directory.
+       //
+       // A file can be opened multiple times and used concurrently
+       // from multiple goroutines. However, each File object should
+       // be used by only one goroutine at a time.
+       OpenFile(name string, flag int, perm os.FileMode) (File, error)
+
+       Mkdir(name string, perm os.FileMode) error
+       Remove(name string) error
+       RemoveAll(name string) error
+       Rename(oldname, newname string) error
+
+       // Flush all file data to Keep and return a snapshot of the
+       // filesystem suitable for saving as (Collection)ManifestText.
+       // Prefix (normally ".") is a top level directory, effectively
+       // prepended to all paths in the returned manifest.
+       MarshalManifest(prefix string) (string, error)
+}
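
[Editor's note] A hedged sketch of the flag semantics described in the interface comments, assuming fs was obtained from (*Collection)FileSystem defined later in this file; none of the names below appear in the commit:

    package example

    import (
        "fmt"
        "os"

        "git.curoverse.com/arvados.git/sdk/go/arvados"
    )

    func demoOpenFile(fs arvados.CollectionFileSystem) error {
        // O_CREATE|O_EXCL with os.ModeDir in perm creates a directory.
        d, err := fs.OpenFile("logs", os.O_CREATE|os.O_EXCL, os.ModeDir|0755)
        if err != nil {
            return err
        }
        d.Close()
        // A second exclusive create of the same name must fail.
        if _, err := fs.OpenFile("logs", os.O_CREATE|os.O_EXCL, os.ModeDir|0755); err == nil {
            return fmt.Errorf("expected an error for existing %q", "logs")
        }
        // Without O_EXCL, an existing entry is simply opened.
        f, err := fs.OpenFile("logs", os.O_RDONLY, 0)
        if err != nil {
            return err
        }
        return f.Close()
    }
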
+
+type fileSystem struct {
+       dirnode
+}
+
+func (fs *fileSystem) OpenFile(name string, flag int, perm os.FileMode) (File, error) {
+       return fs.dirnode.OpenFile(name, flag, perm)
+}
+
+func (fs *fileSystem) Open(name string) (http.File, error) {
+       return fs.dirnode.OpenFile(name, os.O_RDONLY, 0)
+}
+
+func (fs *fileSystem) Create(name string) (File, error) {
+       return fs.dirnode.OpenFile(name, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0)
+}
+
+func (fs *fileSystem) Stat(name string) (fi os.FileInfo, err error) {
+       node := fs.dirnode.lookupPath(name)
+       if node == nil {
+               err = os.ErrNotExist
+       } else {
+               fi = node.Stat()
        }
-       var err error
-       if count >= len(ret) {
-               count = len(ret)
-               err = io.EOF
+       return
+}
+
+type inode interface {
+       Parent() inode
+       Read([]byte, filenodePtr) (int, filenodePtr, error)
+       Write([]byte, filenodePtr) (int, filenodePtr, error)
+       Truncate(int64) error
+       Readdir() []os.FileInfo
+       Size() int64
+       Stat() os.FileInfo
+       sync.Locker
+       RLock()
+       RUnlock()
+}
+
+// filenode implements inode.
+type filenode struct {
+       fileinfo fileinfo
+       parent   *dirnode
+       segments []segment
+       // number of times `segments` has changed in a
+       // way that might invalidate a filenodePtr
+       repacked int64
+       memsize  int64 // bytes in memSegments
+       sync.RWMutex
+}
+
+// filenodePtr is an offset into a file that is (usually) efficient to
+// seek to. Specifically, if filenode.repacked==filenodePtr.repacked
+// then
+// filenode.segments[filenodePtr.segmentIdx][filenodePtr.segmentOff]
+// corresponds to file offset filenodePtr.off. Otherwise, it is
+// necessary to reexamine len(filenode.segments[0]) etc. to find the
+// correct segment and offset.
+type filenodePtr struct {
+       off        int64
+       segmentIdx int
+       segmentOff int
+       repacked   int64
+}
+
+// seek returns a ptr that is consistent with both startPtr.off and
+// the current state of fn. The caller must already hold fn.RLock() or
+// fn.Lock().
+//
+// If startPtr is beyond EOF, ptr.segment* will indicate precisely
+// EOF.
+//
+// After seeking:
+//
+//     ptr.segmentIdx == len(filenode.segments) // i.e., at EOF
+//     ||
+//     filenode.segments[ptr.segmentIdx].Len() > ptr.segmentOff
+func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) {
+       ptr = startPtr
+       if ptr.off < 0 {
+               // meaningless anyway
+               return
+       } else if ptr.off >= fn.fileinfo.size {
+               ptr.segmentIdx = len(fn.segments)
+               ptr.segmentOff = 0
+               ptr.repacked = fn.repacked
+               return
+       } else if ptr.repacked == fn.repacked {
+               // segmentIdx and segmentOff accurately reflect
+               // ptr.off, but might have fallen off the end of a
+               // segment
+               if ptr.segmentOff >= fn.segments[ptr.segmentIdx].Len() {
+                       ptr.segmentIdx++
+                       ptr.segmentOff = 0
+               }
+               return
        }
-       cd.dirents = cd.dirents[count:]
-       return ret[:count], err
+       defer func() {
+               ptr.repacked = fn.repacked
+       }()
+       if ptr.off >= fn.fileinfo.size {
+               ptr.segmentIdx, ptr.segmentOff = len(fn.segments), 0
+               return
+       }
+       // Recompute segmentIdx and segmentOff.  We have already
+       // established fn.fileinfo.size > ptr.off >= 0, so we don't
+       // have to deal with edge cases here.
+       var off int64
+       for ptr.segmentIdx, ptr.segmentOff = 0, 0; off < ptr.off; ptr.segmentIdx++ {
+               // This would panic (index out of range) if
+               // fn.fileinfo.size were larger than
+               // sum(fn.segments[i].Len()) -- but that can't happen
+               // because we have ensured fn.fileinfo.size is always
+               // accurate.
+               segLen := int64(fn.segments[ptr.segmentIdx].Len())
+               if off+segLen > ptr.off {
+                       ptr.segmentOff = int(ptr.off - off)
+                       break
+               }
+               off += segLen
+       }
+       return
 }
 
-// Stat implements os.File.
-func (cd *collectionDir) Stat() (os.FileInfo, error) {
-       return collectionDirent{
-               collection: cd.collection,
-               name:       path.Base(cd.stream),
-               isDir:      true,
-               size:       int64(len(cd.dirents)),
-       }, nil
+// caller must have lock
+func (fn *filenode) appendSegment(e segment) {
+       fn.segments = append(fn.segments, e)
+       fn.fileinfo.size += int64(e.Len())
+}
+
+func (fn *filenode) Parent() inode {
+       fn.RLock()
+       defer fn.RUnlock()
+       return fn.parent
 }
 
-// Close implements os.File.
-func (cd *collectionDir) Close() error {
+func (fn *filenode) Readdir() []os.FileInfo {
        return nil
 }
 
-// Read implements os.File.
-func (cd *collectionDir) Read([]byte) (int, error) {
-       return 0, nil
+// Read reads file data from a single segment, starting at startPtr,
+// into p. startPtr is assumed not to be up-to-date. Caller must have
+// RLock or Lock.
+func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
+       ptr = fn.seek(startPtr)
+       if ptr.off < 0 {
+               err = ErrNegativeOffset
+               return
+       }
+       if ptr.segmentIdx >= len(fn.segments) {
+               err = io.EOF
+               return
+       }
+       n, err = fn.segments[ptr.segmentIdx].ReadAt(p, int64(ptr.segmentOff))
+       if n > 0 {
+               ptr.off += int64(n)
+               ptr.segmentOff += n
+               if ptr.segmentOff == fn.segments[ptr.segmentIdx].Len() {
+                       ptr.segmentIdx++
+                       ptr.segmentOff = 0
+                       if ptr.segmentIdx < len(fn.segments) && err == io.EOF {
+                               err = nil
+                       }
+               }
+       }
+       return
 }
 
-// Seek implements os.File.
-func (cd *collectionDir) Seek(int64, int) (int64, error) {
-       return 0, nil
+func (fn *filenode) Size() int64 {
+       fn.RLock()
+       defer fn.RUnlock()
+       return fn.fileinfo.Size()
 }
 
-// collectionDirent implements os.FileInfo.
-type collectionDirent struct {
-       collection *Collection
-       name       string
-       isDir      bool
-       mode       os.FileMode
-       size       int64
+func (fn *filenode) Stat() os.FileInfo {
+       fn.RLock()
+       defer fn.RUnlock()
+       return fn.fileinfo
 }
 
-// Name implements os.FileInfo.
-func (e collectionDirent) Name() string {
-       return e.name
+func (fn *filenode) Truncate(size int64) error {
+       fn.Lock()
+       defer fn.Unlock()
+       return fn.truncate(size)
 }
 
-// ModTime implements os.FileInfo.
-func (e collectionDirent) ModTime() time.Time {
-       if e.collection.ModifiedAt == nil {
-               return time.Now()
+func (fn *filenode) truncate(size int64) error {
+       if size == fn.fileinfo.size {
+               return nil
+       }
+       fn.repacked++
+       if size < fn.fileinfo.size {
+               ptr := fn.seek(filenodePtr{off: size})
+               for i := ptr.segmentIdx; i < len(fn.segments); i++ {
+                       if seg, ok := fn.segments[i].(*memSegment); ok {
+                               fn.memsize -= int64(seg.Len())
+                       }
+               }
+               if ptr.segmentOff == 0 {
+                       fn.segments = fn.segments[:ptr.segmentIdx]
+               } else {
+                       fn.segments = fn.segments[:ptr.segmentIdx+1]
+                       switch seg := fn.segments[ptr.segmentIdx].(type) {
+                       case *memSegment:
+                               seg.Truncate(ptr.segmentOff)
+                               fn.memsize += int64(seg.Len())
+                       default:
+                               fn.segments[ptr.segmentIdx] = seg.Slice(0, ptr.segmentOff)
+                       }
+               }
+               fn.fileinfo.size = size
+               return nil
        }
-       return *e.collection.ModifiedAt
+       for size > fn.fileinfo.size {
+               grow := size - fn.fileinfo.size
+               var seg *memSegment
+               var ok bool
+               if len(fn.segments) == 0 {
+                       seg = &memSegment{}
+                       fn.segments = append(fn.segments, seg)
+               } else if seg, ok = fn.segments[len(fn.segments)-1].(*memSegment); !ok || seg.Len() >= maxBlockSize {
+                       seg = &memSegment{}
+                       fn.segments = append(fn.segments, seg)
+               }
+               if maxgrow := int64(maxBlockSize - seg.Len()); maxgrow < grow {
+                       grow = maxgrow
+               }
+               seg.Truncate(seg.Len() + int(grow))
+               fn.fileinfo.size += grow
+               fn.memsize += grow
+       }
+       return nil
 }
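
[Editor's note] A hedged sketch of what this buys a caller through the File interface: shrinking discards trailing segments, while growing pads the file with empty in-memory segments (which should read back as zero bytes). fs is assumed to be a CollectionFileSystem; the filename is arbitrary.

    package example

    import "git.curoverse.com/arvados.git/sdk/go/arvados"

    func demoTruncate(fs arvados.CollectionFileSystem) error {
        f, err := fs.Create("data.bin") // empty file, opened O_RDWR
        if err != nil {
            return err
        }
        defer f.Close()
        if _, err := f.Write([]byte("hello world")); err != nil {
            return err
        }
        // Shrink: only the first 5 bytes ("hello") remain.
        if err := f.Truncate(5); err != nil {
            return err
        }
        // Grow: the file becomes 8 bytes; bytes 5..7 are zero.
        return f.Truncate(8)
    }
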
 
-// Mode implements os.FileInfo.
-func (e collectionDirent) Mode() os.FileMode {
-       if e.isDir {
-               return 0555
+// Write writes data from p to the file, starting at startPtr,
+// extending the file size if necessary. Caller must have Lock.
+func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
+       if startPtr.off > fn.fileinfo.size {
+               if err = fn.truncate(startPtr.off); err != nil {
+                       return 0, startPtr, err
+               }
+       }
+       ptr = fn.seek(startPtr)
+       if ptr.off < 0 {
+               err = ErrNegativeOffset
+               return
+       }
+       for len(p) > 0 && err == nil {
+               cando := p
+               if len(cando) > maxBlockSize {
+                       cando = cando[:maxBlockSize]
+               }
+               // Rearrange/grow fn.segments (and shrink cando if
+               // needed) such that cando can be copied to
+               // fn.segments[ptr.segmentIdx] at offset
+               // ptr.segmentOff.
+               cur := ptr.segmentIdx
+               prev := ptr.segmentIdx - 1
+               var curWritable bool
+               if cur < len(fn.segments) {
+                       _, curWritable = fn.segments[cur].(*memSegment)
+               }
+               var prevAppendable bool
+               if prev >= 0 && fn.segments[prev].Len() < maxBlockSize {
+                       _, prevAppendable = fn.segments[prev].(*memSegment)
+               }
+               if ptr.segmentOff > 0 && !curWritable {
+                       // Split a non-writable block.
+                       if max := fn.segments[cur].Len() - ptr.segmentOff; max <= len(cando) {
+                               // Truncate cur, and insert a new
+                               // segment after it.
+                               cando = cando[:max]
+                               fn.segments = append(fn.segments, nil)
+                               copy(fn.segments[cur+1:], fn.segments[cur:])
+                       } else {
+                               // Split cur into two copies, truncate
+                               // the one on the left, shift the one
+                               // on the right, and insert a new
+                               // segment between them.
+                               fn.segments = append(fn.segments, nil, nil)
+                               copy(fn.segments[cur+2:], fn.segments[cur:])
+                               fn.segments[cur+2] = fn.segments[cur+2].Slice(ptr.segmentOff+len(cando), -1)
+                       }
+                       cur++
+                       prev++
+                       seg := &memSegment{}
+                       seg.Truncate(len(cando))
+                       fn.memsize += int64(len(cando))
+                       fn.segments[cur] = seg
+                       fn.segments[prev] = fn.segments[prev].Slice(0, ptr.segmentOff)
+                       ptr.segmentIdx++
+                       ptr.segmentOff = 0
+                       fn.repacked++
+                       ptr.repacked++
+               } else if curWritable {
+                       if fit := int(fn.segments[cur].Len()) - ptr.segmentOff; fit < len(cando) {
+                               cando = cando[:fit]
+                       }
+               } else {
+                       if prevAppendable {
+                               // Shrink cando if needed to fit in
+                               // prev segment.
+                               if cangrow := maxBlockSize - fn.segments[prev].Len(); cangrow < len(cando) {
+                                       cando = cando[:cangrow]
+                               }
+                       }
+
+                       if cur == len(fn.segments) {
+                               // ptr is at EOF, filesize is changing.
+                               fn.fileinfo.size += int64(len(cando))
+                       } else if el := fn.segments[cur].Len(); el <= len(cando) {
+                               // cando is long enough that we won't
+                               // need cur any more. shrink cando to
+                               // be exactly as long as cur
+                               // (otherwise we'd accidentally shift
+                               // the effective position of all
+                               // segments after cur).
+                               cando = cando[:el]
+                               copy(fn.segments[cur:], fn.segments[cur+1:])
+                               fn.segments = fn.segments[:len(fn.segments)-1]
+                       } else {
+                               // shrink cur by the same #bytes we're growing prev
+                               fn.segments[cur] = fn.segments[cur].Slice(len(cando), -1)
+                       }
+
+                       if prevAppendable {
+                               // Grow prev.
+                               ptr.segmentIdx--
+                               ptr.segmentOff = fn.segments[prev].Len()
+                               fn.segments[prev].(*memSegment).Truncate(ptr.segmentOff + len(cando))
+                               fn.memsize += int64(len(cando))
+                               ptr.repacked++
+                               fn.repacked++
+                       } else {
+                               // Insert a segment between prev and
+                               // cur, and advance prev/cur.
+                               fn.segments = append(fn.segments, nil)
+                               if cur < len(fn.segments) {
+                                       copy(fn.segments[cur+1:], fn.segments[cur:])
+                                       ptr.repacked++
+                                       fn.repacked++
+                               } else {
+                                       // appending a new segment does
+                                       // not invalidate any ptrs
+                               }
+                               seg := &memSegment{}
+                               seg.Truncate(len(cando))
+                               fn.memsize += int64(len(cando))
+                               fn.segments[cur] = seg
+                               cur++
+                               prev++
+                       }
+               }
+
+               // Finally we can copy bytes from cando to the current segment.
+               fn.segments[ptr.segmentIdx].(*memSegment).WriteAt(cando, ptr.segmentOff)
+               n += len(cando)
+               p = p[len(cando):]
+
+               ptr.off += int64(len(cando))
+               ptr.segmentOff += len(cando)
+               if ptr.segmentOff >= maxBlockSize {
+                       fn.pruneMemSegments()
+               }
+               if fn.segments[ptr.segmentIdx].Len() == ptr.segmentOff {
+                       ptr.segmentOff = 0
+                       ptr.segmentIdx++
+               }
+
+               fn.fileinfo.modTime = time.Now()
+       }
+       return
+}
+
+// Write some data out to Keep to reduce memory use. Caller must have
+// write lock.
+func (fn *filenode) pruneMemSegments() {
+       // TODO: async (don't hold Lock() while waiting for Keep)
+       // TODO: share code with (*dirnode)sync()
+       // TODO: pack/flush small blocks too, when fragmented
+       for idx, seg := range fn.segments {
+               seg, ok := seg.(*memSegment)
+               if !ok || seg.Len() < maxBlockSize {
+                       continue
+               }
+               locator, _, err := fn.parent.kc.PutB(seg.buf)
+               if err != nil {
+                       // TODO: stall (or return errors from)
+                       // subsequent writes until flushing
+                       // starts to succeed
+                       continue
+               }
+               fn.memsize -= int64(seg.Len())
+               fn.segments[idx] = storedSegment{
+                       kc:      fn.parent.kc,
+                       locator: locator,
+                       size:    seg.Len(),
+                       offset:  0,
+                       length:  seg.Len(),
+               }
+       }
+}
+
+// FileSystem returns a CollectionFileSystem for the collection.
+func (c *Collection) FileSystem(client *Client, kc keepClient) (CollectionFileSystem, error) {
+       var modTime time.Time
+       if c.ModifiedAt == nil {
+               modTime = time.Now()
        } else {
-               return 0444
+               modTime = *c.ModifiedAt
+       }
+       fs := &fileSystem{dirnode: dirnode{
+               client: client,
+               kc:     kc,
+               fileinfo: fileinfo{
+                       name:    ".",
+                       mode:    os.ModeDir | 0755,
+                       modTime: modTime,
+               },
+               parent: nil,
+               inodes: make(map[string]inode),
+       }}
+       fs.dirnode.parent = &fs.dirnode
+       if err := fs.dirnode.loadManifest(c.ManifestText); err != nil {
+               return nil, err
        }
+       return fs, nil
 }
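
[Editor's note] An end-to-end hedged sketch tying the constructor to the rest of the API: build a filesystem from a collection, write a file, and flush everything back to a manifest. It assumes *keepclient.KeepClient provides the ReadAt/PutB methods required by the keepClient interface above.

    package example

    import (
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
    )

    func roundTrip(client *arvados.Client, kc *keepclient.KeepClient, coll *arvados.Collection) (string, error) {
        fs, err := coll.FileSystem(client, kc)
        if err != nil {
            return "", err
        }
        f, err := fs.Create("hello.txt")
        if err != nil {
            return "", err
        }
        if _, err := f.Write([]byte("hello\n")); err != nil {
            return "", err
        }
        if err := f.Close(); err != nil {
            return "", err
        }
        // Flush in-memory segments to Keep and render the manifest.
        return fs.MarshalManifest(".")
    }
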
 
-// IsDir implements os.FileInfo.
-func (e collectionDirent) IsDir() bool {
-       return e.isDir
+type filehandle struct {
+       inode
+       ptr        filenodePtr
+       append     bool
+       readable   bool
+       writable   bool
+       unreaddirs []os.FileInfo
 }
 
-// Size implements os.FileInfo.
-func (e collectionDirent) Size() int64 {
-       return e.size
+func (f *filehandle) Read(p []byte) (n int, err error) {
+       if !f.readable {
+               return 0, ErrWriteOnlyMode
+       }
+       f.inode.RLock()
+       defer f.inode.RUnlock()
+       n, f.ptr, err = f.inode.Read(p, f.ptr)
+       return
 }
 
-// Sys implements os.FileInfo.
-func (e collectionDirent) Sys() interface{} {
-       return nil
+func (f *filehandle) Seek(off int64, whence int) (pos int64, err error) {
+       size := f.inode.Size()
+       ptr := f.ptr
+       switch whence {
+       case io.SeekStart:
+               ptr.off = off
+       case io.SeekCurrent:
+               ptr.off += off
+       case io.SeekEnd:
+               ptr.off = size + off
+       }
+       if ptr.off < 0 {
+               return f.ptr.off, ErrNegativeOffset
+       }
+       if ptr.off != f.ptr.off {
+               f.ptr = ptr
+               // force filenode to recompute f.ptr fields on next
+               // use
+               f.ptr.repacked = -1
+       }
+       return f.ptr.off, nil
 }
 
-// A CollectionFileSystem is an http.Filesystem with an added Stat() method.
-type CollectionFileSystem interface {
-       http.FileSystem
-       Stat(name string) (os.FileInfo, error)
+func (f *filehandle) Truncate(size int64) error {
+       return f.inode.Truncate(size)
 }
 
-// collectionFS implements CollectionFileSystem.
-type collectionFS struct {
-       collection *Collection
-       client     *Client
-       kc         keepClient
-       sizes      map[string]int64
-       sizesOnce  sync.Once
+func (f *filehandle) Write(p []byte) (n int, err error) {
+       if !f.writable {
+               return 0, ErrReadOnlyFile
+       }
+       f.inode.Lock()
+       defer f.inode.Unlock()
+       if fn, ok := f.inode.(*filenode); ok && f.append {
+               f.ptr = filenodePtr{
+                       off:        fn.fileinfo.size,
+                       segmentIdx: len(fn.segments),
+                       segmentOff: 0,
+                       repacked:   fn.repacked,
+               }
+       }
+       n, f.ptr, err = f.inode.Write(p, f.ptr)
+       return
 }
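
[Editor's note] A hedged sketch of the mode handling above: a read-only handle rejects writes with ErrReadOnlyFile, and (assuming OpenFile maps os.O_APPEND onto the handle's append flag, which this hunk does not show) an append-mode handle repositions to EOF before every write.

    package example

    import (
        "fmt"
        "os"

        "git.curoverse.com/arvados.git/sdk/go/arvados"
    )

    func demoModes(fs arvados.CollectionFileSystem) error {
        f, err := fs.OpenFile("notes.txt", os.O_CREATE|os.O_RDONLY, 0644)
        if err != nil {
            return err
        }
        if _, err := f.Write([]byte("x")); err != arvados.ErrReadOnlyFile {
            return fmt.Errorf("expected ErrReadOnlyFile, got %v", err)
        }
        f.Close()
        // Assumption: O_APPEND sets the handle's append mode.
        f, err = fs.OpenFile("notes.txt", os.O_WRONLY|os.O_APPEND, 0)
        if err != nil {
            return err
        }
        defer f.Close()
        _, err = f.Write([]byte("appended\n"))
        return err
    }
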
 
-// FileSystem returns a CollectionFileSystem for the collection.
-func (c *Collection) FileSystem(client *Client, kc keepClient) CollectionFileSystem {
-       return &collectionFS{
-               collection: c,
-               client:     client,
-               kc:         kc,
-       }
-}
-
-func (c *collectionFS) Stat(name string) (os.FileInfo, error) {
-       name = canonicalName(name)
-       if name == "." {
-               return collectionDirent{
-                       collection: c.collection,
-                       name:       "/",
-                       isDir:      true,
-               }, nil
-       }
-       if size, ok := c.fileSizes()[name]; ok {
-               return collectionDirent{
-                       collection: c.collection,
-                       name:       path.Base(name),
-                       size:       size,
-                       isDir:      false,
-               }, nil
-       }
-       for fnm := range c.fileSizes() {
-               if !strings.HasPrefix(fnm, name+"/") {
+func (f *filehandle) Readdir(count int) ([]os.FileInfo, error) {
+       if !f.inode.Stat().IsDir() {
+               return nil, ErrInvalidOperation
+       }
+       if count <= 0 {
+               return f.inode.Readdir(), nil
+       }
+       if f.unreaddirs == nil {
+               f.unreaddirs = f.inode.Readdir()
+       }
+       if len(f.unreaddirs) == 0 {
+               return nil, io.EOF
+       }
+       if count > len(f.unreaddirs) {
+               count = len(f.unreaddirs)
+       }
+       ret := f.unreaddirs[:count]
+       f.unreaddirs = f.unreaddirs[count:]
+       return ret, nil
+}
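
[Editor's note] A hedged sketch of the paging contract implemented above: a non-positive count returns the whole listing in one call, while a positive count pages through it, with io.EOF marking exhaustion.

    package example

    import (
        "fmt"
        "io"

        "git.curoverse.com/arvados.git/sdk/go/arvados"
    )

    func listDir(fs arvados.CollectionFileSystem, dir string) error {
        f, err := fs.Open(dir)
        if err != nil {
            return err
        }
        defer f.Close()
        for {
            fis, err := f.Readdir(16) // up to 16 entries per call
            for _, fi := range fis {
                fmt.Println(fi.Name(), fi.Size())
            }
            if err == io.EOF {
                return nil
            }
            if err != nil {
                return err
            }
        }
    }
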
+
+func (f *filehandle) Stat() (os.FileInfo, error) {
+       return f.inode.Stat(), nil
+}
+
+func (f *filehandle) Close() error {
+       return nil
+}
+
+type dirnode struct {
+       fileinfo fileinfo
+       parent   *dirnode
+       client   *Client
+       kc       keepClient
+       inodes   map[string]inode
+       sync.RWMutex
+}
+
+// sync flushes in-memory data (for all files in the tree rooted at
+// dn) to persistent storage. Caller must hold dn.Lock().
+func (dn *dirnode) sync() error {
+       type shortBlock struct {
+               fn  *filenode
+               idx int
+       }
+       var pending []shortBlock
+       var pendingLen int
+
+       flush := func(sbs []shortBlock) error {
+               if len(sbs) == 0 {
+                       return nil
+               }
+               block := make([]byte, 0, maxBlockSize)
+               for _, sb := range sbs {
+                       block = append(block, sb.fn.segments[sb.idx].(*memSegment).buf...)
+               }
+               locator, _, err := dn.kc.PutB(block)
+               if err != nil {
+                       return err
+               }
+               off := 0
+               for _, sb := range sbs {
+                       data := sb.fn.segments[sb.idx].(*memSegment).buf
+                       sb.fn.segments[sb.idx] = storedSegment{
+                               kc:      dn.kc,
+                               locator: locator,
+                               size:    len(block),
+                               offset:  off,
+                               length:  len(data),
+                       }
+                       off += len(data)
+                       sb.fn.memsize -= int64(len(data))
+               }
+               return nil
+       }
+
+       names := make([]string, 0, len(dn.inodes))
+       for name := range dn.inodes {
+               names = append(names, name)
+       }
+       sort.Strings(names)
+
+       for _, name := range names {
+               fn, ok := dn.inodes[name].(*filenode)
+               if !ok {
                        continue
                }
-               return collectionDirent{
-                       collection: c.collection,
-                       name:       path.Base(name),
-                       isDir:      true,
-               }, nil
+               fn.Lock()
+               defer fn.Unlock()
+               for idx, seg := range fn.segments {
+                       seg, ok := seg.(*memSegment)
+                       if !ok {
+                               continue
+                       }
+                       if seg.Len() > maxBlockSize/2 {
+                               if err := flush([]shortBlock{{fn, idx}}); err != nil {
+                                       return err
+                               }
+                               continue
+                       }
+                       if pendingLen+seg.Len() > maxBlockSize {
+                               if err := flush(pending); err != nil {
+                                       return err
+                               }
+                               pending = nil
+                               pendingLen = 0
+                       }
+                       pending = append(pending, shortBlock{fn, idx})
+                       pendingLen += seg.Len()
+               }
        }
-       return nil, os.ErrNotExist
+       return flush(pending)
 }
 
-func (c *collectionFS) Open(name string) (http.File, error) {
-       // Ensure name looks the way it does in a manifest.
-       name = canonicalName(name)
+func (dn *dirnode) MarshalManifest(prefix string) (string, error) {
+       dn.Lock()
+       defer dn.Unlock()
+       return dn.marshalManifest(prefix)
+}
 
-       m := manifest.Manifest{Text: c.collection.ManifestText}
+// caller must have write lock: this calls sync(), which modifies fn.segments.
+func (dn *dirnode) marshalManifest(prefix string) (string, error) {
+       var streamLen int64
+       type filepart struct {
+               name   string
+               offset int64
+               length int64
+       }
+       var fileparts []filepart
+       var subdirs string
+       var blocks []string
 
-       // Return a file if it exists.
-       if size, ok := c.fileSizes()[name]; ok {
-               reader, err := c.kc.ManifestFileReader(m, name)
-               if err != nil {
-                       return nil, err
+       if err := dn.sync(); err != nil {
+               return "", err
+       }
+
+       names := make([]string, 0, len(dn.inodes))
+       for name, node := range dn.inodes {
+               names = append(names, name)
+               node.Lock()
+               defer node.Unlock()
+       }
+       sort.Strings(names)
+
+       for _, name := range names {
+               switch node := dn.inodes[name].(type) {
+               case *dirnode:
+                       subdir, err := node.marshalManifest(prefix + "/" + name)
+                       if err != nil {
+                               return "", err
+                       }
+                       subdirs = subdirs + subdir
+               case *filenode:
+                       if len(node.segments) == 0 {
+                               fileparts = append(fileparts, filepart{name: name})
+                               break
+                       }
+                       for _, seg := range node.segments {
+                               switch seg := seg.(type) {
+                               case storedSegment:
+                                       if len(blocks) > 0 && blocks[len(blocks)-1] == seg.locator {
+                                               streamLen -= int64(seg.size)
+                                       } else {
+                                               blocks = append(blocks, seg.locator)
+                                       }
+                                       next := filepart{
+                                               name:   name,
+                                               offset: streamLen + int64(seg.offset),
+                                               length: int64(seg.length),
+                                       }
+                                       if prev := len(fileparts) - 1; prev >= 0 &&
+                                               fileparts[prev].name == name &&
+                                               fileparts[prev].offset+fileparts[prev].length == next.offset {
+                                               fileparts[prev].length += next.length
+                                       } else {
+                                               fileparts = append(fileparts, next)
+                                       }
+                                       streamLen += int64(seg.size)
+                               default:
+                                       // This can't happen: we
+                                       // haven't unlocked since
+                                       // calling sync().
+                                       panic(fmt.Sprintf("can't marshal segment type %T", seg))
+                               }
+                       }
+               default:
+                       panic(fmt.Sprintf("can't marshal inode type %T", node))
+               }
+       }
+       var filetokens []string
+       for _, s := range fileparts {
+               filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, manifestEscape(s.name)))
+       }
+       if len(filetokens) == 0 {
+               return subdirs, nil
+       } else if len(blocks) == 0 {
+               blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
+       }
+       return manifestEscape(prefix) + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n" + subdirs, nil
+}
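
[Editor's note] For orientation, the returned text is ordinary Arvados manifest format: one line per stream, holding the escaped stream name, the block locators, and offset:length:name file tokens. A collection containing just a 3-byte file foo.txt with contents "foo" would marshal to a single line along the lines of

    . acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt

where the locator is the md5 of the block data plus its size, and (as handled just above) a file with no stored blocks falls back to the empty-block locator d41d8cd98f00b204e9800998ecf8427e+0.
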
+
+func (dn *dirnode) loadManifest(txt string) error {
+       var dirname string
+       streams := strings.Split(txt, "\n")
+       if streams[len(streams)-1] != "" {
+               return fmt.Errorf("line %d: no trailing newline", len(streams))
+       }
+       streams = streams[:len(streams)-1]
+       segments := []storedSegment{}
+       for i, stream := range streams {
+               lineno := i + 1
+               var anyFileTokens bool
+               var pos int64
+               var segIdx int
+               segments = segments[:0]
+               for i, token := range strings.Split(stream, " ") {
+                       if i == 0 {
+                               dirname = manifestUnescape(token)
+                               continue
+                       }
+                       if !strings.Contains(token, ":") {
+                               if anyFileTokens {
+                                       return fmt.Errorf("line %d: bad file segment %q", lineno, token)
+                               }
+                               toks := strings.SplitN(token, "+", 3)
+                               if len(toks) < 2 {
+                                       return fmt.Errorf("line %d: bad locator %q", lineno, token)
+                               }
+                               length, err := strconv.ParseInt(toks[1], 10, 32)
+                               if err != nil || length < 0 {
+                                       return fmt.Errorf("line %d: bad locator %q", lineno, token)
+                               }
+                               segments = append(segments, storedSegment{
+                                       locator: token,
+                                       size:    int(length),
+                                       offset:  0,
+                                       length:  int(length),
+                               })
+                               continue
+                       } else if len(segments) == 0 {
+                               return fmt.Errorf("line %d: bad locator %q", lineno, token)
+                       }
+
+                       toks := strings.Split(token, ":")
+                       if len(toks) != 3 {
+                               return fmt.Errorf("line %d: bad file segment %q", lineno, token)
+                       }
+                       anyFileTokens = true
+
+                       offset, err := strconv.ParseInt(toks[0], 10, 64)
+                       if err != nil || offset < 0 {
+                               return fmt.Errorf("line %d: bad file segment %q", lineno, token)
+                       }
+                       length, err := strconv.ParseInt(toks[1], 10, 64)
+                       if err != nil || length < 0 {
+                               return fmt.Errorf("line %d: bad file segment %q", lineno, token)
+                       }
+                       name := dirname + "/" + manifestUnescape(toks[2])
+                       fnode, err := dn.createFileAndParents(name)
+                       if err != nil {
+                               return fmt.Errorf("line %d: cannot use path %q: %s", lineno, name, err)
+                       }
+                       // Map the stream offset/range coordinates to
+                       // block/offset/range coordinates and add
+                       // corresponding storedSegments to the filenode
+                       if pos > offset {
+                               // Can't continue where we left off.
+                               // TODO: binary search instead of
+                               // rewinding all the way (but this
+                               // situation might be rare anyway)
+                               segIdx, pos = 0, 0
+                       }
+                       for next := int64(0); segIdx < len(segments); segIdx++ {
+                               seg := segments[segIdx]
+                               next = pos + int64(seg.Len())
+                               if next <= offset || seg.Len() == 0 {
+                                       pos = next
+                                       continue
+                               }
+                               if pos >= offset+length {
+                                       break
+                               }
+                               var blkOff int
+                               if pos < offset {
+                                       blkOff = int(offset - pos)
+                               }
+                               blkLen := seg.Len() - blkOff
+                               if pos+int64(blkOff+blkLen) > offset+length {
+                                       blkLen = int(offset + length - pos - int64(blkOff))
+                               }
+                               fnode.appendSegment(storedSegment{
+                                       kc:      dn.kc,
+                                       locator: seg.locator,
+                                       size:    seg.size,
+                                       offset:  blkOff,
+                                       length:  blkLen,
+                               })
+                               if next > offset+length {
+                                       break
+                               } else {
+                                       pos = next
+                               }
+                       }
+                       if segIdx == len(segments) && pos < offset+length {
+                               return fmt.Errorf("line %d: invalid segment in %d-byte stream: %q", lineno, pos, token)
+                       }
+               }
+               if !anyFileTokens {
+                       return fmt.Errorf("line %d: no file segments", lineno)
+               } else if len(segments) == 0 {
+                       return fmt.Errorf("line %d: no locators", lineno)
+               } else if dirname == "" {
+                       return fmt.Errorf("line %d: no stream name", lineno)
+               }
+       }
+       return nil
+}
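
[Editor's note] To make the coordinate mapping above concrete: in a hypothetical stream ". blockA+5 blockB+5 3:4:f" (locators abbreviated), the file token 3:4:f covers stream bytes [3,7), so f receives two storedSegments — offset 3, length 2 of the first block, then offset 0, length 2 of the second.
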
+
+// only safe to call from loadManifest -- no locking
+func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
+       names := strings.Split(path, "/")
+       basename := names[len(names)-1]
+       if basename == "" || basename == "." || basename == ".." {
+               err = fmt.Errorf("invalid filename")
+               return
+       }
+       for _, name := range names[:len(names)-1] {
+               switch name {
+               case "", ".":
+               case "..":
+                       dn = dn.parent
+               default:
+                       switch node := dn.inodes[name].(type) {
+                       case nil:
+                               dn = dn.newDirnode(name, 0755, dn.fileinfo.modTime)
+                       case *dirnode:
+                               dn = node
+                       case *filenode:
+                               err = ErrFileExists
+                               return
+                       }
+               }
+       }
+       switch node := dn.inodes[basename].(type) {
+       case nil:
+               fn = dn.newFilenode(basename, 0755, dn.fileinfo.modTime)
+       case *filenode:
+               fn = node
+       case *dirnode:
+               err = ErrIsDirectory
+       }
+       return
+}
+
+func (dn *dirnode) mkdir(name string) (*filehandle, error) {
+       return dn.OpenFile(name, os.O_CREATE|os.O_EXCL, os.ModeDir|0755)
+}
+
+func (dn *dirnode) Mkdir(name string, perm os.FileMode) error {
+       f, err := dn.mkdir(name)
+       if err == nil {
+               err = f.Close()
+       }
+       return err
+}
+
+func (dn *dirnode) Remove(name string) error {
+       return dn.remove(strings.TrimRight(name, "/"), false)
+}
+
+func (dn *dirnode) RemoveAll(name string) error {
+       err := dn.remove(strings.TrimRight(name, "/"), true)
+       if os.IsNotExist(err) {
+               // "If the path does not exist, RemoveAll returns
+               // nil." (see "os" pkg)
+               err = nil
+       }
+       return err
+}
+
+func (dn *dirnode) remove(name string, recursive bool) error {
+       dirname, name := path.Split(name)
+       if name == "" || name == "." || name == ".." {
+               return ErrInvalidArgument
+       }
+       dn, ok := dn.lookupPath(dirname).(*dirnode)
+       if !ok {
+               return os.ErrNotExist
+       }
+       dn.Lock()
+       defer dn.Unlock()
+       switch node := dn.inodes[name].(type) {
+       case nil:
+               return os.ErrNotExist
+       case *dirnode:
+               node.RLock()
+               defer node.RUnlock()
+               if !recursive && len(node.inodes) > 0 {
+                       return ErrDirectoryNotEmpty
                }
-               return &collectionFile{
-                       File:       reader,
-                       collection: c.collection,
-                       name:       path.Base(name),
-                       size:       size,
-               }, nil
        }
+       delete(dn.inodes, name)
+       return nil
+}
+
+func (dn *dirnode) Rename(oldname, newname string) error {
+       olddir, oldname := path.Split(oldname)
+       if oldname == "" || oldname == "." || oldname == ".." {
+               return ErrInvalidArgument
+       }
+       olddirf, err := dn.OpenFile(olddir+".", os.O_RDONLY, 0)
+       if err != nil {
+               return fmt.Errorf("%q: %s", olddir, err)
+       }
+       defer olddirf.Close()
+       newdir, newname := path.Split(newname)
+       if newname == "." || newname == ".." {
+               return ErrInvalidArgument
+       } else if newname == "" {
+               // Rename("a/b", "c/") means Rename("a/b", "c/b")
+               newname = oldname
+       }
+       newdirf, err := dn.OpenFile(newdir+".", os.O_RDONLY, 0)
+       if err != nil {
+               return fmt.Errorf("%q: %s", newdir, err)
+       }
+       defer newdirf.Close()
+
+       // When acquiring locks on multiple nodes, all common
+       // ancestors must be locked first in order to avoid
+       // deadlock. This is assured by locking the path from root to
+       // newdir, then locking the path from root to olddir, skipping
+       // any already-locked nodes.
+       needLock := []sync.Locker{}
+       for _, f := range []*filehandle{olddirf, newdirf} {
+               node := f.inode
+               needLock = append(needLock, node)
+               for node.Parent() != node {
+                       node = node.Parent()
+                       needLock = append(needLock, node)
+               }
+       }
+       locked := map[sync.Locker]bool{}
+       for i := len(needLock) - 1; i >= 0; i-- {
+               if n := needLock[i]; !locked[n] {
+                       n.Lock()
+                       defer n.Unlock()
+                       locked[n] = true
+               }
+       }
+
+       olddn := olddirf.inode.(*dirnode)
+       newdn := newdirf.inode.(*dirnode)
+       oldinode, ok := olddn.inodes[oldname]
+       if !ok {
+               return os.ErrNotExist
+       }
+       if existing, ok := newdn.inodes[newname]; ok {
+               // overwriting an existing file or dir
+               if dn, ok := existing.(*dirnode); ok {
+                       if !oldinode.Stat().IsDir() {
+                               return ErrIsDirectory
+                       }
+                       dn.RLock()
+                       defer dn.RUnlock()
+                       if len(dn.inodes) > 0 {
+                               return ErrDirectoryNotEmpty
+                       }
+               }
+       } else {
+               if newdn.inodes == nil {
+                       newdn.inodes = make(map[string]inode)
+               }
+               newdn.fileinfo.size++
+       }
+       newdn.inodes[newname] = oldinode
+       switch n := oldinode.(type) {
+       case *dirnode:
+               n.parent = newdn
+       case *filenode:
+               n.parent = newdn
+       default:
+               panic(fmt.Sprintf("bad inode type %T", n))
+       }
+       delete(olddn.inodes, oldname)
+       olddn.fileinfo.size--
+       return nil
+}
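// A simplified sketch of the lock-ordering rule used above, with a
// hypothetical node type standing in for dirnode (the root's parent is
// itself, as with dirnode.Parent()): collect the root-to-leaf path for
// every node involved, lock root first, and skip already-locked nodes.

package lockorder // hypothetical illustration, not part of this change

import "sync"

type node struct {
        sync.Mutex
        parent *node
}

func lockPaths(leaves ...*node) (unlock func()) {
        var needLock []*node
        for _, leaf := range leaves {
                for n := leaf; ; n = n.parent {
                        needLock = append(needLock, n)
                        if n.parent == n {
                                break
                        }
                }
        }
        locked := map[*node]bool{}
        var held []*node
        for i := len(needLock) - 1; i >= 0; i-- {
                if n := needLock[i]; !locked[n] {
                        n.Lock()
                        locked[n] = true
                        held = append(held, n)
                }
        }
        return func() {
                for i := len(held) - 1; i >= 0; i-- {
                        held[i].Unlock()
                }
        }
}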
+
+func (dn *dirnode) Parent() inode {
+       dn.RLock()
+       defer dn.RUnlock()
+       return dn.parent
+}
+
+func (dn *dirnode) Readdir() (fi []os.FileInfo) {
+       dn.RLock()
+       defer dn.RUnlock()
+       fi = make([]os.FileInfo, 0, len(dn.inodes))
+       for _, inode := range dn.inodes {
+               fi = append(fi, inode.Stat())
+       }
+       return
+}
 
-       // Return a directory if it's the root dir or there are file
-       // entries below it.
-       children := map[string]collectionDirent{}
-       for fnm, size := range c.fileSizes() {
-               if !strings.HasPrefix(fnm, name+"/") {
+func (dn *dirnode) Read(p []byte, ptr filenodePtr) (int, filenodePtr, error) {
+       return 0, ptr, ErrInvalidOperation
+}
+
+func (dn *dirnode) Write(p []byte, ptr filenodePtr) (int, filenodePtr, error) {
+       return 0, ptr, ErrInvalidOperation
+}
+
+func (dn *dirnode) Size() int64 {
+       dn.RLock()
+       defer dn.RUnlock()
+       return dn.fileinfo.Size()
+}
+
+func (dn *dirnode) Stat() os.FileInfo {
+       dn.RLock()
+       defer dn.RUnlock()
+       return dn.fileinfo
+}
+
+func (dn *dirnode) Truncate(int64) error {
+       return ErrInvalidOperation
+}
+
+// lookupPath returns the inode for the file/directory with the given
+// name (which may contain "/" separators). If no such file/directory
+// exists, the returned node is nil.
+func (dn *dirnode) lookupPath(path string) (node inode) {
+       node = dn
+       for _, name := range strings.Split(path, "/") {
+               dn, ok := node.(*dirnode)
+               if !ok {
+                       return nil
+               }
+               if name == "." || name == "" {
                        continue
                }
-               isDir := false
-               ent := fnm[len(name)+1:]
-               if i := strings.Index(ent, "/"); i >= 0 {
-                       ent = ent[:i]
-                       isDir = true
+               if name == ".." {
+                       node = node.Parent()
+                       continue
                }
-               e := children[ent]
-               e.collection = c.collection
-               e.isDir = isDir
-               e.name = ent
-               e.size = size
-               children[ent] = e
+               dn.RLock()
+               node = dn.inodes[name]
+               dn.RUnlock()
+       }
+       return
+}
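// For example, under the traversal rules above ("." and empty segments
// are skipped, ".." moves to the parent), all of these resolve to the
// same inode:
//
//      dn.lookupPath("dir1/foo")
//      dn.lookupPath("./dir1//foo")
//      dn.lookupPath("dir1/../dir1/foo")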
+
+func (dn *dirnode) newDirnode(name string, perm os.FileMode, modTime time.Time) *dirnode {
+       child := &dirnode{
+               parent: dn,
+               client: dn.client,
+               kc:     dn.kc,
+               fileinfo: fileinfo{
+                       name:    name,
+                       mode:    os.ModeDir | perm,
+                       modTime: modTime,
+               },
+       }
+       if dn.inodes == nil {
+               dn.inodes = make(map[string]inode)
+       }
+       dn.inodes[name] = child
+       dn.fileinfo.size++
+       return child
+}
+
+func (dn *dirnode) newFilenode(name string, perm os.FileMode, modTime time.Time) *filenode {
+       child := &filenode{
+               parent: dn,
+               fileinfo: fileinfo{
+                       name:    name,
+                       mode:    perm,
+                       modTime: modTime,
+               },
+       }
+       if dn.inodes == nil {
+               dn.inodes = make(map[string]inode)
        }
-       if len(children) == 0 && name != "." {
+       dn.inodes[name] = child
+       dn.fileinfo.size++
+       return child
+}
+
+// OpenFile is analogous to os.OpenFile().
+func (dn *dirnode) OpenFile(name string, flag int, perm os.FileMode) (*filehandle, error) {
+       if flag&os.O_SYNC != 0 {
+               return nil, ErrSyncNotSupported
+       }
+       dirname, name := path.Split(name)
+       dn, ok := dn.lookupPath(dirname).(*dirnode)
+       if !ok {
                return nil, os.ErrNotExist
        }
-       dirents := make([]os.FileInfo, 0, len(children))
-       for _, ent := range children {
-               dirents = append(dirents, ent)
+       var readable, writable bool
+       switch flag & (os.O_RDWR | os.O_RDONLY | os.O_WRONLY) {
+       case os.O_RDWR:
+               readable = true
+               writable = true
+       case os.O_RDONLY:
+               readable = true
+       case os.O_WRONLY:
+               writable = true
+       default:
+               return nil, fmt.Errorf("invalid flags 0x%x", flag)
+       }
+       if !writable {
+               // A directory can be opened via "foo/", "foo/.", or
+               // "foo/..".
+               switch name {
+               case ".", "":
+                       return &filehandle{inode: dn}, nil
+               case "..":
+                       return &filehandle{inode: dn.Parent()}, nil
+               }
+       }
+       createMode := flag&os.O_CREATE != 0
+       if createMode {
+               dn.Lock()
+               defer dn.Unlock()
+       } else {
+               dn.RLock()
+               defer dn.RUnlock()
        }
-       return &collectionDir{
-               collection: c.collection,
-               stream:     name,
-               dirents:    dirents,
+       n, ok := dn.inodes[name]
+       if !ok {
+               if !createMode {
+                       return nil, os.ErrNotExist
+               }
+               if perm.IsDir() {
+                       n = dn.newDirnode(name, 0755, time.Now())
+               } else {
+                       n = dn.newFilenode(name, 0755, time.Now())
+               }
+       } else if flag&os.O_EXCL != 0 {
+               return nil, ErrFileExists
+       } else if flag&os.O_TRUNC != 0 {
+               if !writable {
+                       return nil, fmt.Errorf("invalid flag O_TRUNC in read-only mode")
+               } else if fn, ok := n.(*filenode); !ok {
+                       return nil, fmt.Errorf("invalid flag O_TRUNC when opening directory")
+               } else {
+                       fn.Truncate(0)
+               }
+       }
+       return &filehandle{
+               inode:    n,
+               append:   flag&os.O_APPEND != 0,
+               readable: readable,
+               writable: writable,
        }, nil
 }
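// A usage sketch of the flag handling above (exampleCreateExclusive is
// a hypothetical helper, written as if inside this package):
func exampleCreateExclusive(fs CollectionFileSystem) error {
        // O_EXCL makes the open fail with ErrFileExists if the name is
        // already taken; passing a perm with os.ModeDir set would
        // create a directory instead of a regular file.
        f, err := fs.OpenFile("dir1/newfile", os.O_RDWR|os.O_CREATE|os.O_EXCL, 0755)
        if err != nil {
                return err
        }
        defer f.Close()
        _, err = f.Write([]byte("hello"))
        return err
}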
 
-// fileSizes returns a map of files that can be opened. Each key
-// starts with "./".
-func (c *collectionFS) fileSizes() map[string]int64 {
-       c.sizesOnce.Do(func() {
-               c.sizes = map[string]int64{}
-               m := manifest.Manifest{Text: c.collection.ManifestText}
-               for ms := range m.StreamIter() {
-                       for _, fss := range ms.FileStreamSegments {
-                               c.sizes[ms.StreamName+"/"+fss.Name] += int64(fss.SegLen)
-                       }
+type segment interface {
+       io.ReaderAt
+       Len() int
+       // Return a new segment with a subsection of the data from this
+       // one. length<0 means length=Len()-off.
+       Slice(off int, length int) segment
+}
+
+type memSegment struct {
+       buf []byte
+}
+
+func (me *memSegment) Len() int {
+       return len(me.buf)
+}
+
+func (me *memSegment) Slice(off, length int) segment {
+       if length < 0 {
+               length = len(me.buf) - off
+       }
+       buf := make([]byte, length)
+       copy(buf, me.buf[off:])
+       return &memSegment{buf: buf}
+}
+
+func (me *memSegment) Truncate(n int) {
+       if n > cap(me.buf) {
+               newsize := 1024
+               for newsize < n {
+                       newsize = newsize << 2
+               }
+               newbuf := make([]byte, n, newsize)
+               copy(newbuf, me.buf)
+               me.buf = newbuf
+       } else {
+               // Zero unused part when shrinking, in case we grow
+               // and start using it again later.
+               for i := n; i < len(me.buf); i++ {
+                       me.buf[i] = 0
+               }
+       }
+       me.buf = me.buf[:n]
+}
+
+func (me *memSegment) WriteAt(p []byte, off int) {
+       if off+len(p) > len(me.buf) {
+               panic("overflowed segment")
+       }
+       copy(me.buf[off:], p)
+}
+
+func (me *memSegment) ReadAt(p []byte, off int64) (n int, err error) {
+       if off > int64(me.Len()) {
+               err = io.EOF
+               return
+       }
+       n = copy(p, me.buf[int(off):])
+       if n < len(p) {
+               err = io.EOF
+       }
+       return
+}
+
+type storedSegment struct {
+       kc      keepClient
+       locator string
+       size    int // size of stored block (also encoded in locator)
+       offset  int // position of segment within the stored block
+       length  int // bytes in this segment (offset + length <= size)
+}
+
+func (se storedSegment) Len() int {
+       return se.length
+}
+
+func (se storedSegment) Slice(n, size int) segment {
+       se.offset += n
+       se.length -= n
+       if size >= 0 && se.length > size {
+               se.length = size
+       }
+       return se
+}
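// Slicing a storedSegment is pure bookkeeping: it narrows the
// offset/length window into the same stored block without copying or
// fetching any data. For example (sketch):
//
//      s := storedSegment{size: 6, offset: 0, length: 6}
//      mid := s.Slice(2, 3) // refers to bytes 2..4 of the same block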
+
+func (se storedSegment) ReadAt(p []byte, off int64) (n int, err error) {
+       if off > int64(se.length) {
+               return 0, io.EOF
+       }
+       maxlen := se.length - int(off)
+       if len(p) > maxlen {
+               p = p[:maxlen]
+               n, err = se.kc.ReadAt(se.locator, p, int(off)+se.offset)
+               if err == nil {
+                       err = io.EOF
                }
-       })
-       return c.sizes
+               return
+       }
+       return se.kc.ReadAt(se.locator, p, int(off)+se.offset)
 }
 
 func canonicalName(name string) string {
@@ -278,3 +1388,31 @@ func canonicalName(name string) string {
        }
        return name
 }
+
+var manifestEscapeSeq = regexp.MustCompile(`\\([0-7]{3}|\\)`)
+
+func manifestUnescapeFunc(seq string) string {
+       if seq == `\\` {
+               return `\`
+       }
+       i, err := strconv.ParseUint(seq[1:], 8, 8)
+       if err != nil {
+               // Invalid escape sequence: can't unescape.
+               return seq
+       }
+       return string([]byte{byte(i)})
+}
+
+func manifestUnescape(s string) string {
+       return manifestEscapeSeq.ReplaceAllStringFunc(s, manifestUnescapeFunc)
+}
+
+var manifestEscapedChar = regexp.MustCompile(`[\000-\040:\s\\]`)
+
+func manifestEscapeFunc(seq string) string {
+       return fmt.Sprintf("\\%03o", byte(seq[0]))
+}
+
+func manifestEscape(s string) string {
+       return manifestEscapedChar.ReplaceAllStringFunc(s, manifestEscapeFunc)
+}
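// A round-trip sketch of the escaping above (exampleManifestEscape is a
// hypothetical helper, written as if inside this package): spaces,
// colons, control characters, and backslashes in names are stored as
// three-digit octal escapes, which manifestUnescape reverses.
func exampleManifestEscape() {
        name := "new file:1"
        escaped := manifestEscape(name)
        fmt.Println(escaped)                           // new\040file\0721
        fmt.Println(manifestUnescape(escaped) == name) // true
}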
index f51d1eb3dd1d58d80b278f3859676addb9b0cab5..f1a34754f732bd9d94a2589e72aef164424827b1 100644 (file)
@@ -5,10 +5,20 @@
 package arvados
 
 import (
+       "bytes"
+       "crypto/md5"
+       "errors"
+       "fmt"
        "io"
+       "io/ioutil"
+       "math/rand"
        "net/http"
        "os"
+       "regexp"
+       "runtime"
+       "sync"
        "testing"
+       "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
        check "gopkg.in/check.v1"
@@ -16,17 +26,55 @@ import (
 
 var _ = check.Suite(&CollectionFSSuite{})
 
+type keepClientStub struct {
+       blocks map[string][]byte
+       sync.RWMutex
+}
+
+var errStub404 = errors.New("404 block not found")
+
+func (kcs *keepClientStub) ReadAt(locator string, p []byte, off int) (int, error) {
+       kcs.RLock()
+       defer kcs.RUnlock()
+       buf := kcs.blocks[locator[:32]]
+       if buf == nil {
+               return 0, errStub404
+       }
+       return copy(p, buf[off:]), nil
+}
+
+func (kcs *keepClientStub) PutB(p []byte) (string, int, error) {
+       locator := fmt.Sprintf("%x+%d+A12345@abcde", md5.Sum(p), len(p))
+       buf := make([]byte, len(p))
+       copy(buf, p)
+       kcs.Lock()
+       defer kcs.Unlock()
+       kcs.blocks[locator[:32]] = buf
+       return locator, 1, nil
+}
+
 type CollectionFSSuite struct {
        client *Client
        coll   Collection
-       fs     http.FileSystem
+       fs     CollectionFileSystem
+       kc     keepClient
 }
 
 func (s *CollectionFSSuite) SetUpTest(c *check.C) {
        s.client = NewClientFromEnv()
        err := s.client.RequestAndDecode(&s.coll, "GET", "arvados/v1/collections/"+arvadostest.FooAndBarFilesInDirUUID, nil, nil)
        c.Assert(err, check.IsNil)
-       s.fs = s.coll.FileSystem(s.client, nil)
+       s.kc = &keepClientStub{
+               blocks: map[string][]byte{
+                       "3858f62230ac3c915f300c664312c63f": []byte("foobar"),
+               }}
+       s.fs, err = s.coll.FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+}
+
+func (s *CollectionFSSuite) TestHttpFileSystemInterface(c *check.C) {
+       _, ok := s.fs.(http.FileSystem)
+       c.Check(ok, check.Equals, true)
 }
 
 func (s *CollectionFSSuite) TestReaddirFull(c *check.C) {
@@ -58,7 +106,7 @@ func (s *CollectionFSSuite) TestReaddirLimited(c *check.C) {
        }
 
        fis, err = f.Readdir(1)
-       c.Check(err, check.Equals, io.EOF)
+       c.Check(err, check.IsNil)
        c.Check(len(fis), check.Equals, 1)
        if len(fis) > 0 {
                c.Check(fis[0].Size(), check.Equals, int64(3))
@@ -76,7 +124,7 @@ func (s *CollectionFSSuite) TestReaddirLimited(c *check.C) {
        c.Assert(err, check.IsNil)
        fis, err = f.Readdir(2)
        c.Check(len(fis), check.Equals, 1)
-       c.Assert(err, check.Equals, io.EOF)
+       c.Assert(err, check.IsNil)
        fis, err = f.Readdir(2)
        c.Check(len(fis), check.Equals, 0)
        c.Assert(err, check.Equals, io.EOF)
@@ -113,14 +161,856 @@ func (s *CollectionFSSuite) TestNotExist(c *check.C) {
        }
 }
 
-func (s *CollectionFSSuite) TestOpenFile(c *check.C) {
-       c.Skip("cannot test files with nil keepclient")
+func (s *CollectionFSSuite) TestReadOnlyFile(c *check.C) {
+       f, err := s.fs.OpenFile("/dir1/foo", os.O_RDONLY, 0)
+       c.Assert(err, check.IsNil)
+       st, err := f.Stat()
+       c.Assert(err, check.IsNil)
+       c.Check(st.Size(), check.Equals, int64(3))
+       n, err := f.Write([]byte("bar"))
+       c.Check(n, check.Equals, 0)
+       c.Check(err, check.Equals, ErrReadOnlyFile)
+}
+
+func (s *CollectionFSSuite) TestCreateFile(c *check.C) {
+       f, err := s.fs.OpenFile("/new-file 1", os.O_RDWR|os.O_CREATE, 0)
+       c.Assert(err, check.IsNil)
+       st, err := f.Stat()
+       c.Assert(err, check.IsNil)
+       c.Check(st.Size(), check.Equals, int64(0))
+
+       n, err := f.Write([]byte("bar"))
+       c.Check(n, check.Equals, 3)
+       c.Check(err, check.IsNil)
+
+       c.Check(f.Close(), check.IsNil)
+
+       f, err = s.fs.OpenFile("/new-file 1", os.O_RDWR|os.O_CREATE|os.O_EXCL, 0)
+       c.Check(f, check.IsNil)
+       c.Assert(err, check.NotNil)
+
+       f, err = s.fs.OpenFile("/new-file 1", os.O_RDWR, 0)
+       c.Assert(err, check.IsNil)
+       st, err = f.Stat()
+       c.Assert(err, check.IsNil)
+       c.Check(st.Size(), check.Equals, int64(3))
+
+       c.Check(f.Close(), check.IsNil)
+
+       m, err := s.fs.MarshalManifest(".")
+       c.Check(m, check.Matches, `. 37b51d194a7513e45b56f6524f2d51f2\+3\+\S+ 0:3:new-file\\0401\n./dir1 .* 3:3:bar 0:3:foo\n`)
+}
+
+func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
+       maxBlockSize = 8
+       defer func() { maxBlockSize = 2 << 26 }()
 
-       f, err := s.fs.Open("/foo.txt")
+       f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
        c.Assert(err, check.IsNil)
+       defer f.Close()
        st, err := f.Stat()
        c.Assert(err, check.IsNil)
        c.Check(st.Size(), check.Equals, int64(3))
+
+       f2, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
+       c.Assert(err, check.IsNil)
+       defer f2.Close()
+
+       buf := make([]byte, 64)
+       n, err := f.Read(buf)
+       c.Check(n, check.Equals, 3)
+       c.Check(err, check.Equals, io.EOF)
+       c.Check(string(buf[:3]), check.DeepEquals, "foo")
+
+       pos, err := f.Seek(-2, io.SeekCurrent)
+       c.Check(pos, check.Equals, int64(1))
+       c.Check(err, check.IsNil)
+
+       // Split a storedSegment in two, and insert a memSegment
+       n, err = f.Write([]byte("*"))
+       c.Check(n, check.Equals, 1)
+       c.Check(err, check.IsNil)
+
+       pos, err = f.Seek(0, io.SeekCurrent)
+       c.Check(pos, check.Equals, int64(2))
+       c.Check(err, check.IsNil)
+
+       pos, err = f.Seek(0, io.SeekStart)
+       c.Check(pos, check.Equals, int64(0))
+       c.Check(err, check.IsNil)
+
+       rbuf, err := ioutil.ReadAll(f)
+       c.Check(len(rbuf), check.Equals, 3)
+       c.Check(err, check.IsNil)
+       c.Check(string(rbuf), check.Equals, "f*o")
+
+       // Write multiple blocks in one call
+       f.Seek(1, io.SeekStart)
+       n, err = f.Write([]byte("0123456789abcdefg"))
+       c.Check(n, check.Equals, 17)
+       c.Check(err, check.IsNil)
+       pos, err = f.Seek(0, io.SeekCurrent)
+       c.Check(pos, check.Equals, int64(18))
+       pos, err = f.Seek(-18, io.SeekCurrent)
+       c.Check(err, check.IsNil)
+       n, err = io.ReadFull(f, buf)
+       c.Check(n, check.Equals, 18)
+       c.Check(err, check.Equals, io.ErrUnexpectedEOF)
+       c.Check(string(buf[:n]), check.Equals, "f0123456789abcdefg")
+
+       buf2, err := ioutil.ReadAll(f2)
+       c.Check(err, check.IsNil)
+       c.Check(string(buf2), check.Equals, "f0123456789abcdefg")
+
+       // truncate to current size
+       err = f.Truncate(18)
+       f2.Seek(0, io.SeekStart)
+       buf2, err = ioutil.ReadAll(f2)
+       c.Check(err, check.IsNil)
+       c.Check(string(buf2), check.Equals, "f0123456789abcdefg")
+
+       // shrink, discarding some data
+       f.Truncate(15)
+       f2.Seek(0, io.SeekStart)
+       buf2, err = ioutil.ReadAll(f2)
+       c.Check(err, check.IsNil)
+       c.Check(string(buf2), check.Equals, "f0123456789abcd")
+
+       // grow to partial block/segment
+       f.Truncate(20)
+       f2.Seek(0, io.SeekStart)
+       buf2, err = ioutil.ReadAll(f2)
+       c.Check(err, check.IsNil)
+       c.Check(string(buf2), check.Equals, "f0123456789abcd\x00\x00\x00\x00\x00")
+
+       f.Truncate(0)
+       f2.Seek(0, io.SeekStart)
+       f2.Write([]byte("12345678abcdefghijkl"))
+
+       // grow to block/segment boundary
+       f.Truncate(64)
+       f2.Seek(0, io.SeekStart)
+       buf2, err = ioutil.ReadAll(f2)
+       c.Check(err, check.IsNil)
+       c.Check(len(buf2), check.Equals, 64)
+       c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 8)
+
+       // shrink to block/segment boundary
+       err = f.Truncate(32)
+       f2.Seek(0, io.SeekStart)
+       buf2, err = ioutil.ReadAll(f2)
+       c.Check(err, check.IsNil)
+       c.Check(len(buf2), check.Equals, 32)
+       c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 4)
+
+       // shrink to partial block/segment
+       err = f.Truncate(15)
+       f2.Seek(0, io.SeekStart)
+       buf2, err = ioutil.ReadAll(f2)
+       c.Check(err, check.IsNil)
+       c.Check(string(buf2), check.Equals, "12345678abcdefg")
+       c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 2)
+
+       // Force flush to ensure the block "12345678" gets stored, so
+       // we know what to expect in the final manifest below.
+       _, err = s.fs.MarshalManifest(".")
+       c.Check(err, check.IsNil)
+
+       // Truncate to size=3 while f2's ptr is at 15
+       err = f.Truncate(3)
+       c.Check(err, check.IsNil)
+       buf2, err = ioutil.ReadAll(f2)
+       c.Check(err, check.IsNil)
+       c.Check(string(buf2), check.Equals, "")
+       f2.Seek(0, io.SeekStart)
+       buf2, err = ioutil.ReadAll(f2)
+       c.Check(err, check.IsNil)
+       c.Check(string(buf2), check.Equals, "123")
+       c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 1)
+
+       m, err := s.fs.MarshalManifest(".")
+       c.Check(err, check.IsNil)
+       m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
+       c.Check(m, check.Equals, "./dir1 3858f62230ac3c915f300c664312c63f+6 25d55ad283aa400af464c76d713c07ad+8 3:3:bar 6:3:foo\n")
+}
+
+func (s *CollectionFSSuite) TestSeekSparse(c *check.C) {
+       fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+       f, err := fs.OpenFile("test", os.O_CREATE|os.O_RDWR, 0755)
+       c.Assert(err, check.IsNil)
+       defer f.Close()
+
+       checkSize := func(size int64) {
+               fi, err := f.Stat()
+               c.Check(fi.Size(), check.Equals, size)
+
+               f, err := fs.OpenFile("test", os.O_CREATE|os.O_RDWR, 0755)
+               c.Assert(err, check.IsNil)
+               defer f.Close()
+               fi, err = f.Stat()
+               c.Check(fi.Size(), check.Equals, size)
+               pos, err := f.Seek(0, io.SeekEnd)
+               c.Check(pos, check.Equals, size)
+       }
+
+       f.Seek(2, io.SeekEnd)
+       checkSize(0)
+       f.Write([]byte{1})
+       checkSize(3)
+
+       f.Seek(2, io.SeekCurrent)
+       checkSize(3)
+       f.Write([]byte{})
+       checkSize(5)
+
+       f.Seek(8, io.SeekStart)
+       checkSize(5)
+       n, err := f.Read(make([]byte, 1))
+       c.Check(n, check.Equals, 0)
+       c.Check(err, check.Equals, io.EOF)
+       checkSize(5)
+       f.Write([]byte{1, 2, 3})
+       checkSize(11)
+}
+
+func (s *CollectionFSSuite) TestMarshalSmallBlocks(c *check.C) {
+       maxBlockSize = 8
+       defer func() { maxBlockSize = 2 << 26 }()
+
+       var err error
+       s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+       for _, name := range []string{"foo", "bar", "baz"} {
+               f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
+               c.Assert(err, check.IsNil)
+               f.Write([]byte(name))
+               f.Close()
+       }
+
+       m, err := s.fs.MarshalManifest(".")
+       c.Check(err, check.IsNil)
+       m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
+       c.Check(m, check.Equals, ". c3c23db5285662ef7172373df0003206+6 acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:bar 3:3:baz 6:3:foo\n")
+}
+
+func (s *CollectionFSSuite) TestMkdir(c *check.C) {
+       err := s.fs.Mkdir("foo/bar", 0755)
+       c.Check(err, check.Equals, os.ErrNotExist)
+
+       f, err := s.fs.OpenFile("foo/bar", os.O_CREATE, 0)
+       c.Check(err, check.Equals, os.ErrNotExist)
+
+       err = s.fs.Mkdir("foo", 0755)
+       c.Check(err, check.IsNil)
+
+       f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_WRONLY, 0)
+       c.Check(err, check.IsNil)
+       if err == nil {
+               defer f.Close()
+               f.Write([]byte("foo"))
+       }
+
+       // mkdir fails if a file already exists with that name
+       err = s.fs.Mkdir("foo/bar", 0755)
+       c.Check(err, check.NotNil)
+
+       err = s.fs.Remove("foo/bar")
+       c.Check(err, check.IsNil)
+
+       // mkdir succeeds after the file is deleted
+       err = s.fs.Mkdir("foo/bar", 0755)
+       c.Check(err, check.IsNil)
+
+       // creating a file in a nonexistent subdir should still fail
+       f, err = s.fs.OpenFile("foo/bar/baz/foo.txt", os.O_CREATE|os.O_WRONLY, 0)
+       c.Check(err, check.Equals, os.ErrNotExist)
+
+       f, err = s.fs.OpenFile("foo/bar/foo.txt", os.O_CREATE|os.O_WRONLY, 0)
+       c.Check(err, check.IsNil)
+       if err == nil {
+               defer f.Close()
+               f.Write([]byte("foo"))
+       }
+
+       // creating foo/bar as a regular file should fail
+       f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_EXCL, 0)
+       c.Check(err, check.NotNil)
+
+       // creating foo/bar as a directory should fail
+       f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_EXCL, os.ModeDir)
+       c.Check(err, check.NotNil)
+       err = s.fs.Mkdir("foo/bar", 0755)
+       c.Check(err, check.NotNil)
+
+       m, err := s.fs.MarshalManifest(".")
+       c.Check(err, check.IsNil)
+       m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
+       c.Check(m, check.Equals, "./dir1 3858f62230ac3c915f300c664312c63f+6 3:3:bar 0:3:foo\n./foo/bar acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n")
+}
+
+func (s *CollectionFSSuite) TestConcurrentWriters(c *check.C) {
+       maxBlockSize = 8
+       defer func() { maxBlockSize = 2 << 26 }()
+
+       var wg sync.WaitGroup
+       for n := 0; n < 128; n++ {
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
+                       c.Assert(err, check.IsNil)
+                       defer f.Close()
+                       for i := 0; i < 6502; i++ {
+                               switch rand.Int() & 3 {
+                               case 0:
+                                       f.Truncate(int64(rand.Intn(64)))
+                               case 1:
+                                       f.Seek(int64(rand.Intn(64)), io.SeekStart)
+                               case 2:
+                                       _, err := f.Write([]byte("beep boop"))
+                                       c.Check(err, check.IsNil)
+                               case 3:
+                                       _, err := ioutil.ReadAll(f)
+                                       c.Check(err, check.IsNil)
+                               }
+                       }
+               }()
+       }
+       wg.Wait()
+
+       f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
+       c.Assert(err, check.IsNil)
+       defer f.Close()
+       buf, err := ioutil.ReadAll(f)
+       c.Check(err, check.IsNil)
+       c.Logf("after lots of random r/w/seek/trunc, buf is %q", buf)
+}
+
+func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
+       maxBlockSize = 40
+       defer func() { maxBlockSize = 2 << 26 }()
+
+       var err error
+       s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+
+       const nfiles = 256
+       const ngoroutines = 256
+
+       var wg sync.WaitGroup
+       for n := 0; n < nfiles; n++ {
+               wg.Add(1)
+               go func(n int) {
+                       defer wg.Done()
+                       expect := make([]byte, 0, 64)
+                       wbytes := []byte("there's no simple explanation for anything important that any of us do")
+                       f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDWR|os.O_CREATE|os.O_EXCL, 0)
+                       c.Assert(err, check.IsNil)
+                       defer f.Close()
+                       for i := 0; i < ngoroutines; i++ {
+                               trunc := rand.Intn(65)
+                               woff := rand.Intn(trunc + 1)
+                               wbytes = wbytes[:rand.Intn(64-woff+1)]
+                               for buf, i := expect[:cap(expect)], len(expect); i < trunc; i++ {
+                                       buf[i] = 0
+                               }
+                               expect = expect[:trunc]
+                               if trunc < woff+len(wbytes) {
+                                       expect = expect[:woff+len(wbytes)]
+                               }
+                               copy(expect[woff:], wbytes)
+                               f.Truncate(int64(trunc))
+                               pos, err := f.Seek(int64(woff), io.SeekStart)
+                               c.Check(pos, check.Equals, int64(woff))
+                               c.Check(err, check.IsNil)
+                               n, err := f.Write(wbytes)
+                               c.Check(n, check.Equals, len(wbytes))
+                               c.Check(err, check.IsNil)
+                               pos, err = f.Seek(0, io.SeekStart)
+                               c.Check(pos, check.Equals, int64(0))
+                               c.Check(err, check.IsNil)
+                               buf, err := ioutil.ReadAll(f)
+                               c.Check(string(buf), check.Equals, string(expect))
+                               c.Check(err, check.IsNil)
+                       }
+                       s.checkMemSize(c, f)
+               }(n)
+       }
+       wg.Wait()
+
+       root, err := s.fs.Open("/")
+       c.Assert(err, check.IsNil)
+       defer root.Close()
+       fi, err := root.Readdir(-1)
+       c.Check(err, check.IsNil)
+       c.Check(len(fi), check.Equals, nfiles)
+
+       _, err = s.fs.MarshalManifest(".")
+       c.Check(err, check.IsNil)
+       // TODO: check manifest content
+}
+
+func (s *CollectionFSSuite) TestRemove(c *check.C) {
+       fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+       err = fs.Mkdir("dir0", 0755)
+       c.Assert(err, check.IsNil)
+       err = fs.Mkdir("dir1", 0755)
+       c.Assert(err, check.IsNil)
+       err = fs.Mkdir("dir1/dir2", 0755)
+       c.Assert(err, check.IsNil)
+       err = fs.Mkdir("dir1/dir3", 0755)
+       c.Assert(err, check.IsNil)
+
+       err = fs.Remove("dir0")
+       c.Check(err, check.IsNil)
+       err = fs.Remove("dir0")
+       c.Check(err, check.Equals, os.ErrNotExist)
+
+       err = fs.Remove("dir1/dir2/.")
+       c.Check(err, check.Equals, ErrInvalidArgument)
+       err = fs.Remove("dir1/dir2/..")
+       c.Check(err, check.Equals, ErrInvalidArgument)
+       err = fs.Remove("dir1")
+       c.Check(err, check.Equals, ErrDirectoryNotEmpty)
+       err = fs.Remove("dir1/dir2/../../../dir1")
+       c.Check(err, check.Equals, ErrDirectoryNotEmpty)
+       err = fs.Remove("dir1/dir3/")
+       c.Check(err, check.IsNil)
+       err = fs.RemoveAll("dir1")
+       c.Check(err, check.IsNil)
+       err = fs.RemoveAll("dir1")
+       c.Check(err, check.IsNil)
+}
+
+func (s *CollectionFSSuite) TestRename(c *check.C) {
+       fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+       const (
+               outer = 16
+               inner = 16
+       )
+       for i := 0; i < outer; i++ {
+               err = fs.Mkdir(fmt.Sprintf("dir%d", i), 0755)
+               c.Assert(err, check.IsNil)
+               for j := 0; j < inner; j++ {
+                       err = fs.Mkdir(fmt.Sprintf("dir%d/dir%d", i, j), 0755)
+                       c.Assert(err, check.IsNil)
+                       for _, fnm := range []string{
+                               fmt.Sprintf("dir%d/file%d", i, j),
+                               fmt.Sprintf("dir%d/dir%d/file%d", i, j, j),
+                       } {
+                               f, err := fs.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0755)
+                               c.Assert(err, check.IsNil)
+                               _, err = f.Write([]byte("beep"))
+                               c.Assert(err, check.IsNil)
+                               f.Close()
+                       }
+               }
+       }
+       var wg sync.WaitGroup
+       for i := 0; i < outer; i++ {
+               for j := 0; j < inner; j++ {
+                       wg.Add(1)
+                       go func(i, j int) {
+                               defer wg.Done()
+                               oldname := fmt.Sprintf("dir%d/dir%d/file%d", i, j, j)
+                               newname := fmt.Sprintf("dir%d/newfile%d", i, inner-j-1)
+                               _, err := fs.Open(newname)
+                               c.Check(err, check.Equals, os.ErrNotExist)
+                               err = fs.Rename(oldname, newname)
+                               c.Check(err, check.IsNil)
+                               f, err := fs.Open(newname)
+                               c.Check(err, check.IsNil)
+                               f.Close()
+                       }(i, j)
+
+                       wg.Add(1)
+                       go func(i, j int) {
+                               defer wg.Done()
+                               // oldname does not exist
+                               err := fs.Rename(
+                                       fmt.Sprintf("dir%d/dir%d/missing", i, j),
+                                       fmt.Sprintf("dir%d/dir%d/file%d", outer-i-1, j, j))
+                               c.Check(err, check.ErrorMatches, `.*does not exist`)
+
+                               // newname parent dir does not exist
+                               err = fs.Rename(
+                                       fmt.Sprintf("dir%d/dir%d", i, j),
+                                       fmt.Sprintf("dir%d/missing/irrelevant", outer-i-1))
+                               c.Check(err, check.ErrorMatches, `.*does not exist`)
+
+                               // oldname parent dir is a file
+                               err = fs.Rename(
+                                       fmt.Sprintf("dir%d/file%d/patherror", i, j),
+                                       fmt.Sprintf("dir%d/irrelevant", i))
+                               c.Check(err, check.ErrorMatches, `.*does not exist`)
+
+                               // newname parent dir is a file
+                               err = fs.Rename(
+                                       fmt.Sprintf("dir%d/dir%d/file%d", i, j, j),
+                                       fmt.Sprintf("dir%d/file%d/patherror", i, inner-j-1))
+                               c.Check(err, check.ErrorMatches, `.*does not exist`)
+                       }(i, j)
+               }
+       }
+       wg.Wait()
+
+       f, err := fs.OpenFile("dir1/newfile3", 0, 0)
+       c.Assert(err, check.IsNil)
+       c.Check(f.Size(), check.Equals, int64(4))
+       buf, err := ioutil.ReadAll(f)
+       c.Check(buf, check.DeepEquals, []byte("beep"))
+       c.Check(err, check.IsNil)
+       _, err = fs.Open("dir1/dir1/file1")
+       c.Check(err, check.Equals, os.ErrNotExist)
+}
+
+func (s *CollectionFSSuite) TestPersist(c *check.C) {
+       maxBlockSize = 1024
+       defer func() { maxBlockSize = 2 << 26 }()
+
+       var err error
+       s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+       err = s.fs.Mkdir("d:r", 0755)
+       c.Assert(err, check.IsNil)
+
+       expect := map[string][]byte{}
+
+       var wg sync.WaitGroup
+       for _, name := range []string{"random 1", "random:2", "random\\3", "d:r/random4"} {
+               buf := make([]byte, 500)
+               rand.Read(buf)
+               expect[name] = buf
+
+               f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
+               c.Assert(err, check.IsNil)
+               // Note: we don't close the file until after the test
+               // is done. Writes to unclosed files should persist.
+               defer f.Close()
+
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       for i := 0; i < len(buf); i += 5 {
+                               _, err := f.Write(buf[i : i+5])
+                               c.Assert(err, check.IsNil)
+                       }
+               }()
+       }
+       wg.Wait()
+
+       m, err := s.fs.MarshalManifest(".")
+       c.Check(err, check.IsNil)
+       c.Logf("%q", m)
+
+       root, err := s.fs.Open("/")
+       c.Assert(err, check.IsNil)
+       defer root.Close()
+       fi, err := root.Readdir(-1)
+       c.Check(err, check.IsNil)
+       c.Check(len(fi), check.Equals, 4)
+
+       persisted, err := (&Collection{ManifestText: m}).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+
+       root, err = persisted.Open("/")
+       c.Assert(err, check.IsNil)
+       defer root.Close()
+       fi, err = root.Readdir(-1)
+       c.Check(err, check.IsNil)
+       c.Check(len(fi), check.Equals, 4)
+
+       for name, content := range expect {
+               c.Logf("read %q", name)
+               f, err := persisted.Open(name)
+               c.Assert(err, check.IsNil)
+               defer f.Close()
+               buf, err := ioutil.ReadAll(f)
+               c.Check(err, check.IsNil)
+               c.Check(buf, check.DeepEquals, content)
+       }
+}
+
+func (s *CollectionFSSuite) TestPersistEmptyFiles(c *check.C) {
+       var err error
+       s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+       for _, name := range []string{"dir", "dir/zerodir", "zero", "zero/zero"} {
+               err = s.fs.Mkdir(name, 0755)
+               c.Assert(err, check.IsNil)
+       }
+
+       expect := map[string][]byte{
+               "0":                nil,
+               "00":               []byte{},
+               "one":              []byte{1},
+               "dir/0":            nil,
+               "dir/two":          []byte{1, 2},
+               "dir/zero":         nil,
+               "dir/zerodir/zero": nil,
+               "zero/zero/zero":   nil,
+       }
+       for name, data := range expect {
+               f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
+               c.Assert(err, check.IsNil)
+               if data != nil {
+                       _, err := f.Write(data)
+                       c.Assert(err, check.IsNil)
+               }
+               f.Close()
+       }
+
+       m, err := s.fs.MarshalManifest(".")
+       c.Check(err, check.IsNil)
+       c.Logf("%q", m)
+
+       persisted, err := (&Collection{ManifestText: m}).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+
+       for name, data := range expect {
+               f, err := persisted.Open("bogus-" + name)
+               c.Check(err, check.NotNil)
+
+               f, err = persisted.Open(name)
+               c.Assert(err, check.IsNil)
+
+               if data == nil {
+                       data = []byte{}
+               }
+               buf, err := ioutil.ReadAll(f)
+               c.Check(err, check.IsNil)
+               c.Check(buf, check.DeepEquals, data)
+       }
+}
+
+func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
+       fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+
+       f, err := fs.OpenFile("missing", os.O_WRONLY, 0)
+       c.Check(f, check.IsNil)
+       c.Check(err, check.ErrorMatches, `file does not exist`)
+
+       f, err = fs.OpenFile("new", os.O_CREATE|os.O_RDONLY, 0)
+       c.Assert(err, check.IsNil)
+       defer f.Close()
+       n, err := f.Write([]byte{1, 2, 3})
+       c.Check(n, check.Equals, 0)
+       c.Check(err, check.ErrorMatches, `read-only file`)
+       n, err = f.Read(make([]byte, 1))
+       c.Check(n, check.Equals, 0)
+       c.Check(err, check.Equals, io.EOF)
+       f, err = fs.OpenFile("new", os.O_RDWR, 0)
+       c.Assert(err, check.IsNil)
+       defer f.Close()
+       _, err = f.Write([]byte{4, 5, 6})
+       c.Check(err, check.IsNil)
+       fi, err := f.Stat()
+       c.Assert(err, check.IsNil)
+       c.Check(fi.Size(), check.Equals, int64(3))
+
+       f, err = fs.OpenFile("new", os.O_TRUNC|os.O_RDWR, 0)
+       c.Assert(err, check.IsNil)
+       defer f.Close()
+       pos, err := f.Seek(0, io.SeekEnd)
+       c.Check(pos, check.Equals, int64(0))
+       c.Check(err, check.IsNil)
+       fi, err = f.Stat()
+       c.Assert(err, check.IsNil)
+       c.Check(fi.Size(), check.Equals, int64(0))
+       fs.Remove("new")
+
+       buf := make([]byte, 64)
+       f, err = fs.OpenFile("append", os.O_EXCL|os.O_CREATE|os.O_RDWR|os.O_APPEND, 0)
+       c.Assert(err, check.IsNil)
+       f.Write([]byte{1, 2, 3})
+       f.Seek(0, io.SeekStart)
+       n, _ = f.Read(buf[:1])
+       c.Check(n, check.Equals, 1)
+       c.Check(buf[:1], check.DeepEquals, []byte{1})
+       pos, err = f.Seek(0, io.SeekCurrent)
+       c.Check(pos, check.Equals, int64(1))
+       f.Write([]byte{4, 5, 6})
+       pos, err = f.Seek(0, io.SeekCurrent)
+       c.Check(pos, check.Equals, int64(6))
+       f.Seek(0, io.SeekStart)
+       n, err = f.Read(buf)
+       c.Check(buf[:n], check.DeepEquals, []byte{1, 2, 3, 4, 5, 6})
+       c.Check(err, check.Equals, io.EOF)
+       f.Close()
+
+       f, err = fs.OpenFile("append", os.O_RDWR|os.O_APPEND, 0)
+       c.Assert(err, check.IsNil)
+       pos, err = f.Seek(0, io.SeekCurrent)
+       c.Check(pos, check.Equals, int64(0))
+       c.Check(err, check.IsNil)
+       f.Read(buf[:3])
+       pos, _ = f.Seek(0, io.SeekCurrent)
+       c.Check(pos, check.Equals, int64(3))
+       f.Write([]byte{7, 8, 9})
+       pos, err = f.Seek(0, io.SeekCurrent)
+       c.Check(pos, check.Equals, int64(9))
+       f.Close()
+
+       f, err = fs.OpenFile("wronly", os.O_CREATE|os.O_WRONLY, 0)
+       c.Assert(err, check.IsNil)
+       n, err = f.Write([]byte{3, 2, 1})
+       c.Check(n, check.Equals, 3)
+       c.Check(err, check.IsNil)
+       pos, _ = f.Seek(0, io.SeekCurrent)
+       c.Check(pos, check.Equals, int64(3))
+       pos, _ = f.Seek(0, io.SeekStart)
+       c.Check(pos, check.Equals, int64(0))
+       n, err = f.Read(buf)
+       c.Check(n, check.Equals, 0)
+       c.Check(err, check.ErrorMatches, `.*O_WRONLY.*`)
+       f, err = fs.OpenFile("wronly", os.O_RDONLY, 0)
+       c.Assert(err, check.IsNil)
+       n, _ = f.Read(buf)
+       c.Check(buf[:n], check.DeepEquals, []byte{3, 2, 1})
+
+       f, err = fs.OpenFile("unsupported", os.O_CREATE|os.O_SYNC, 0)
+       c.Check(f, check.IsNil)
+       c.Check(err, check.NotNil)
+
+       f, err = fs.OpenFile("append", os.O_RDWR|os.O_WRONLY, 0)
+       c.Check(f, check.IsNil)
+       c.Check(err, check.ErrorMatches, `invalid flag.*`)
+}
+
+func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) {
+       maxBlockSize = 1024
+       defer func() { maxBlockSize = 2 << 26 }()
+
+       fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+       f, err := fs.OpenFile("50K", os.O_WRONLY|os.O_CREATE, 0)
+       c.Assert(err, check.IsNil)
+       defer f.Close()
+
+       data := make([]byte, 500)
+       rand.Read(data)
+
+       for i := 0; i < 100; i++ {
+               n, err := f.Write(data)
+               c.Assert(n, check.Equals, len(data))
+               c.Assert(err, check.IsNil)
+       }
+
+       currentMemSegments := func() (memSegments []int) {
+               for idx, e := range f.(*filehandle).inode.(*filenode).segments {
+                       switch e.(type) {
+                       case *memSegment:
+                               memSegments = append(memSegments, idx)
+                       }
+               }
+               return
+       }
+       c.Check(currentMemSegments(), check.HasLen, 1)
+
+       m, err := fs.MarshalManifest(".")
+       c.Check(m, check.Matches, `[^:]* 0:50000:50K\n`)
+       c.Check(err, check.IsNil)
+       c.Check(currentMemSegments(), check.HasLen, 0)
+}
+
+func (s *CollectionFSSuite) TestBrokenManifests(c *check.C) {
+       for _, txt := range []string{
+               "\n",
+               ".\n",
+               ". \n",
+               ". d41d8cd98f00b204e9800998ecf8427e+0\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+0 \n",
+               ". 0:0:foo\n",
+               ".  0:0:foo\n",
+               ". 0:0:foo 0:0:bar\n",
+               ". d41d8cd98f00b204e9800998ecf8427e 0:0:foo\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo:bar\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+0 foo:0:foo\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+0 0:foo:foo\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:foo 1:1:bar\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n",
+               "./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n. d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n",
+       } {
+               c.Logf("<-%q", txt)
+               fs, err := (&Collection{ManifestText: txt}).FileSystem(s.client, s.kc)
+               c.Check(fs, check.IsNil)
+               c.Logf("-> %s", err)
+               c.Check(err, check.NotNil)
+       }
+}
+
+func (s *CollectionFSSuite) TestEdgeCaseManifests(c *check.C) {
+       for _, txt := range []string{
+               "",
+               ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo 0:0:foo 0:0:bar\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo 0:0:foo 0:0:bar\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/bar\n./foo d41d8cd98f00b204e9800998ecf8427e+0 0:0:bar\n",
+       } {
+               c.Logf("<-%q", txt)
+               fs, err := (&Collection{ManifestText: txt}).FileSystem(s.client, s.kc)
+               c.Check(err, check.IsNil)
+               c.Check(fs, check.NotNil)
+       }
+}
+
+func (s *CollectionFSSuite) checkMemSize(c *check.C, f File) {
+       fn := f.(*filehandle).inode.(*filenode)
+       var memsize int64
+       for _, seg := range fn.segments {
+               if e, ok := seg.(*memSegment); ok {
+                       memsize += int64(len(e.buf))
+               }
+       }
+       c.Check(fn.memsize, check.Equals, memsize)
+}
+
+type CollectionFSUnitSuite struct{}
+
+var _ = check.Suite(&CollectionFSUnitSuite{})
+
+// expect ~2 seconds to load a manifest with 256K files
+func (s *CollectionFSUnitSuite) TestLargeManifest(c *check.C) {
+       const (
+               dirCount  = 512
+               fileCount = 512
+       )
+
+       mb := bytes.NewBuffer(make([]byte, 0, 40000000))
+       for i := 0; i < dirCount; i++ {
+               fmt.Fprintf(mb, "./dir%d", i)
+               for j := 0; j <= fileCount; j++ {
+                       fmt.Fprintf(mb, " %032x+42+A%040x@%08x", j, j, j)
+               }
+               for j := 0; j < fileCount; j++ {
+                       fmt.Fprintf(mb, " %d:%d:dir%d/file%d", j*42+21, 42, j, j)
+               }
+               mb.Write([]byte{'\n'})
+       }
+       coll := Collection{ManifestText: mb.String()}
+       c.Logf("%s built", time.Now())
+
+       var memstats runtime.MemStats
+       runtime.ReadMemStats(&memstats)
+       c.Logf("%s Alloc=%d Sys=%d", time.Now(), memstats.Alloc, memstats.Sys)
+
+       f, err := coll.FileSystem(nil, nil)
+       c.Check(err, check.IsNil)
+       c.Logf("%s loaded", time.Now())
+
+       for i := 0; i < dirCount; i++ {
+               for j := 0; j < fileCount; j++ {
+                       f.Stat(fmt.Sprintf("./dir%d/dir%d/file%d", i, j, j))
+               }
+       }
+       c.Logf("%s Stat() x %d", time.Now(), dirCount*fileCount)
+
+       runtime.ReadMemStats(&memstats)
+       c.Logf("%s Alloc=%d Sys=%d", time.Now(), memstats.Alloc, memstats.Sys)
 }
 
 // Gocheck boilerplate
index 773a2e6f9c7d787406511f85a6a5585596153738..9a04855784a76eee88bb4119b1483fa3505b67f2 100644 (file)
@@ -17,7 +17,7 @@ type TransactionError struct {
        URL        url.URL
        StatusCode int
        Status     string
-       errors     []string
+       Errors     []string
 }
 
 func (e TransactionError) Error() (s string) {
@@ -25,8 +25,8 @@ func (e TransactionError) Error() (s string) {
        if e.Status != "" {
                s = s + ": " + e.Status
        }
-       if len(e.errors) > 0 {
-               s = s + ": " + strings.Join(e.errors, "; ")
+       if len(e.Errors) > 0 {
+               s = s + ": " + strings.Join(e.Errors, "; ")
        }
        return
 }
@@ -35,7 +35,7 @@ func newTransactionError(req *http.Request, resp *http.Response, buf []byte) *Tr
        var e TransactionError
        if json.Unmarshal(buf, &e) != nil {
                // No JSON-formatted error response
-               e.errors = nil
+               e.Errors = nil
        }
        e.Method = req.Method
        e.URL = *req.URL
index 18fd91f560227ee1c40fec2bce64126bc0c3b827..d2c3a41f2108e2bc852f56119b747a7ec9423e7a 100644 (file)
@@ -5,6 +5,8 @@
 package httpserver
 
 import (
+       "math/rand"
+       "net/http"
        "strconv"
        "sync"
        "time"
@@ -17,19 +19,34 @@ type IDGenerator struct {
        // Prefix is prepended to each returned ID.
        Prefix string
 
-       lastID int64
-       mtx    sync.Mutex
+       mtx sync.Mutex
+       src rand.Source
 }
 
 // Next returns a new ID string. It is safe to call Next from multiple
 // goroutines.
 func (g *IDGenerator) Next() string {
-       id := time.Now().UnixNano()
        g.mtx.Lock()
-       if id <= g.lastID {
-               id = g.lastID + 1
+       defer g.mtx.Unlock()
+       if g.src == nil {
+               g.src = rand.NewSource(time.Now().UnixNano())
        }
-       g.lastID = id
-       g.mtx.Unlock()
-       return g.Prefix + strconv.FormatInt(id, 36)
+       a, b := g.src.Int63(), g.src.Int63()
+       id := strconv.FormatInt(a, 36) + strconv.FormatInt(b, 36)
+       for len(id) > 20 {
+               id = id[:20]
+       }
+       return g.Prefix + id
+}
+
+// AddRequestIDs wraps an http.Handler, adding an X-Request-Id header
+// to each request that doesn't already have one.
+func AddRequestIDs(h http.Handler) http.Handler {
+       gen := &IDGenerator{Prefix: "req-"}
+       return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+               if req.Header.Get("X-Request-Id") == "" {
+                       req.Header.Set("X-Request-Id", gen.Next())
+               }
+               h.ServeHTTP(w, req)
+       })
 }
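// A wiring sketch (hypothetical server) using the same wrapping order
// as logger_test.go below: AddRequestIDs wraps LogRequests, so the ID
// is assigned before the request is logged.
package main

import (
        "log"
        "net/http"

        "git.curoverse.com/arvados.git/sdk/go/httpserver"
)

func main() {
        mux := http.NewServeMux()
        mux.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
                w.Write([]byte(req.Header.Get("X-Request-Id")))
        })
        log.Fatal(http.ListenAndServe(":8080", httpserver.AddRequestIDs(httpserver.LogRequests(mux))))
}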
diff --git a/sdk/go/httpserver/logger.go b/sdk/go/httpserver/logger.go
new file mode 100644 (file)
index 0000000..decb2ff
--- /dev/null
@@ -0,0 +1,82 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package httpserver
+
+import (
+       "context"
+       "net/http"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/stats"
+       log "github.com/Sirupsen/logrus"
+)
+
+type contextKey struct {
+       name string
+}
+
+var requestTimeContextKey = contextKey{"requestTime"}
+
+// LogRequests wraps an http.Handler, logging each request and
+// response via logrus.
+func LogRequests(h http.Handler) http.Handler {
+       return http.HandlerFunc(func(wrapped http.ResponseWriter, req *http.Request) {
+               w := &responseTimer{ResponseWriter: WrapResponseWriter(wrapped)}
+               req = req.WithContext(context.WithValue(req.Context(), &requestTimeContextKey, time.Now()))
+               lgr := log.WithFields(log.Fields{
+                       "RequestID":       req.Header.Get("X-Request-Id"),
+                       "remoteAddr":      req.RemoteAddr,
+                       "reqForwardedFor": req.Header.Get("X-Forwarded-For"),
+                       "reqMethod":       req.Method,
+                       "reqPath":         req.URL.Path[1:],
+                       "reqBytes":        req.ContentLength,
+               })
+               logRequest(w, req, lgr)
+               defer logResponse(w, req, lgr)
+               h.ServeHTTP(w, req)
+       })
+}
+
+func logRequest(w *responseTimer, req *http.Request, lgr *log.Entry) {
+       lgr.Info("request")
+}
+
+func logResponse(w *responseTimer, req *http.Request, lgr *log.Entry) {
+       if tStart, ok := req.Context().Value(&requestTimeContextKey).(time.Time); ok {
+               tDone := time.Now()
+               lgr = lgr.WithFields(log.Fields{
+                       "timeTotal":     stats.Duration(tDone.Sub(tStart)),
+                       "timeToStatus":  stats.Duration(w.writeTime.Sub(tStart)),
+                       "timeWriteBody": stats.Duration(tDone.Sub(w.writeTime)),
+               })
+       }
+       lgr.WithFields(log.Fields{
+               "respStatusCode": w.WroteStatus(),
+               "respStatus":     http.StatusText(w.WroteStatus()),
+               "respBytes":      w.WroteBodyBytes(),
+       }).Info("response")
+}
+
+type responseTimer struct {
+       ResponseWriter
+       wrote     bool
+       writeTime time.Time
+}
+
+func (rt *responseTimer) WriteHeader(code int) {
+       if !rt.wrote {
+               rt.wrote = true
+               rt.writeTime = time.Now()
+       }
+       rt.ResponseWriter.WriteHeader(code)
+}
+
+func (rt *responseTimer) Write(p []byte) (int, error) {
+       if !rt.wrote {
+               rt.wrote = true
+               rt.writeTime = time.Now()
+       }
+       return rt.ResponseWriter.Write(p)
+}
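With the JSON formatter configured in logger_test.go below, each request produces two log entries sharing a RequestID. An illustrative (invented) pair, with some fields elided:

{"RequestID":"req-...","reqMethod":"GET","reqPath":"bar","reqBytes":0,"msg":"request",...}
{"RequestID":"req-...","respStatusCode":200,"respStatus":"OK","respBytes":11,"timeTotal":0.000123,"msg":"response",...}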
diff --git a/sdk/go/httpserver/logger_test.go b/sdk/go/httpserver/logger_test.go
new file mode 100644 (file)
index 0000000..bbcafa1
--- /dev/null
@@ -0,0 +1,68 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package httpserver
+
+import (
+       "bytes"
+       "encoding/json"
+       "net/http"
+       "net/http/httptest"
+       "os"
+       "testing"
+       "time"
+
+       log "github.com/Sirupsen/logrus"
+       check "gopkg.in/check.v1"
+)
+
+func Test(t *testing.T) {
+       check.TestingT(t)
+}
+
+var _ = check.Suite(&Suite{})
+
+type Suite struct{}
+
+func (s *Suite) TestLogRequests(c *check.C) {
+       defer log.SetOutput(os.Stdout)
+       captured := &bytes.Buffer{}
+       log.SetOutput(captured)
+       log.SetFormatter(&log.JSONFormatter{
+               TimestampFormat: time.RFC3339Nano,
+       })
+       h := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+               w.Write([]byte("hello world"))
+       })
+       req, err := http.NewRequest("GET", "https://foo.example/bar", nil)
+       req.Header.Set("X-Forwarded-For", "1.2.3.4:12345")
+       c.Assert(err, check.IsNil)
+       resp := httptest.NewRecorder()
+       AddRequestIDs(LogRequests(h)).ServeHTTP(resp, req)
+
+       dec := json.NewDecoder(captured)
+
+       gotReq := make(map[string]interface{})
+       err = dec.Decode(&gotReq)
+       c.Logf("%#v", gotReq)
+       c.Check(gotReq["RequestID"], check.Matches, "req-[a-z0-9]{20}")
+       c.Check(gotReq["reqForwardedFor"], check.Equals, "1.2.3.4:12345")
+       c.Check(gotReq["msg"], check.Equals, "request")
+
+       gotResp := make(map[string]interface{})
+       err = dec.Decode(&gotResp)
+       c.Logf("%#v", gotResp)
+       c.Check(gotResp["RequestID"], check.Equals, gotReq["RequestID"])
+       c.Check(gotResp["reqForwardedFor"], check.Equals, "1.2.3.4:12345")
+       c.Check(gotResp["msg"], check.Equals, "response")
+
+       c.Assert(gotResp["time"], check.FitsTypeOf, "")
+       _, err = time.Parse(time.RFC3339Nano, gotResp["time"].(string))
+       c.Check(err, check.IsNil)
+
+       for _, key := range []string{"timeToStatus", "timeWriteBody", "timeTotal"} {
+               c.Assert(gotResp[key], check.FitsTypeOf, float64(0))
+               c.Check(gotResp[key].(float64), check.Not(check.Equals), float64(0))
+       }
+}
index e6ac4ca7a8be42d996e32c7fbc647389cc5248a4..f17bc820a98f5c242ffc24122d1176cc5c21a57e 100644 (file)
@@ -8,40 +8,46 @@ import (
        "net/http"
 )
 
-// ResponseWriter wraps http.ResponseWriter and exposes the status
+type ResponseWriter interface {
+       http.ResponseWriter
+       WroteStatus() int
+       WroteBodyBytes() int
+}
+
+// responseWriter wraps http.ResponseWriter and exposes the status
 // sent, the number of bytes sent to the client, and the last write
 // error.
-type ResponseWriter struct {
+type responseWriter struct {
        http.ResponseWriter
-       wroteStatus    *int   // Last status given to WriteHeader()
-       wroteBodyBytes *int   // Bytes successfully written
-       err            *error // Last error returned from Write()
+       wroteStatus    int   // Last status given to WriteHeader()
+       wroteBodyBytes int   // Bytes successfully written
+       err            error // Last error returned from Write()
 }
 
 func WrapResponseWriter(orig http.ResponseWriter) ResponseWriter {
-       return ResponseWriter{orig, new(int), new(int), new(error)}
+       return &responseWriter{ResponseWriter: orig}
 }
 
-func (w ResponseWriter) WriteHeader(s int) {
-       *w.wroteStatus = s
+func (w *responseWriter) WriteHeader(s int) {
+       w.wroteStatus = s
        w.ResponseWriter.WriteHeader(s)
 }
 
-func (w ResponseWriter) Write(data []byte) (n int, err error) {
+func (w *responseWriter) Write(data []byte) (n int, err error) {
        n, err = w.ResponseWriter.Write(data)
-       *w.wroteBodyBytes += n
-       *w.err = err
+       w.wroteBodyBytes += n
+       w.err = err
        return
 }
 
-func (w ResponseWriter) WroteStatus() int {
-       return *w.wroteStatus
+func (w *responseWriter) WroteStatus() int {
+       return w.wroteStatus
 }
 
-func (w ResponseWriter) WroteBodyBytes() int {
-       return *w.wroteBodyBytes
+func (w *responseWriter) WroteBodyBytes() int {
+       return w.wroteBodyBytes
 }
 
-func (w ResponseWriter) Err() error {
-       return *w.err
+func (w *responseWriter) Err() error {
+       return w.err
 }
index e841a00fa1e1f3493bff67890b2aee70181a3bca..bac4a24fd5a037d9cdcafb663612677304184c97 100644 (file)
@@ -7,6 +7,8 @@ package keepclient
 import (
        "io"
        "sort"
+       "strconv"
+       "strings"
        "sync"
        "time"
 )
@@ -49,10 +51,30 @@ func (c *BlockCache) Sweep() {
        }
 }
 
+// ReadAt copies part of a block into p, starting at offset off within
+// the block, first retrieving the block from Keep if it is not already
+// cached.
+func (c *BlockCache) ReadAt(kc *KeepClient, locator string, p []byte, off int) (int, error) {
+       buf, err := c.Get(kc, locator)
+       if err != nil {
+               return 0, err
+       }
+       if off > len(buf) {
+               return 0, io.ErrUnexpectedEOF
+       }
+       return copy(p, buf[off:]), nil
+}
+
 // Get returns data from the cache, first retrieving it from Keep if
 // necessary.
 func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) {
        cacheKey := locator[:32]
+       bufsize := BLOCKSIZE
+       if parts := strings.SplitN(locator, "+", 3); len(parts) >= 2 {
+               datasize, err := strconv.ParseInt(parts[1], 10, 32)
+               if err == nil && datasize >= 0 {
+                       bufsize = int(datasize)
+               }
+       }
        c.mtx.Lock()
        if c.cache == nil {
                c.cache = make(map[string]*cacheBlock)
@@ -68,7 +90,7 @@ func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) {
                        rdr, size, _, err := kc.Get(locator)
                        var data []byte
                        if err == nil {
-                               data = make([]byte, size, BLOCKSIZE)
+                               data = make([]byte, size, bufsize)
                                _, err = io.ReadFull(rdr, data)
                                err2 := rdr.Close()
                                if err == nil {
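
A hedged usage sketch (locator and offsets illustrative) of the new read path, via the KeepClient.ReadAt wrapper added later in this commit. Note that the size hint in the locator now bounds the cache buffer allocation, instead of always reserving a full BLOCKSIZE:

buf := make([]byte, 4096)
// Fills buf starting at byte offset 1024 within the block; the block
// is fetched from Keep on the first call and served from the shared
// BlockCache afterwards.
n, err := kc.ReadAt("acbd18db4cc2f85cedef654fccc4a4d8+1048576", buf, 1024)
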
index 57829aadebb0f3c4aff32c604db08e6e481a1a3b..fa309f65535c270ace4b4d0e2219092b7e49b5e1 100644 (file)
@@ -6,25 +6,12 @@ package keepclient
 
 import (
        "errors"
-       "fmt"
-       "io"
        "os"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/manifest"
 )
 
-const (
-       // After reading a data block from Keep, cfReader slices it up
-       // and sends the slices to a buffered channel to be consumed
-       // by the caller via Read().
-       //
-       // dataSliceSize is the maximum size of the slices, and
-       // therefore the maximum number of bytes that will be returned
-       // by a single call to Read().
-       dataSliceSize = 1 << 20
-)
-
 // ErrNoManifest indicates the given collection has no manifest
 // information (e.g., manifest_text was excluded by a "select"
 // parameter when retrieving the collection record).
@@ -38,141 +25,17 @@ func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, fi
        if !ok {
                return nil, ErrNoManifest
        }
-       m := manifest.Manifest{Text: mText}
-       return kc.ManifestFileReader(m, filename)
-}
-
-func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error) {
-       f := &file{
-               kc: kc,
-       }
-       err := f.load(m, filename)
+       fs, err := (&arvados.Collection{ManifestText: mText}).FileSystem(nil, kc)
        if err != nil {
                return nil, err
        }
-       return f, nil
-}
-
-type file struct {
-       kc       *KeepClient
-       segments []*manifest.FileSegment
-       size     int64 // total file size
-       offset   int64 // current read offset
-
-       // current/latest segment accessed -- might or might not match pos
-       seg           *manifest.FileSegment
-       segStart      int64 // position of segment relative to file
-       segData       []byte
-       segNext       []*manifest.FileSegment
-       readaheadDone bool
-}
-
-// Close implements io.Closer.
-func (f *file) Close() error {
-       f.kc = nil
-       f.segments = nil
-       f.segData = nil
-       return nil
+       return fs.OpenFile(filename, os.O_RDONLY, 0)
 }
 
-// Read implements io.Reader.
-func (f *file) Read(buf []byte) (int, error) {
-       if f.seg == nil || f.offset < f.segStart || f.offset >= f.segStart+int64(f.seg.Len) {
-               // f.seg does not cover the current read offset
-               // (f.pos).  Iterate over f.segments to find the one
-               // that does.
-               f.seg = nil
-               f.segStart = 0
-               f.segData = nil
-               f.segNext = f.segments
-               for len(f.segNext) > 0 {
-                       seg := f.segNext[0]
-                       f.segNext = f.segNext[1:]
-                       segEnd := f.segStart + int64(seg.Len)
-                       if segEnd > f.offset {
-                               f.seg = seg
-                               break
-                       }
-                       f.segStart = segEnd
-               }
-               f.readaheadDone = false
-       }
-       if f.seg == nil {
-               return 0, io.EOF
-       }
-       if f.segData == nil {
-               data, err := f.kc.cache().Get(f.kc, f.seg.Locator)
-               if err != nil {
-                       return 0, err
-               }
-               if len(data) < f.seg.Offset+f.seg.Len {
-                       return 0, fmt.Errorf("invalid segment (offset %d len %d) in %d-byte block %s", f.seg.Offset, f.seg.Len, len(data), f.seg.Locator)
-               }
-               f.segData = data[f.seg.Offset : f.seg.Offset+f.seg.Len]
-       }
-       // dataOff and dataLen denote a portion of f.segData
-       // corresponding to a portion of the file at f.offset.
-       dataOff := int(f.offset - f.segStart)
-       dataLen := f.seg.Len - dataOff
-
-       if !f.readaheadDone && len(f.segNext) > 0 && f.offset >= 1048576 && dataOff+dataLen > len(f.segData)/16 {
-               // If we have already read more than just the first
-               // few bytes of this file, and we have already
-               // consumed a noticeable portion of this segment, and
-               // there's more data for this file in the next segment
-               // ... then there's a good chance we are going to need
-               // the data for that next segment soon. Start getting
-               // it into the cache now.
-               go f.kc.cache().Get(f.kc, f.segNext[0].Locator)
-               f.readaheadDone = true
-       }
-
-       n := len(buf)
-       if n > dataLen {
-               n = dataLen
-       }
-       copy(buf[:n], f.segData[dataOff:dataOff+n])
-       f.offset += int64(n)
-       return n, nil
-}
-
-// Seek implements io.Seeker.
-func (f *file) Seek(offset int64, whence int) (int64, error) {
-       var want int64
-       switch whence {
-       case io.SeekStart:
-               want = offset
-       case io.SeekCurrent:
-               want = f.offset + offset
-       case io.SeekEnd:
-               want = f.size + offset
-       default:
-               return f.offset, fmt.Errorf("invalid whence %d", whence)
-       }
-       if want < 0 {
-               return f.offset, fmt.Errorf("attempted seek to %d", want)
-       }
-       if want > f.size {
-               want = f.size
-       }
-       f.offset = want
-       return f.offset, nil
-}
-
-// Size returns the file size in bytes.
-func (f *file) Size() int64 {
-       return f.size
-}
-
-func (f *file) load(m manifest.Manifest, path string) error {
-       f.segments = nil
-       f.size = 0
-       for seg := range m.FileSegmentIterByName(path) {
-               f.segments = append(f.segments, seg)
-               f.size += int64(seg.Len)
-       }
-       if f.segments == nil {
-               return os.ErrNotExist
+func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error) {
+       fs, err := (&arvados.Collection{ManifestText: m.Text}).FileSystem(nil, kc)
+       if err != nil {
+               return nil, err
        }
-       return nil
+       return fs.OpenFile(filename, os.O_RDONLY, 0)
 }
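
The exported API is unchanged by the rewrite; a hedged usage sketch (mtext and the filename are illustrative):

m := manifest.Manifest{Text: mtext} // a collection's manifest_text
f, err := kc.ManifestFileReader(m, "dir1/foo")
if err != nil {
	return err
}
defer f.Close()
// f is an arvados.File backed by the new collection filesystem, so it
// supports Read, Seek, and Close like an *os.File.
data, err := ioutil.ReadAll(f)
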
index df8bcb39dce2eaedf0a03c411bc4043ef8f0fe29..5d1e2a15332f31eaf5bd4f03bdd1343d746ada4f 100644 (file)
@@ -166,11 +166,11 @@ func (s *CollectionReaderUnit) TestCollectionReaderContent(c *check.C) {
 
 func (s *CollectionReaderUnit) TestCollectionReaderManyBlocks(c *check.C) {
        h := md5.New()
-       var testdata []byte
        buf := make([]byte, 4096)
        locs := make([]string, len(buf))
+       testdata := make([]byte, 0, len(buf)*len(buf))
        filesize := 0
-       for i := 0; i < len(locs); i++ {
+       for i := range locs {
                _, err := rand.Read(buf[:i])
                h.Write(buf[:i])
                locs[i], _, err = s.kc.PutB(buf[:i])
@@ -219,11 +219,12 @@ func (s *CollectionReaderUnit) TestCollectionReaderManyBlocks(c *check.C) {
        c.Check(md5.Sum(buf), check.DeepEquals, md5.Sum(testdata))
        c.Check(buf[:1000], check.DeepEquals, testdata[:1000])
 
+       expectPos := curPos + size + 12345
        curPos, err = rdr.Seek(size+12345, io.SeekCurrent)
        c.Check(err, check.IsNil)
-       c.Check(curPos, check.Equals, size)
+       c.Check(curPos, check.Equals, expectPos)
 
-       curPos, err = rdr.Seek(8-size, io.SeekCurrent)
+       curPos, err = rdr.Seek(8-curPos, io.SeekCurrent)
        c.Check(err, check.IsNil)
        c.Check(curPos, check.Equals, int64(8))
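
The changed expectations track a behavior change: like os.File, the new reader allows seeking past end-of-file and reports the requested position (a later Read there simply returns io.EOF), whereas the old reader clamped the offset to the file size. Illustrative:

pos, _ := rdr.Seek(size+12345, io.SeekCurrent)
// pos == previous position + size + 12345, even though that is past EOF
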
 
index 37d651e31fbd971defa3217d6c11e883c4a073cc..54a4a374b991b44c5a5e51878be980a1b78f9609 100644 (file)
@@ -293,6 +293,12 @@ func (kc *KeepClient) Get(locator string) (io.ReadCloser, int64, string, error)
        return kc.getOrHead("GET", locator)
 }
 
+// ReadAt retrieves a portion of a block: from the cache if the block
+// is present there, otherwise from the network.
+func (kc *KeepClient) ReadAt(locator string, p []byte, off int) (int, error) {
+       return kc.cache().ReadAt(kc, locator, p, off)
+}
+
 // Ask() verifies that a block with the given hash is available and
 // readable, according to at least one Keep service. Unlike Get, it
 // does not retrieve the data or verify that the data content matches
index b7373b5c1ec5d1c5d5c94b3c135825f7a43a24ad..617c73282f633ac6ddbca83c2094c1acfe8f3f18 100644 (file)
@@ -188,5 +188,5 @@ func (h *authHandler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
        }
        r.URL.Path = rewrittenPath
 
-       h.handler.ServeHTTP(&w, r)
+       h.handler.ServeHTTP(w, r)
 }
index 4702838362c04b1237593f66a789ea96848bb3c3..97faa89fb134ae8cb12a1d36ce2ef458da9af020 100644 (file)
@@ -328,11 +328,27 @@ type FileWrapper struct {
        len int64
 }
 
+func (fw FileWrapper) Readdir(n int) ([]os.FileInfo, error) {
+       return nil, errors.New("not implemented")
+}
+
+func (fw FileWrapper) Seek(int64, int) (int64, error) {
+       return 0, errors.New("not implemented")
+}
+
 func (fw FileWrapper) Size() int64 {
        return fw.len
 }
 
-func (fw FileWrapper) Seek(int64, int) (int64, error) {
+func (fw FileWrapper) Stat() (os.FileInfo, error) {
+       return nil, errors.New("not implemented")
+}
+
+func (fw FileWrapper) Truncate(int64) error {
+       return errors.New("not implemented")
+}
+
+func (fw FileWrapper) Write([]byte) (int, error) {
        return 0, errors.New("not implemented")
 }
 
@@ -440,20 +456,14 @@ func (KeepReadErrorTestClient) PutHB(hash string, buf []byte) (string, int, erro
 func (KeepReadErrorTestClient) ClearBlockCache() {
 }
 
-type ErrorReader struct{}
+type ErrorReader struct {
+       FileWrapper
+}
 
 func (ErrorReader) Read(p []byte) (n int, err error) {
        return 0, errors.New("ErrorReader")
 }
 
-func (ErrorReader) Close() error {
-       return nil
-}
-
-func (ErrorReader) Size() int64 {
-       return 0
-}
-
 func (ErrorReader) Seek(int64, int) (int64, error) {
        return 0, errors.New("ErrorReader")
 }
index ce1168acd2c1d07bcd6e8623c5421bcbe905f04c..9ee99903c8d1e537d487a67d1c77d848fc93c807 100644 (file)
@@ -86,6 +86,29 @@ func (c *cache) Stats() cacheStats {
        }
 }
 
+// Update saves a modified version (fs) to an existing collection
+// (coll) and, if successful, updates the relevant cache entries so
+// subsequent calls to Get() reflect the modifications.
+func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error {
+       c.setupOnce.Do(c.setup)
+
+       m, err := fs.MarshalManifest(".")
+       if err != nil || m == coll.ManifestText {
+               return err
+       }
+       coll.ManifestText = m
+       var updated arvados.Collection
+       defer c.pdhs.Remove(coll.UUID)
+       err := client.RequestAndDecode(&updated, "PATCH", "/arvados/v1/collections/"+coll.UUID, client.UpdateBody(coll), nil)
+       if err == nil {
+               c.collections.Add(client.AuthToken+"\000"+coll.PortableDataHash, &cachedCollection{
+                       expire:     time.Now().Add(time.Duration(c.TTL)),
+                       collection: &updated,
+               })
+       }
+       return err
+}
+
 func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
        c.setupOnce.Do(c.setup)
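
A hedged sketch of the calling pattern (the keep-web handler later in this commit is the real caller); the filename and content are illustrative:

f, err := fs.OpenFile("newdir/hello.txt", os.O_CREATE|os.O_WRONLY, 0644)
if err == nil {
	f.Write([]byte("hello"))
	f.Close()
}
// Update marshals fs, PATCHes the collection only if the manifest
// actually changed, and refreshes the cache entry on success.
err = h.Config.Cache.Update(client, *coll, fs)
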
 
index 87a712f04c4f2d381dfcd3dccb5cbdc93680eaf1..d4a89c844b567a4d2fb8d3a94f8138e365a672d7 100644 (file)
@@ -7,42 +7,202 @@ package main
 import (
        "bytes"
        "io"
+       "io/ioutil"
+       "net/url"
+       "os"
        "os/exec"
+       "strings"
+       "time"
 
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
        check "gopkg.in/check.v1"
 )
 
 func (s *IntegrationSuite) TestWebdavWithCadaver(c *check.C) {
-       basePath := "/c=" + arvadostest.FooAndBarFilesInDirUUID + "/t=" + arvadostest.ActiveToken + "/"
+       testdata := []byte("the human tragedy consists in the necessity of living with the consequences of actions performed under the pressure of compulsions we do not understand")
+
+       localfile, err := ioutil.TempFile("", "localfile")
+       c.Assert(err, check.IsNil)
+       defer os.Remove(localfile.Name())
+       localfile.Write(testdata)
+
+       emptyfile, err := ioutil.TempFile("", "emptyfile")
+       c.Assert(err, check.IsNil)
+       defer os.Remove(emptyfile.Name())
+
+       checkfile, err := ioutil.TempFile("", "checkfile")
+       c.Assert(err, check.IsNil)
+       defer os.Remove(checkfile.Name())
+
+       var newCollection arvados.Collection
+       arv := arvados.NewClientFromEnv()
+       arv.AuthToken = arvadostest.ActiveToken
+       err = arv.RequestAndDecode(&newCollection, "POST", "/arvados/v1/collections", bytes.NewBufferString(url.Values{"collection": {"{}"}}.Encode()), nil)
+       c.Assert(err, check.IsNil)
+       writePath := "/c=" + newCollection.UUID + "/t=" + arv.AuthToken + "/"
+
+       pdhPath := "/c=" + strings.Replace(arvadostest.FooAndBarFilesInDirPDH, "+", "-", -1) + "/t=" + arv.AuthToken + "/"
+
+       matchToday := time.Now().Format("Jan +2")
+
+       readPath := "/c=" + arvadostest.FooAndBarFilesInDirUUID + "/t=" + arvadostest.ActiveToken + "/"
        type testcase struct {
                path  string
                cmd   string
                match string
+               data  []byte
        }
        for _, trial := range []testcase{
                {
-                       path:  basePath,
+                       path:  readPath,
                        cmd:   "ls\n",
                        match: `(?ms).*dir1 *0 .*`,
                },
                {
-                       path:  basePath,
+                       path:  readPath,
                        cmd:   "ls dir1\n",
                        match: `(?ms).*bar *3.*foo *3 .*`,
                },
                {
-                       path:  basePath + "_/dir1",
+                       path:  readPath + "_/dir1",
                        cmd:   "ls\n",
                        match: `(?ms).*bar *3.*foo *3 .*`,
                },
                {
-                       path:  basePath + "dir1/",
+                       path:  readPath + "dir1/",
                        cmd:   "ls\n",
-                       match: `(?ms).*bar *3.*foo *3 .*`,
+                       match: `(?ms).*bar *3.*foo +3 +Feb +\d+ +2014.*`,
+               },
+               {
+                       path:  writePath,
+                       cmd:   "get emptyfile '" + checkfile.Name() + "'\n",
+                       match: `(?ms).*Not Found.*`,
+               },
+               {
+                       path:  writePath,
+                       cmd:   "put '" + emptyfile.Name() + "' emptyfile\n",
+                       match: `(?ms).*Uploading .* succeeded.*`,
+               },
+               {
+                       path:  writePath,
+                       cmd:   "get emptyfile '" + checkfile.Name() + "'\n",
+                       match: `(?ms).*Downloading .* succeeded.*`,
+                       data:  []byte{},
+               },
+               {
+                       path:  writePath,
+                       cmd:   "put '" + localfile.Name() + "' testfile\n",
+                       match: `(?ms).*Uploading .* succeeded.*`,
+               },
+               {
+                       path:  writePath,
+                       cmd:   "get testfile '" + checkfile.Name() + "'\n",
+                       match: `(?ms).*succeeded.*`,
+                       data:  testdata,
+               },
+               {
+                       path:  writePath,
+                       cmd:   "move testfile newdir0/\n",
+                       match: `(?ms).*Moving .* succeeded.*`,
+               },
+               {
+                       path:  writePath,
+                       cmd:   "move testfile newdir0/\n",
+                       match: `(?ms).*Moving .* failed.*`,
+               },
+               {
+                       path:  writePath,
+                       cmd:   "ls\n",
+                       match: `(?ms).*newdir0.* 0 +` + matchToday + ` \d+:\d+\n.*`,
+               },
+               {
+                       path:  writePath,
+                       cmd:   "move newdir0/testfile emptyfile/bogus/\n",
+                       match: `(?ms).*Moving .* failed.*`,
+               },
+               {
+                       path:  writePath,
+                       cmd:   "mkcol newdir1\n",
+                       match: `(?ms).*Creating .* succeeded.*`,
+               },
+               {
+                       path:  writePath,
+                       cmd:   "move newdir0/testfile newdir1/\n",
+                       match: `(?ms).*Moving .* succeeded.*`,
+               },
+               {
+                       path:  writePath,
+                       cmd:   "put '" + localfile.Name() + "' newdir1/testfile1\n",
+                       match: `(?ms).*Uploading .* succeeded.*`,
+               },
+               {
+                       path:  writePath,
+                       cmd:   "mkcol newdir2\n",
+                       match: `(?ms).*Creating .* succeeded.*`,
+               },
+               {
+                       path:  writePath,
+                       cmd:   "put '" + localfile.Name() + "' newdir2/testfile2\n",
+                       match: `(?ms).*Uploading .* succeeded.*`,
+               },
+               {
+                       path:  writePath,
+                       cmd:   "copy newdir2/testfile2 testfile3\n",
+                       match: `(?ms).*succeeded.*`,
+               },
+               {
+                       path:  writePath,
+                       cmd:   "get testfile3 '" + checkfile.Name() + "'\n",
+                       match: `(?ms).*succeeded.*`,
+                       data:  testdata,
+               },
+               {
+                       path:  writePath,
+                       cmd:   "get newdir2/testfile2 '" + checkfile.Name() + "'\n",
+                       match: `(?ms).*succeeded.*`,
+                       data:  testdata,
+               },
+               {
+                       path:  writePath,
+                       cmd:   "rmcol newdir2\n",
+                       match: `(?ms).*Deleting collection .* succeeded.*`,
+               },
+               {
+                       path:  writePath,
+                       cmd:   "get newdir2/testfile2 '" + checkfile.Name() + "'\n",
+                       match: `(?ms).*Downloading .* failed.*`,
+               },
+               {
+                       path:  "/c=" + arvadostest.UserAgreementCollection + "/t=" + arv.AuthToken + "/",
+                       cmd:   "put '" + localfile.Name() + "' foo\n",
+                       match: `(?ms).*Uploading .* failed:.*403 Forbidden.*`,
+               },
+               {
+                       path:  pdhPath,
+                       cmd:   "put '" + localfile.Name() + "' foo\n",
+                       match: `(?ms).*Uploading .* failed:.*405 Method Not Allowed.*`,
+               },
+               {
+                       path:  pdhPath,
+                       cmd:   "move foo bar\n",
+                       match: `(?ms).*Moving .* failed:.*405 Method Not Allowed.*`,
+               },
+               {
+                       path:  pdhPath,
+                       cmd:   "copy foo bar\n",
+                       match: `(?ms).*Copying .* failed:.*405 Method Not Allowed.*`,
+               },
+               {
+                       path:  pdhPath,
+                       cmd:   "delete foo\n",
+                       match: `(?ms).*Deleting .* failed:.*405 Method Not Allowed.*`,
                },
        } {
-               c.Logf("%s %#v", "http://"+s.testServer.Addr, trial)
+               c.Logf("%s %+v", "http://"+s.testServer.Addr, trial)
+
+               os.Remove(checkfile.Name())
+
                cmd := exec.Command("cadaver", "http://"+s.testServer.Addr+trial.path)
                cmd.Stdin = bytes.NewBufferString(trial.cmd)
                stdout, err := cmd.StdoutPipe()
@@ -56,5 +216,15 @@ func (s *IntegrationSuite) TestWebdavWithCadaver(c *check.C) {
                err = cmd.Wait()
                c.Check(err, check.Equals, nil)
                c.Check(buf.String(), check.Matches, trial.match)
+
+               if trial.data == nil {
+                       continue
+               }
+               checkfile, err = os.Open(checkfile.Name())
+               c.Assert(err, check.IsNil)
+               checkfile.Seek(0, os.SEEK_SET)
+               got, err := ioutil.ReadAll(checkfile)
+               c.Check(got, check.DeepEquals, trial.data)
+               c.Check(err, check.IsNil)
        }
 }
index 598fabcd37eddb2853119999145a760227e17060..b7da3b0e5ad2df7642319f16a97015bb3e45de63 100644 (file)
@@ -2,11 +2,11 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-// Keep-web provides read-only HTTP access to files stored in Keep. It
-// serves public data to anonymous and unauthenticated clients, and
-// serves private data to clients that supply Arvados API tokens. It
-// can be installed anywhere with access to Keep services, typically
-// behind a web proxy that supports TLS.
+// Keep-web provides read/write HTTP (WebDAV) access to files stored
+// in Keep. It serves public data to anonymous and unauthenticated
+// clients, and serves private data to clients that supply Arvados API
+// tokens. It can be installed anywhere with access to Keep services,
+// typically behind a web proxy that supports TLS.
 //
 // See http://doc.arvados.org/install/install-keep-web.html.
 //
@@ -40,7 +40,7 @@
 //
 // Proxy configuration
 //
-// Keep-web does not support SSL natively. Typically, it is installed
+// Keep-web does not support TLS natively. Typically, it is installed
 // behind a proxy like nginx.
 //
 // Here is an example nginx configuration.
index fd36218bc1c7a3c96e9d8917e8ecc60a1641ba55..4222e3822e14d9d35fbccc84c58b5f8f0ca45182 100644 (file)
@@ -10,6 +10,7 @@ import (
        "html"
        "html/template"
        "io"
+       "log"
        "net/http"
        "net/url"
        "os"
@@ -96,10 +97,64 @@ func (h *handler) serveStatus(w http.ResponseWriter, r *http.Request) {
        json.NewEncoder(w).Encode(status)
 }
 
+// updateOnSuccess wraps httpserver.ResponseWriter. If the handler
+// sends an HTTP header indicating success, updateOnSuccess first
+// calls the provided update func. If the update func fails, an error
+// response (500, or the status from an arvados.TransactionError) is
+// sent instead; the status code and body sent by the handler are
+// ignored, and all subsequent response writes return the update error.
+type updateOnSuccess struct {
+       httpserver.ResponseWriter
+       update     func() error
+       sentHeader bool
+       err        error
+}
+
+func (uos *updateOnSuccess) Write(p []byte) (int, error) {
+       if uos.err != nil {
+               return 0, uos.err
+       }
+       if !uos.sentHeader {
+               uos.WriteHeader(http.StatusOK)
+       }
+       return uos.ResponseWriter.Write(p)
+}
+
+func (uos *updateOnSuccess) WriteHeader(code int) {
+       if !uos.sentHeader {
+               uos.sentHeader = true
+               if code >= 200 && code < 400 {
+                       if uos.err = uos.update(); uos.err != nil {
+                               code := http.StatusInternalServerError
+                               if err, ok := uos.err.(*arvados.TransactionError); ok {
+                                       code = err.StatusCode
+                               }
+                               log.Printf("update() changes response to HTTP %d: %T %q", code, uos.err, uos.err)
+                               http.Error(uos.ResponseWriter, uos.err.Error(), code)
+                               return
+                       }
+               }
+       }
+       uos.ResponseWriter.WriteHeader(code)
+}
+
 var (
+       writeMethod = map[string]bool{
+               "COPY":   true,
+               "DELETE": true,
+               "MKCOL":  true,
+               "MOVE":   true,
+               "PUT":    true,
+               "RMCOL":  true,
+       }
        webdavMethod = map[string]bool{
+               "COPY":     true,
+               "DELETE":   true,
+               "MKCOL":    true,
+               "MOVE":     true,
                "OPTIONS":  true,
                "PROPFIND": true,
+               "PUT":      true,
+               "RMCOL":    true,
        }
        browserMethod = map[string]bool{
                "GET":  true,
@@ -147,7 +202,7 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
                        return
                }
                w.Header().Set("Access-Control-Allow-Headers", "Authorization, Content-Type, Range")
-               w.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS, PROPFIND")
+               w.Header().Set("Access-Control-Allow-Methods", "COPY, DELETE, GET, MKCOL, MOVE, OPTIONS, POST, PROPFIND, PUT, RMCOL")
                w.Header().Set("Access-Control-Allow-Origin", "*")
                w.Header().Set("Access-Control-Max-Age", "86400")
                statusCode = http.StatusOK
@@ -352,21 +407,45 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
        }
        applyContentDispositionHdr(w, r, basename, attachment)
 
-       fs := collection.FileSystem(&arvados.Client{
+       client := &arvados.Client{
                APIHost:   arv.ApiServer,
                AuthToken: arv.ApiToken,
                Insecure:  arv.ApiInsecure,
-       }, kc)
+       }
+       fs, err := collection.FileSystem(client, kc)
+       if err != nil {
+               statusCode, statusText = http.StatusInternalServerError, err.Error()
+               return
+       }
+
+       targetIsPDH := arvadosclient.PDHMatch(targetID)
+       if targetIsPDH && writeMethod[r.Method] {
+               statusCode, statusText = http.StatusMethodNotAllowed, errReadOnly.Error()
+               return
+       }
+
        if webdavMethod[r.Method] {
+               if writeMethod[r.Method] {
+                       // Save the collection only if/when all
+                       // webdav->filesystem operations succeed --
+                       // and send a 500 error if the modified
+                       // collection can't be saved.
+                       w = &updateOnSuccess{
+                               ResponseWriter: w,
+                               update: func() error {
+                                       return h.Config.Cache.Update(client, *collection, fs)
+                               }}
+               }
                h := webdav.Handler{
-                       Prefix:     "/" + strings.Join(pathParts[:stripParts], "/"),
-                       FileSystem: &webdavFS{collfs: fs},
+                       Prefix: "/" + strings.Join(pathParts[:stripParts], "/"),
+                       FileSystem: &webdavFS{
+                               collfs:  fs,
+                               writing: writeMethod[r.Method],
+                       },
                        LockSystem: h.webdavLS,
                        Logger: func(_ *http.Request, err error) {
-                               if os.IsNotExist(err) {
-                                       statusCode, statusText = http.StatusNotFound, err.Error()
-                               } else if err != nil {
-                                       statusCode, statusText = http.StatusInternalServerError, err.Error()
+                               if err != nil {
+                                       log.Printf("error from webdav handler: %q", err)
                                }
                        },
                }
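
From a client's point of view, a hedged sketch of the new write path (hostname, collection UUID, and token are placeholders), using nothing but net/http:

req, err := http.NewRequest("PUT",
	"http://collections.example.com/c=zzzzz-4zz18-xxxxxxxxxxxxxxx/t=your-api-token/newdir/hello.txt",
	strings.NewReader("hello"))
if err != nil {
	panic(err)
}
resp, err := http.DefaultClient.Do(req)
// A 2xx status means updateOnSuccess has already saved the modified
// collection; if saving failed, the client sees the error status
// (500, or a TransactionError's code) instead.
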
index 6bd34d71130aaeefd4984e835f8285c01c6899ad..21e47c8dc7c3e0a64f8d320e24b5cd9041fe7117 100644 (file)
@@ -45,12 +45,12 @@ func (s *UnitSuite) TestCORSPreflight(c *check.C) {
        c.Check(resp.Code, check.Equals, http.StatusOK)
        c.Check(resp.Body.String(), check.Equals, "")
        c.Check(resp.Header().Get("Access-Control-Allow-Origin"), check.Equals, "*")
-       c.Check(resp.Header().Get("Access-Control-Allow-Methods"), check.Equals, "GET, POST, OPTIONS, PROPFIND")
+       c.Check(resp.Header().Get("Access-Control-Allow-Methods"), check.Equals, "COPY, DELETE, GET, MKCOL, MOVE, OPTIONS, POST, PROPFIND, PUT, RMCOL")
        c.Check(resp.Header().Get("Access-Control-Allow-Headers"), check.Equals, "Authorization, Content-Type, Range")
 
        // Check preflight for a disallowed request
        resp = httptest.NewRecorder()
-       req.Header.Set("Access-Control-Request-Method", "DELETE")
+       req.Header.Set("Access-Control-Request-Method", "MAKE-COFFEE")
        h.ServeHTTP(resp, req)
        c.Check(resp.Body.String(), check.Equals, "")
        c.Check(resp.Code, check.Equals, http.StatusMethodNotAllowed)
index 57f3f53a99ef6a73944c6cb600c1672c67cb6696..3ceb0ed5c9ea6c71f06e5ad857ad1223bb277433 100644 (file)
@@ -9,9 +9,9 @@ import (
        "errors"
        "fmt"
        prand "math/rand"
-       "net/http"
        "os"
-       "sync"
+       "path"
+       "strings"
        "sync/atomic"
        "time"
 
@@ -27,103 +27,83 @@ var (
        errReadOnly           = errors.New("read-only filesystem")
 )
 
-// webdavFS implements a read-only webdav.FileSystem by wrapping an
+// webdavFS implements a webdav.FileSystem by wrapping an
 // arvados.CollectionFilesystem.
+//
+// Collections don't preserve empty directories, so Mkdir is
+// effectively a no-op, and we need to make parent dirs spring into
+// existence automatically so sequences like "mkcol foo; put foo/bar"
+// work as expected.
 type webdavFS struct {
-       collfs arvados.CollectionFileSystem
+       collfs  arvados.CollectionFileSystem
+       writing bool
 }
 
-var _ webdav.FileSystem = &webdavFS{}
+func (fs *webdavFS) makeparents(name string) {
+       dir, name := path.Split(name)
+       if dir == "" || dir == "/" {
+               return
+       }
+       dir = dir[:len(dir)-1]
+       fs.makeparents(dir)
+       fs.collfs.Mkdir(dir, 0755)
+}
 
 func (fs *webdavFS) Mkdir(ctx context.Context, name string, perm os.FileMode) error {
-       return errReadOnly
+       if !fs.writing {
+               return errReadOnly
+       }
+       name = strings.TrimRight(name, "/")
+       fs.makeparents(name)
+       return fs.collfs.Mkdir(name, 0755)
 }
 
-func (fs *webdavFS) OpenFile(ctx context.Context, name string, flag int, perm os.FileMode) (webdav.File, error) {
-       fi, err := fs.collfs.Stat(name)
-       if err != nil {
-               return nil, err
+func (fs *webdavFS) OpenFile(ctx context.Context, name string, flag int, perm os.FileMode) (f webdav.File, err error) {
+       writing := flag&(os.O_WRONLY|os.O_RDWR) != 0
+       if writing {
+               fs.makeparents(name)
+       }
+       f, err = fs.collfs.OpenFile(name, flag, perm)
+       if !fs.writing {
+               // webdav module returns 404 on all OpenFile errors,
+               // but returns 405 Method Not Allowed if OpenFile()
+               // succeeds but Write() or Close() fails. We'd rather
+               // have 405.
+               f = writeFailer{File: f, err: errReadOnly}
        }
-       return &webdavFile{collfs: fs.collfs, fileInfo: fi, name: name}, nil
+       return
 }
 
 func (fs *webdavFS) RemoveAll(ctx context.Context, name string) error {
-       return errReadOnly
+       return fs.collfs.RemoveAll(name)
 }
 
 func (fs *webdavFS) Rename(ctx context.Context, oldName, newName string) error {
-       return errReadOnly
-}
-
-func (fs *webdavFS) Stat(ctx context.Context, name string) (os.FileInfo, error) {
-       return fs.collfs.Stat(name)
-}
-
-// webdavFile implements a read-only webdav.File by wrapping
-// http.File.
-//
-// The http.File is opened from an arvados.CollectionFileSystem, but
-// not until Seek, Read, or Readdir is called. This deferred-open
-// strategy makes webdav's OpenFile-Stat-Close cycle fast even though
-// the collfs's Open method is slow. This is relevant because webdav
-// does OpenFile-Stat-Close on each file when preparing directory
-// listings.
-//
-// Writes to a webdavFile always fail.
-type webdavFile struct {
-       // fields populated by (*webdavFS).OpenFile()
-       collfs   http.FileSystem
-       fileInfo os.FileInfo
-       name     string
-
-       // internal fields
-       file     http.File
-       loadOnce sync.Once
-       err      error
-}
-
-func (f *webdavFile) load() {
-       f.file, f.err = f.collfs.Open(f.name)
-}
-
-func (f *webdavFile) Write([]byte) (int, error) {
-       return 0, errReadOnly
-}
-
-func (f *webdavFile) Seek(offset int64, whence int) (int64, error) {
-       f.loadOnce.Do(f.load)
-       if f.err != nil {
-               return 0, f.err
+       if !fs.writing {
+               return errReadOnly
        }
-       return f.file.Seek(offset, whence)
+       fs.makeparents(newName)
+       return fs.collfs.Rename(oldName, newName)
 }
 
-func (f *webdavFile) Read(buf []byte) (int, error) {
-       f.loadOnce.Do(f.load)
-       if f.err != nil {
-               return 0, f.err
+func (fs *webdavFS) Stat(ctx context.Context, name string) (os.FileInfo, error) {
+       if fs.writing {
+               fs.makeparents(name)
        }
-       return f.file.Read(buf)
+       return fs.collfs.Stat(name)
 }
 
-func (f *webdavFile) Close() error {
-       if f.file == nil {
-               // We never called load(), or load() failed
-               return f.err
-       }
-       return f.file.Close()
+type writeFailer struct {
+       webdav.File
+       err error
 }
 
-func (f *webdavFile) Readdir(n int) ([]os.FileInfo, error) {
-       f.loadOnce.Do(f.load)
-       if f.err != nil {
-               return nil, f.err
-       }
-       return f.file.Readdir(n)
+func (wf writeFailer) Write([]byte) (int, error) {
+       return 0, wf.err
 }
 
-func (f *webdavFile) Stat() (os.FileInfo, error) {
-       return f.fileInfo, nil
+func (wf writeFailer) Close() error {
+       return wf.err
 }
 
 // noLockSystem implements webdav.LockSystem by returning success for
diff --git a/services/keep-web/webdav_test.go b/services/keep-web/webdav_test.go
new file mode 100644 (file)
index 0000000..52db776
--- /dev/null
@@ -0,0 +1,5 @@
+package main
+
+import "golang.org/x/net/webdav"
+
+var _ webdav.FileSystem = &webdavFS{}
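
The new test file is a compile-time assertion, moved out of webdav.go: the blank-identifier assignment fails to build if *webdavFS ever stops satisfying webdav.FileSystem. The same zero-cost idiom guards any type/interface pair, e.g. (illustrative):

var _ http.Handler = (*handler)(nil) // keep-web's handler must implement ServeHTTP
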
index 3d1b4476255d0cadf2274ad12b9aae78f164f72f..e2a6221f10e28981ea0c3f64fa5ce6d52ac718ea 100644 (file)
@@ -10,7 +10,6 @@ import (
        "fmt"
        "io"
        "io/ioutil"
-       "log"
        "net"
        "net/http"
        "os"
@@ -25,7 +24,9 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/config"
        "git.curoverse.com/arvados.git/sdk/go/health"
+       "git.curoverse.com/arvados.git/sdk/go/httpserver"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       log "github.com/Sirupsen/logrus"
        "github.com/coreos/go-systemd/daemon"
        "github.com/ghodss/yaml"
        "github.com/gorilla/mux"
@@ -55,7 +56,13 @@ var (
        router   http.Handler
 )
 
+const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
+
 func main() {
+       log.SetFormatter(&log.JSONFormatter{
+               TimestampFormat: rfc3339NanoFixed,
+       })
+
        cfg := DefaultConfig()
 
        flagset := flag.NewFlagSet("keepproxy", flag.ExitOnError)
@@ -164,7 +171,7 @@ func main() {
 
        // Start serving requests.
        router = MakeRESTRouter(!cfg.DisableGet, !cfg.DisablePut, kc, time.Duration(cfg.Timeout), cfg.ManagementToken)
-       http.Serve(listener, router)
+       http.Serve(listener, httpserver.AddRequestIDs(httpserver.LogRequests(router)))
 
        log.Println("shutting down")
 }
@@ -596,7 +603,8 @@ func (h *proxyHandler) makeKeepClient(req *http.Request) *keepclient.KeepClient
                        Timeout:   h.timeout,
                        Transport: h.transport,
                },
-               proto: req.Proto,
+               proto:     req.Proto,
+               requestID: req.Header.Get("X-Request-Id"),
        }
        return &kc
 }
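
With the JSON formatter and the middleware in place, each keepproxy request should now produce log entries along these lines (field set per the logger code and test earlier in this commit; values illustrative):

{"RequestID":"req-d41d8cd98f00b204e980","level":"info","msg":"response","remoteAddr":"10.0.0.1:50216","reqBytes":0,"reqForwardedFor":"","reqMethod":"GET","reqPath":"acbd18db4cc2f85cedef654fccc4a4d8+3","respBytes":3,"respStatus":"OK","respStatusCode":200,"time":"2017-12-01T15:21:32.000000000Z","timeToStatus":0.001234,"timeTotal":0.001456,"timeWriteBody":0.000222}
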
index 23bb2bd216c1f1080d246e63a47252564e4d9bf4..a7b608b69c462fd4149f932c16dbabcaddbca6c6 100644 (file)
@@ -10,7 +10,6 @@ import (
        "errors"
        "fmt"
        "io/ioutil"
-       "log"
        "math/rand"
        "net/http"
        "net/http/httptest"
@@ -57,7 +56,7 @@ func waitForListener() {
                time.Sleep(ms * time.Millisecond)
        }
        if listener == nil {
-               log.Fatalf("Timed out waiting for listener to start")
+               panic("Timed out waiting for listener to start")
        }
 }
 
@@ -255,14 +254,14 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
        {
                _, _, err := kc.Ask(hash)
                c.Check(err, Equals, keepclient.BlockNotFound)
-               log.Print("Finished Ask (expected BlockNotFound)")
+               c.Log("Finished Ask (expected BlockNotFound)")
        }
 
        {
                reader, _, _, err := kc.Get(hash)
                c.Check(reader, Equals, nil)
                c.Check(err, Equals, keepclient.BlockNotFound)
-               log.Print("Finished Get (expected BlockNotFound)")
+               c.Log("Finished Get (expected BlockNotFound)")
        }
 
        // Note in bug #5309 among other errors keepproxy would set
@@ -281,14 +280,14 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
                c.Check(hash2, Matches, fmt.Sprintf(`^%s\+3(\+.+)?$`, hash))
                c.Check(rep, Equals, 2)
                c.Check(err, Equals, nil)
-               log.Print("Finished PutB (expected success)")
+               c.Log("Finished PutB (expected success)")
        }
 
        {
                blocklen, _, err := kc.Ask(hash2)
                c.Assert(err, Equals, nil)
                c.Check(blocklen, Equals, int64(3))
-               log.Print("Finished Ask (expected success)")
+               c.Log("Finished Ask (expected success)")
        }
 
        {
@@ -297,7 +296,7 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
                all, err := ioutil.ReadAll(reader)
                c.Check(all, DeepEquals, []byte("foo"))
                c.Check(blocklen, Equals, int64(3))
-               log.Print("Finished Get (expected success)")
+               c.Log("Finished Get (expected success)")
        }
 
        {
@@ -307,7 +306,7 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
                c.Check(hash2, Matches, `^d41d8cd98f00b204e9800998ecf8427e\+0(\+.+)?$`)
                c.Check(rep, Equals, 2)
                c.Check(err, Equals, nil)
-               log.Print("Finished PutB zero block")
+               c.Log("Finished PutB zero block")
        }
 
        {
@@ -316,7 +315,7 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
                all, err := ioutil.ReadAll(reader)
                c.Check(all, DeepEquals, []byte(""))
                c.Check(blocklen, Equals, int64(0))
-               log.Print("Finished Get zero block")
+               c.Log("Finished Get zero block")
        }
 }
 
@@ -331,7 +330,7 @@ func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) {
                errNotFound, _ := err.(keepclient.ErrNotFound)
                c.Check(errNotFound, NotNil)
                c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
-               log.Print("Ask 1")
+               c.Log("Ask 1")
        }
 
        {
@@ -339,7 +338,7 @@ func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) {
                c.Check(hash2, Equals, "")
                c.Check(rep, Equals, 0)
                c.Check(err, FitsTypeOf, keepclient.InsufficientReplicasError(errors.New("")))
-               log.Print("PutB")
+               c.Log("PutB")
        }
 
        {
@@ -348,7 +347,7 @@ func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) {
                c.Check(errNotFound, NotNil)
                c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
                c.Check(blocklen, Equals, int64(0))
-               log.Print("Ask 2")
+               c.Log("Ask 2")
        }
 
        {
@@ -357,7 +356,7 @@ func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) {
                c.Check(errNotFound, NotNil)
                c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
                c.Check(blocklen, Equals, int64(0))
-               log.Print("Get")
+               c.Log("Get")
        }
 }
 
@@ -372,7 +371,7 @@ func (s *ServerRequiredSuite) TestGetDisabled(c *C) {
                errNotFound, _ := err.(keepclient.ErrNotFound)
                c.Check(errNotFound, NotNil)
                c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
-               log.Print("Ask 1")
+               c.Log("Ask 1")
        }
 
        {
@@ -380,7 +379,7 @@ func (s *ServerRequiredSuite) TestGetDisabled(c *C) {
                c.Check(hash2, Matches, fmt.Sprintf(`^%s\+3(\+.+)?$`, hash))
                c.Check(rep, Equals, 2)
                c.Check(err, Equals, nil)
-               log.Print("PutB")
+               c.Log("PutB")
        }
 
        {
@@ -389,7 +388,7 @@ func (s *ServerRequiredSuite) TestGetDisabled(c *C) {
                c.Check(errNotFound, NotNil)
                c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
                c.Check(blocklen, Equals, int64(0))
-               log.Print("Ask 2")
+               c.Log("Ask 2")
        }
 
        {
@@ -398,7 +397,7 @@ func (s *ServerRequiredSuite) TestGetDisabled(c *C) {
                c.Check(errNotFound, NotNil)
                c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
                c.Check(blocklen, Equals, int64(0))
-               log.Print("Get")
+               c.Log("Get")
        }
 }
 
index 0faf4aea0e3c35354e30dc33f1e7005d491ab4d5..3fa2671df58331bb52c16651ded3924e9390ee90 100644 (file)
@@ -13,11 +13,13 @@ import (
 var viaAlias = "keepproxy"
 
 type proxyClient struct {
-       client keepclient.HTTPClient
-       proto  string
+       client    keepclient.HTTPClient
+       proto     string
+       requestID string
 }
 
 func (pc *proxyClient) Do(req *http.Request) (*http.Response, error) {
        req.Header.Add("Via", pc.proto+" "+viaAlias)
+       req.Header.Add("X-Request-Id", pc.requestID)
        return pc.client.Do(req)
 }
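
A hedged sketch of the propagation (locator and ID are placeholders): every backend request made through proxyClient now carries both headers, so a single req-... identifier threads through keepproxy's and keepstore's logs:

pc := &proxyClient{
	client:    http.DefaultClient,
	proto:     "HTTP/1.1",
	requestID: "req-0123456789abcdefghij", // copied from the inbound X-Request-Id
}
req, _ := http.NewRequest("GET", "http://keep0.example:25107/acbd18db4cc2f85cedef654fccc4a4d8+3", nil)
resp, err := pc.Do(req) // adds "Via: HTTP/1.1 keepproxy" and the X-Request-Id
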
index 424910dfa8b2af4a0d22ad9e91bd4eab20076af7..4d042a70dd376ea1e04bdff16283ab80669dfd0f 100644 (file)
@@ -975,7 +975,7 @@ func TestGetHandlerClientDisconnect(t *testing.T) {
        ok := make(chan struct{})
        go func() {
                req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
-               (&LoggingRESTRouter{router: MakeRESTRouter()}).ServeHTTP(resp, req)
+               MakeRESTRouter().ServeHTTP(resp, req)
                ok <- struct{}{}
        }()
 
index 921176dbbe93f481f497af476f176c2116ef3bce..e422179f643e9cad438742cd2aa450d52ae84fa4 100644 (file)
@@ -147,11 +147,11 @@ func main() {
        // Start a round-robin VolumeManager with the volumes we have found.
        KeepVM = MakeRRVolumeManager(theConfig.Volumes)
 
-       // Middleware stack: logger, MaxRequests limiter, method handlers
+       // Middleware/handler stack
        router := MakeRESTRouter()
        limiter := httpserver.NewRequestLimiter(theConfig.MaxRequests, router)
        router.limiter = limiter
-       http.Handle("/", &LoggingRESTRouter{router: limiter})
+       http.Handle("/", httpserver.AddRequestIDs(httpserver.LogRequests(limiter)))
 
        // Set up a TCP listener.
        listener, err := net.Listen("tcp", theConfig.Listen)
diff --git a/services/keepstore/logging_router.go b/services/keepstore/logging_router.go
deleted file mode 100644 (file)
index 63c28a2..0000000
+++ /dev/null
@@ -1,111 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package main
-
-// LoggingRESTRouter
-// LoggingResponseWriter
-
-import (
-       "context"
-       "net/http"
-       "strings"
-       "time"
-
-       "git.curoverse.com/arvados.git/sdk/go/httpserver"
-       "git.curoverse.com/arvados.git/sdk/go/stats"
-       log "github.com/Sirupsen/logrus"
-)
-
-// LoggingResponseWriter has anonymous fields ResponseWriter and ResponseBody
-type LoggingResponseWriter struct {
-       Status int
-       Length int
-       http.ResponseWriter
-       ResponseBody string
-       sentHdr      time.Time
-}
-
-// CloseNotify implements http.CloseNotifier.
-func (resp *LoggingResponseWriter) CloseNotify() <-chan bool {
-       wrapped, ok := resp.ResponseWriter.(http.CloseNotifier)
-       if !ok {
-               // If upstream doesn't implement CloseNotifier, we can
-               // satisfy the interface by returning a channel that
-               // never sends anything (the interface doesn't
-               // guarantee that anything will ever be sent on the
-               // channel even if the client disconnects).
-               return nil
-       }
-       return wrapped.CloseNotify()
-}
-
-// WriteHeader writes header to ResponseWriter
-func (resp *LoggingResponseWriter) WriteHeader(code int) {
-       if resp.sentHdr == zeroTime {
-               resp.sentHdr = time.Now()
-       }
-       resp.Status = code
-       resp.ResponseWriter.WriteHeader(code)
-}
-
-var zeroTime time.Time
-
-func (resp *LoggingResponseWriter) Write(data []byte) (int, error) {
-       if resp.Length == 0 && len(data) > 0 && resp.sentHdr == zeroTime {
-               resp.sentHdr = time.Now()
-       }
-       resp.Length += len(data)
-       if resp.Status >= 400 {
-               resp.ResponseBody += string(data)
-       }
-       return resp.ResponseWriter.Write(data)
-}
-
-// LoggingRESTRouter is used to add logging capabilities to mux.Router
-type LoggingRESTRouter struct {
-       router      http.Handler
-       idGenerator httpserver.IDGenerator
-}
-
-func (loggingRouter *LoggingRESTRouter) ServeHTTP(wrappedResp http.ResponseWriter, req *http.Request) {
-       tStart := time.Now()
-
-       // Attach a requestID-aware logger to the request context.
-       lgr := log.WithField("RequestID", loggingRouter.idGenerator.Next())
-       ctx := context.WithValue(req.Context(), "logger", lgr)
-       req = req.WithContext(ctx)
-
-       lgr = lgr.WithFields(log.Fields{
-               "remoteAddr":      req.RemoteAddr,
-               "reqForwardedFor": req.Header.Get("X-Forwarded-For"),
-               "reqMethod":       req.Method,
-               "reqPath":         req.URL.Path[1:],
-               "reqBytes":        req.ContentLength,
-       })
-       lgr.Debug("request")
-
-       resp := LoggingResponseWriter{http.StatusOK, 0, wrappedResp, "", zeroTime}
-       loggingRouter.router.ServeHTTP(&resp, req)
-       tDone := time.Now()
-
-       statusText := http.StatusText(resp.Status)
-       if resp.Status >= 400 {
-               statusText = strings.Replace(resp.ResponseBody, "\n", "", -1)
-       }
-       if resp.sentHdr == zeroTime {
-               // Nobody changed status or wrote any data, i.e., we
-               // returned a 200 response with no body.
-               resp.sentHdr = tDone
-       }
-
-       lgr.WithFields(log.Fields{
-               "timeTotal":      stats.Duration(tDone.Sub(tStart)),
-               "timeToStatus":   stats.Duration(resp.sentHdr.Sub(tStart)),
-               "timeWriteBody":  stats.Duration(tDone.Sub(resp.sentHdr)),
-               "respStatusCode": resp.Status,
-               "respStatus":     statusText,
-               "respBytes":      resp.Length,
-       }).Info("response")
-}
diff --git a/services/keepstore/logging_router_test.go b/services/keepstore/logging_router_test.go
deleted file mode 100644 (file)
index 6ca48dc..0000000
+++ /dev/null
@@ -1,14 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package main
-
-import (
-       "net/http"
-       "testing"
-)
-
-func TestLoggingResponseWriterImplementsCloseNotifier(t *testing.T) {
-       http.ResponseWriter(&LoggingResponseWriter{}).(http.CloseNotifier).CloseNotify()
-}
index e6a53d06c6c297ab343f697dd7c5f68a40e8355e..61e69f9096503eb88cf9328af5e85f129dba4226 100644 (file)
@@ -45,6 +45,8 @@ var (
        s3RaceWindow    time.Duration
 
        s3ACL = s3.Private
+
+       zeroTime time.Time
 )
 
 const (