11453: Merge branch 'master' into 11453-federated-tokens
authorTom Clegg <tclegg@veritasgenetics.com>
Mon, 4 Dec 2017 21:03:38 +0000 (16:03 -0500)
committerTom Clegg <tclegg@veritasgenetics.com>
Mon, 4 Dec 2017 21:03:38 +0000 (16:03 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

67 files changed:
apps/workbench/app/models/arvados_api_client.rb
apps/workbench/app/models/user.rb
build/run-library.sh
build/run-tests.sh
doc/_config.yml
doc/install/install-keep-web.html.textile.liquid
doc/user/cwl/cwl-run-options.html.textile.liquid
doc/user/topics/arv-sync-groups.html.textile.liquid [new file with mode: 0644]
sdk/go/arvados/client.go
sdk/go/arvados/collection.go
sdk/go/arvados/collection_fs.go
sdk/go/arvados/collection_fs_test.go
sdk/go/arvados/error.go
sdk/go/httpserver/id_generator.go
sdk/go/httpserver/logger.go [new file with mode: 0644]
sdk/go/httpserver/logger_test.go [new file with mode: 0644]
sdk/go/httpserver/responsewriter.go
sdk/go/keepclient/block_cache.go
sdk/go/keepclient/collectionreader.go
sdk/go/keepclient/collectionreader_test.go
sdk/go/keepclient/keepclient.go
services/api/app/models/container.rb
services/api/test/unit/container_request_test.rb
services/arv-git-httpd/auth_handler.go
services/arv-git-httpd/main.go
services/crunch-dispatch-local/crunch-dispatch-local.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
services/crunch-run/crunchrun.go
services/crunch-run/crunchrun_test.go
services/crunchstat/crunchstat.go
services/fuse/arvados_fuse/command.py
services/health/main.go
services/keep-balance/main.go
services/keep-web/cache.go
services/keep-web/cadaver_test.go
services/keep-web/doc.go
services/keep-web/handler.go
services/keep-web/handler_test.go
services/keep-web/main.go
services/keep-web/status_test.go
services/keep-web/webdav.go
services/keep-web/webdav_test.go [new file with mode: 0644]
services/keepproxy/keepproxy.go
services/keepproxy/keepproxy_test.go
services/keepproxy/proxy_client.go
services/keepstore/handler_test.go
services/keepstore/handlers.go
services/keepstore/keepstore.go
services/keepstore/logging_router.go [deleted file]
services/keepstore/logging_router_test.go [deleted file]
services/keepstore/pull_worker_test.go
services/keepstore/s3_volume.go
services/nodemanager/arvnodeman/launcher.py
services/nodemanager/arvnodeman/status.py
services/nodemanager/tests/test_status.py
services/ws/main.go
services/ws/router.go
services/ws/server_test.go
tools/arv-sync-groups/arv-sync-groups.go
tools/crunchstat-summary/MANIFEST.in
tools/crunchstat-summary/crunchstat_summary/dygraphs.js
tools/crunchstat-summary/crunchstat_summary/dygraphs.py
tools/crunchstat-summary/crunchstat_summary/synchronizer.js [new file with mode: 0644]
tools/crunchstat-summary/crunchstat_summary/webchart.py
tools/keep-block-check/keep-block-check.go
tools/keep-exercise/keep-exercise.go
tools/keep-rsync/keep-rsync.go

index ac2fe3a97629f8ec957b6a99b21f779b9d363e63..5a8fd518d386ec89125552c9fe17730e0488d4c4 100644 (file)
@@ -82,7 +82,7 @@ class ArvadosApiClient
     @client_mtx = Mutex.new
   end
 
-  def api(resources_kind, action, data=nil, tokens={})
+  def api(resources_kind, action, data=nil, tokens={}, include_anon_token=true)
 
     profile_checkpoint
 
@@ -117,7 +117,7 @@ class ArvadosApiClient
       'reader_tokens' => ((tokens[:reader_tokens] ||
                            Thread.current[:reader_tokens] ||
                            []) +
-                          [Rails.configuration.anonymous_user_token]).to_json,
+                          (include_anon_token ? [Rails.configuration.anonymous_user_token] : [])).to_json,
     }
     if !data.nil?
       data.each do |k,v|
index 10da22db69e4540f8d7027df94b3fd7d43d98279..1f102dbf17acd3fb807110c34f4937686ebb9f2d 100644 (file)
@@ -10,7 +10,7 @@ class User < ArvadosBase
   end
 
   def self.current
-    res = arvados_api_client.api self, '/current'
+    res = arvados_api_client.api self, '/current', nil, {}, false
     arvados_api_client.unpack_api_response(res)
   end
 
index 5fc494cdf5aad3608cd1f7b7eafb2c5bd19035d0..029fefc9bb3880fb9643e15ee108e6692c0211f0 100755 (executable)
@@ -132,7 +132,7 @@ package_go_binary() {
       return 1
     fi
 
-    go get "git.curoverse.com/arvados.git/$src_path"
+    go get -ldflags "-X main.version=${version}" "git.curoverse.com/arvados.git/$src_path"
 
     declare -a switches=()
     systemd_unit="$WORKSPACE/${src_path}/${prog}.service"
index c6110f28b7ffcdf343dee430c64b5a6e9ef9af00..6063779b306411038555604a209b741f78f1fbb6 100755 (executable)
@@ -487,6 +487,7 @@ export PERLLIB="$PERLINSTALLBASE/lib/perl5:${PERLLIB:+$PERLLIB}"
 
 export GOPATH
 mkdir -p "$GOPATH/src/git.curoverse.com"
+rmdir --parents "$GOPATH/src/git.curoverse.com/arvados.git/tmp/GOPATH"
 ln -sfT "$WORKSPACE" "$GOPATH/src/git.curoverse.com/arvados.git" \
     || fatal "symlink failed"
 go get -v github.com/kardianos/govendor \
index 3068647074ca2a82166860f47cd27bfbb0c079d7..e8a899c004c1c257ee05d36d757ac42644eca10a 100644 (file)
@@ -74,6 +74,7 @@ navbar:
       - user/topics/run-command.html.textile.liquid
       - user/reference/job-pipeline-ref.html.textile.liquid
       - user/examples/crunch-examples.html.textile.liquid
+      - user/topics/arv-sync-groups.html.textile.liquid
     - Query the metadata database:
       - user/topics/tutorial-trait-search.html.textile.liquid
     - Arvados License:
index ea2ebd161b9257291c44d5edd4a10fa955631671..4def77e063c879c53b9e0ea2529483360157636e 100644 (file)
@@ -9,7 +9,7 @@ Copyright (C) The Arvados Authors. All rights reserved.
 SPDX-License-Identifier: CC-BY-SA-3.0
 {% endcomment %}
 
-The Keep-web server provides read-only HTTP access to files stored in Keep. It serves public data to unauthenticated clients, and serves private data to clients that supply Arvados API tokens. It can be installed anywhere with access to Keep services, typically behind a web proxy that provides SSL support. See the "godoc page":http://godoc.org/github.com/curoverse/arvados/services/keep-web for more detail.
+The Keep-web server provides read/write HTTP (WebDAV) access to files stored in Keep. It serves public data to unauthenticated clients, and serves private data to clients that supply Arvados API tokens. It can be installed anywhere with access to Keep services, typically behind a web proxy that provides TLS support. See the "godoc page":http://godoc.org/github.com/curoverse/arvados/services/keep-web for more detail.
 
 By convention, we use the following hostnames for the Keep-web service:
 
@@ -48,7 +48,7 @@ Usage of keep-web:
   -allow-anonymous
         Serve public data to anonymous clients. Try the token supplied in the ARVADOS_API_TOKEN environment variable when none of the tokens provided in an HTTP request succeed in reading the desired collection. (default false)
   -attachment-only-host string
-        Accept credentials, and add "Content-Disposition: attachment" response headers, for requests at this hostname:port. Prohibiting inline display makes it possible to serve untrusted and non-public content from a single origin, i.e., without wildcard DNS or SSL.
+        Accept credentials, and add "Content-Disposition: attachment" response headers, for requests at this hostname:port. Prohibiting inline display makes it possible to serve untrusted and non-public content from a single origin, i.e., without wildcard DNS or TLS.
   -listen string
         Address to listen on: "host:port", or ":port" to listen on all interfaces. (default ":80")
   -trust-all-content
@@ -77,15 +77,15 @@ exec sudo -u nobody keep-web \
 
 Omit the @-allow-anonymous@ argument if you do not want to serve public data.
 
-Set @ARVADOS_API_HOST_INSECURE=1@ if your API server's SSL certificate is not signed by a recognized CA.
+Set @ARVADOS_API_HOST_INSECURE=1@ if your API server's TLS certificate is not signed by a recognized CA.
 
-h3. Set up a reverse proxy with SSL support
+h3. Set up a reverse proxy with TLS support
 
-The Keep-web service will be accessible from anywhere on the internet, so we recommend using SSL for transport encryption.
+The Keep-web service will be accessible from anywhere on the internet, so we recommend using TLS for transport encryption.
 
-This is best achieved by putting a reverse proxy with SSL support in front of Keep-web, running on port 443 and passing requests to Keep-web on port 9002 (or whatever port you chose in your run script).
+This is best achieved by putting a reverse proxy with TLS support in front of Keep-web, running on port 443 and passing requests to Keep-web on port 9002 (or whatever port you chose in your run script).
 
-Note: A wildcard SSL certificate is required in order to support a full-featured secure Keep-web service. Without it, Keep-web can offer file downloads for all Keep data; however, in order to avoid cross-site scripting vulnerabilities, Keep-web refuses to serve private data as web content except when it is accessed using a "secret link" share. With a wildcard SSL certificate and DNS configured appropriately, all data can be served as web content.
+Note: A wildcard TLS certificate is required in order to support a full-featured secure Keep-web service. Without it, Keep-web can offer file downloads for all Keep data; however, in order to avoid cross-site scripting vulnerabilities, Keep-web refuses to serve private data as web content except when it is accessed using a "secret link" share. With a wildcard TLS certificate and DNS configured appropriately, all data can be served as web content.
 
 For example, using Nginx:
 
@@ -125,8 +125,8 @@ h3. Configure DNS
 Configure your DNS servers so the following names resolve to your Nginx proxy's public IP address.
 * @download.uuid_prefix.your.domain@
 * @collections.uuid_prefix.your.domain@
-* @*--collections.uuid_prefix.your.domain@, if you have a wildcard SSL certificate valid for @*.uuid_prefix.your.domain@ and your DNS server allows this without interfering with other DNS names.
-* @*.collections.uuid_prefix.your.domain@, if you have a wildcard SSL certificate valid for these names.
+* @*--collections.uuid_prefix.your.domain@, if you have a wildcard TLS certificate valid for @*.uuid_prefix.your.domain@ and your DNS server allows this without interfering with other DNS names.
+* @*.collections.uuid_prefix.your.domain@, if you have a wildcard TLS certificate valid for these names.
 
 If neither of the above wildcard options is feasible, you have two choices:
 # Serve web content at @collections.uuid_prefix.your.domain@, but only for unauthenticated requests (public data and collection sharing links). Authenticated requests will always result in file downloads, using the @download@ name. For example, the Workbench "preview" button and the "view entire log file" link will invoke file downloads instead of displaying content in the browser window.
index 7598ab822e0c5852bc7409ddba21d189f19dd1dc..7f69c61feb0e445dd162a07f4d130a21f64ebfd2 100644 (file)
@@ -38,7 +38,7 @@ table(table table-bordered table-condensed).
 |==--submit-runner-ram== SUBMIT_RUNNER_RAM|RAM (in MiB) required for the workflow runner job (default 1024)|
 |==--submit-runner-image== SUBMIT_RUNNER_IMAGE|Docker image for workflow runner job, default arvados/jobs|
 |==--name== NAME|           Name to use for workflow execution instance.|
-|==--on-error {stop,continue}|Desired workflow behavior when a step fails. One of 'stop' or 'continue'. Default is 'continue'.|
+|==--on-error {stop,continue}==|Desired workflow behavior when a step fails. One of 'stop' or 'continue'. Default is 'continue'.|
 |==--enable-dev==|          Enable loading and running development versions of CWL spec.|
 |==--intermediate-output-ttl== N|If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).|
 |==--trash-intermediate==|  Immediately trash intermediate outputs on workflow success.|
diff --git a/doc/user/topics/arv-sync-groups.html.textile.liquid b/doc/user/topics/arv-sync-groups.html.textile.liquid
new file mode 100644 (file)
index 0000000..e2a42c8
--- /dev/null
@@ -0,0 +1,53 @@
+---
+layout: default
+navsection: userguide
+title: "Using arv-sync-groups"
+...
+{% comment %}
+Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: CC-BY-SA-3.0
+{% endcomment %}
+
+The @arv-sync-groups@ tool allows to synchronize remote groups into Arvados from an external source.
+
+h1. Using arv-sync-groups
+
+This tool reads a CSV (comma-separated values) file having information about external groups and their members. When running it for the first time, it'll create a special group named 'Externally synchronized groups' meant to be the parent of all the remote groups.
+
+Every line on the file should have 2 values: a group name and a local user identifier, meaning that the named user is a member of the group. The tool will create the group if it doesn't exist, and add the user to it. If group member is not present on the input file, the account will be removed from the group.
+
+Users can be identified by their email address or username: the tool will check if every user exist on the system, and report back when not found. Groups on the other hand, are identified by their name.
+
+This tool is designed to be run periodically reading a file created by a remote auth system (ie: LDAP) dump script, applying what's included on the file as the source of truth.
+
+
+bq. NOTE: @arv-sync-groups@ needs to perform several administrative tasks on Arvados, so must be run using a superuser token
+
+h2. Options
+
+The following command line options are supported:
+
+table(table table-bordered table-condensed).
+|_. Option |_. Description |
+|==--help==|             This list of options|
+|==--parent-group-uuid==|   UUID of group to own all the externally synchronized groups|
+|==--user-id== |            Identifier to use in looking up user. One of 'email' or 'username' (Default: 'email')|
+|==--verbose==|             Log informational messages (Default: False)|
+|==--version==|             Print version and exit|
+
+h2. Examples
+
+To sync groups using the username to identify every account, reading from some @external_groups.csv@ file, the command should be called as follows:
+
+<notextile>
+<pre><code>~$ <span class="userinput">arv-sync-groups --user-id username /path/to/external_groups.csv </span>
+</code></pre>
+</notextile>
+
+If you want to use a specific preexisting group as the parent of all the remote groups, you can do it this way:
+
+<notextile>
+<pre><code>~$ <span class="userinput">arv-sync-groups --parent-group-uuid &lt;preexisting group UUID&gt; --user-id username /path/to/external_groups.csv </span>
+</code></pre>
+</notextile>
index 47a953ac2c02ab4422eeec34e5c18c27fbf61947..a38d95c2e68ee90e1d9f0d41bdef2b341127fd27 100644 (file)
@@ -5,6 +5,7 @@
 package arvados
 
 import (
+       "bytes"
        "crypto/tls"
        "encoding/json"
        "fmt"
@@ -180,6 +181,10 @@ func anythingToValues(params interface{}) (url.Values, error) {
 //
 // path must not contain a query string.
 func (c *Client) RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error {
+       if body, ok := body.(io.Closer); ok {
+               // Ensure body is closed even if we error out early
+               defer body.Close()
+       }
        urlString := c.apiURL(path)
        urlValues, err := anythingToValues(params)
        if err != nil {
@@ -202,6 +207,24 @@ func (c *Client) RequestAndDecode(dst interface{}, method, path string, body io.
        return c.DoAndDecode(dst, req)
 }
 
+type resource interface {
+       resourceName() string
+}
+
+// UpdateBody returns an io.Reader suitable for use as an http.Request
+// Body for a create or update API call.
+func (c *Client) UpdateBody(rsc resource) io.Reader {
+       j, err := json.Marshal(rsc)
+       if err != nil {
+               // Return a reader that returns errors.
+               r, w := io.Pipe()
+               w.CloseWithError(err)
+               return r
+       }
+       v := url.Values{rsc.resourceName(): {string(j)}}
+       return bytes.NewBufferString(v.Encode())
+}
+
 func (c *Client) httpClient() *http.Client {
        switch {
        case c.Client != nil:
index 61bcd7fe8f367f2ca3d3d48017213bbd5335d33e..999b4e9d483454ace177cad829e90f85ddccc44c 100644 (file)
@@ -30,6 +30,10 @@ type Collection struct {
        IsTrashed              bool       `json:"is_trashed,omitempty"`
 }
 
+func (c Collection) resourceName() string {
+       return "collection"
+}
+
 // SizedDigests returns the hash+size part of each data block
 // referenced by the collection.
 func (c *Collection) SizedDigests() ([]SizedDigest, error) {
index 1acf27442c3dc514abe1c3b2538549cba9bbcd9f..28629e33b20f31189ac460d02ab2868fdd85db96 100644 (file)
 package arvados
 
 import (
+       "errors"
+       "fmt"
        "io"
        "net/http"
        "os"
        "path"
+       "regexp"
+       "sort"
+       "strconv"
        "strings"
        "sync"
        "time"
+)
+
+var (
+       ErrReadOnlyFile      = errors.New("read-only file")
+       ErrNegativeOffset    = errors.New("cannot seek to negative offset")
+       ErrFileExists        = errors.New("file exists")
+       ErrInvalidOperation  = errors.New("invalid operation")
+       ErrInvalidArgument   = errors.New("invalid argument")
+       ErrDirectoryNotEmpty = errors.New("directory not empty")
+       ErrWriteOnlyMode     = errors.New("file is O_WRONLY")
+       ErrSyncNotSupported  = errors.New("O_SYNC flag is not supported")
+       ErrIsDirectory       = errors.New("cannot rename file to overwrite existing directory")
+       ErrPermission        = os.ErrPermission
 
-       "git.curoverse.com/arvados.git/sdk/go/manifest"
+       maxBlockSize = 1 << 26
 )
 
+// A File is an *os.File-like interface for reading and writing files
+// in a CollectionFileSystem.
 type File interface {
        io.Reader
+       io.Writer
        io.Closer
        io.Seeker
        Size() int64
+       Readdir(int) ([]os.FileInfo, error)
+       Stat() (os.FileInfo, error)
+       Truncate(int64) error
 }
 
 type keepClient interface {
-       ManifestFileReader(manifest.Manifest, string) (File, error)
+       ReadAt(locator string, p []byte, off int) (int, error)
+       PutB(p []byte) (string, int, error)
 }
 
-type collectionFile struct {
-       File
-       collection *Collection
-       name       string
-       size       int64
+type fileinfo struct {
+       name    string
+       mode    os.FileMode
+       size    int64
+       modTime time.Time
 }
 
-func (cf *collectionFile) Size() int64 {
-       return cf.size
+// Name implements os.FileInfo.
+func (fi fileinfo) Name() string {
+       return fi.name
 }
 
-func (cf *collectionFile) Readdir(count int) ([]os.FileInfo, error) {
-       return nil, io.EOF
+// ModTime implements os.FileInfo.
+func (fi fileinfo) ModTime() time.Time {
+       return fi.modTime
 }
 
-func (cf *collectionFile) Stat() (os.FileInfo, error) {
-       return collectionDirent{
-               collection: cf.collection,
-               name:       cf.name,
-               size:       cf.size,
-               isDir:      false,
-       }, nil
+// Mode implements os.FileInfo.
+func (fi fileinfo) Mode() os.FileMode {
+       return fi.mode
 }
 
-type collectionDir struct {
-       collection *Collection
-       stream     string
-       dirents    []os.FileInfo
+// IsDir implements os.FileInfo.
+func (fi fileinfo) IsDir() bool {
+       return fi.mode&os.ModeDir != 0
 }
 
-// Readdir implements os.File.
-func (cd *collectionDir) Readdir(count int) ([]os.FileInfo, error) {
-       ret := cd.dirents
-       if count <= 0 {
-               cd.dirents = nil
-               return ret, nil
-       } else if len(ret) == 0 {
-               return nil, io.EOF
+// Size implements os.FileInfo.
+func (fi fileinfo) Size() int64 {
+       return fi.size
+}
+
+// Sys implements os.FileInfo.
+func (fi fileinfo) Sys() interface{} {
+       return nil
+}
+
+// A CollectionFileSystem is an http.Filesystem plus Stat() and
+// support for opening writable files. All methods are safe to call
+// from multiple goroutines.
+type CollectionFileSystem interface {
+       http.FileSystem
+
+       // analogous to os.Stat()
+       Stat(name string) (os.FileInfo, error)
+
+       // analogous to os.Create(): create/truncate a file and open it O_RDWR.
+       Create(name string) (File, error)
+
+       // Like os.OpenFile(): create or open a file or directory.
+       //
+       // If flag&os.O_EXCL==0, it opens an existing file or
+       // directory if one exists. If flag&os.O_CREATE!=0, it creates
+       // a new empty file or directory if one does not already
+       // exist.
+       //
+       // When creating a new item, perm&os.ModeDir determines
+       // whether it is a file or a directory.
+       //
+       // A file can be opened multiple times and used concurrently
+       // from multiple goroutines. However, each File object should
+       // be used by only one goroutine at a time.
+       OpenFile(name string, flag int, perm os.FileMode) (File, error)
+
+       Mkdir(name string, perm os.FileMode) error
+       Remove(name string) error
+       RemoveAll(name string) error
+       Rename(oldname, newname string) error
+
+       // Flush all file data to Keep and return a snapshot of the
+       // filesystem suitable for saving as (Collection)ManifestText.
+       // Prefix (normally ".") is a top level directory, effectively
+       // prepended to all paths in the returned manifest.
+       MarshalManifest(prefix string) (string, error)
+}
+
+type fileSystem struct {
+       dirnode
+}
+
+func (fs *fileSystem) OpenFile(name string, flag int, perm os.FileMode) (File, error) {
+       return fs.dirnode.OpenFile(name, flag, perm)
+}
+
+func (fs *fileSystem) Open(name string) (http.File, error) {
+       return fs.dirnode.OpenFile(name, os.O_RDONLY, 0)
+}
+
+func (fs *fileSystem) Create(name string) (File, error) {
+       return fs.dirnode.OpenFile(name, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0)
+}
+
+func (fs *fileSystem) Stat(name string) (fi os.FileInfo, err error) {
+       node := fs.dirnode.lookupPath(name)
+       if node == nil {
+               err = os.ErrNotExist
+       } else {
+               fi = node.Stat()
        }
-       var err error
-       if count >= len(ret) {
-               count = len(ret)
-               err = io.EOF
+       return
+}
+
+type inode interface {
+       Parent() inode
+       Read([]byte, filenodePtr) (int, filenodePtr, error)
+       Write([]byte, filenodePtr) (int, filenodePtr, error)
+       Truncate(int64) error
+       Readdir() []os.FileInfo
+       Size() int64
+       Stat() os.FileInfo
+       sync.Locker
+       RLock()
+       RUnlock()
+}
+
+// filenode implements inode.
+type filenode struct {
+       fileinfo fileinfo
+       parent   *dirnode
+       segments []segment
+       // number of times `segments` has changed in a
+       // way that might invalidate a filenodePtr
+       repacked int64
+       memsize  int64 // bytes in memSegments
+       sync.RWMutex
+}
+
+// filenodePtr is an offset into a file that is (usually) efficient to
+// seek to. Specifically, if filenode.repacked==filenodePtr.repacked
+// then
+// filenode.segments[filenodePtr.segmentIdx][filenodePtr.segmentOff]
+// corresponds to file offset filenodePtr.off. Otherwise, it is
+// necessary to reexamine len(filenode.segments[0]) etc. to find the
+// correct segment and offset.
+type filenodePtr struct {
+       off        int64
+       segmentIdx int
+       segmentOff int
+       repacked   int64
+}
+
+// seek returns a ptr that is consistent with both startPtr.off and
+// the current state of fn. The caller must already hold fn.RLock() or
+// fn.Lock().
+//
+// If startPtr is beyond EOF, ptr.segment* will indicate precisely
+// EOF.
+//
+// After seeking:
+//
+//     ptr.segmentIdx == len(filenode.segments) // i.e., at EOF
+//     ||
+//     filenode.segments[ptr.segmentIdx].Len() > ptr.segmentOff
+func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) {
+       ptr = startPtr
+       if ptr.off < 0 {
+               // meaningless anyway
+               return
+       } else if ptr.off >= fn.fileinfo.size {
+               ptr.segmentIdx = len(fn.segments)
+               ptr.segmentOff = 0
+               ptr.repacked = fn.repacked
+               return
+       } else if ptr.repacked == fn.repacked {
+               // segmentIdx and segmentOff accurately reflect
+               // ptr.off, but might have fallen off the end of a
+               // segment
+               if ptr.segmentOff >= fn.segments[ptr.segmentIdx].Len() {
+                       ptr.segmentIdx++
+                       ptr.segmentOff = 0
+               }
+               return
        }
-       cd.dirents = cd.dirents[count:]
-       return ret[:count], err
+       defer func() {
+               ptr.repacked = fn.repacked
+       }()
+       if ptr.off >= fn.fileinfo.size {
+               ptr.segmentIdx, ptr.segmentOff = len(fn.segments), 0
+               return
+       }
+       // Recompute segmentIdx and segmentOff.  We have already
+       // established fn.fileinfo.size > ptr.off >= 0, so we don't
+       // have to deal with edge cases here.
+       var off int64
+       for ptr.segmentIdx, ptr.segmentOff = 0, 0; off < ptr.off; ptr.segmentIdx++ {
+               // This would panic (index out of range) if
+               // fn.fileinfo.size were larger than
+               // sum(fn.segments[i].Len()) -- but that can't happen
+               // because we have ensured fn.fileinfo.size is always
+               // accurate.
+               segLen := int64(fn.segments[ptr.segmentIdx].Len())
+               if off+segLen > ptr.off {
+                       ptr.segmentOff = int(ptr.off - off)
+                       break
+               }
+               off += segLen
+       }
+       return
 }
 
-// Stat implements os.File.
-func (cd *collectionDir) Stat() (os.FileInfo, error) {
-       return collectionDirent{
-               collection: cd.collection,
-               name:       path.Base(cd.stream),
-               isDir:      true,
-               size:       int64(len(cd.dirents)),
-       }, nil
+// caller must have lock
+func (fn *filenode) appendSegment(e segment) {
+       fn.segments = append(fn.segments, e)
+       fn.fileinfo.size += int64(e.Len())
+}
+
+func (fn *filenode) Parent() inode {
+       fn.RLock()
+       defer fn.RUnlock()
+       return fn.parent
 }
 
-// Close implements os.File.
-func (cd *collectionDir) Close() error {
+func (fn *filenode) Readdir() []os.FileInfo {
        return nil
 }
 
-// Read implements os.File.
-func (cd *collectionDir) Read([]byte) (int, error) {
-       return 0, nil
+// Read reads file data from a single segment, starting at startPtr,
+// into p. startPtr is assumed not to be up-to-date. Caller must have
+// RLock or Lock.
+func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
+       ptr = fn.seek(startPtr)
+       if ptr.off < 0 {
+               err = ErrNegativeOffset
+               return
+       }
+       if ptr.segmentIdx >= len(fn.segments) {
+               err = io.EOF
+               return
+       }
+       n, err = fn.segments[ptr.segmentIdx].ReadAt(p, int64(ptr.segmentOff))
+       if n > 0 {
+               ptr.off += int64(n)
+               ptr.segmentOff += n
+               if ptr.segmentOff == fn.segments[ptr.segmentIdx].Len() {
+                       ptr.segmentIdx++
+                       ptr.segmentOff = 0
+                       if ptr.segmentIdx < len(fn.segments) && err == io.EOF {
+                               err = nil
+                       }
+               }
+       }
+       return
 }
 
-// Seek implements os.File.
-func (cd *collectionDir) Seek(int64, int) (int64, error) {
-       return 0, nil
+func (fn *filenode) Size() int64 {
+       fn.RLock()
+       defer fn.RUnlock()
+       return fn.fileinfo.Size()
 }
 
-// collectionDirent implements os.FileInfo.
-type collectionDirent struct {
-       collection *Collection
-       name       string
-       isDir      bool
-       mode       os.FileMode
-       size       int64
+func (fn *filenode) Stat() os.FileInfo {
+       fn.RLock()
+       defer fn.RUnlock()
+       return fn.fileinfo
 }
 
-// Name implements os.FileInfo.
-func (e collectionDirent) Name() string {
-       return e.name
+func (fn *filenode) Truncate(size int64) error {
+       fn.Lock()
+       defer fn.Unlock()
+       return fn.truncate(size)
 }
 
-// ModTime implements os.FileInfo.
-func (e collectionDirent) ModTime() time.Time {
-       if e.collection.ModifiedAt == nil {
-               return time.Now()
+func (fn *filenode) truncate(size int64) error {
+       if size == fn.fileinfo.size {
+               return nil
+       }
+       fn.repacked++
+       if size < fn.fileinfo.size {
+               ptr := fn.seek(filenodePtr{off: size})
+               for i := ptr.segmentIdx; i < len(fn.segments); i++ {
+                       if seg, ok := fn.segments[i].(*memSegment); ok {
+                               fn.memsize -= int64(seg.Len())
+                       }
+               }
+               if ptr.segmentOff == 0 {
+                       fn.segments = fn.segments[:ptr.segmentIdx]
+               } else {
+                       fn.segments = fn.segments[:ptr.segmentIdx+1]
+                       switch seg := fn.segments[ptr.segmentIdx].(type) {
+                       case *memSegment:
+                               seg.Truncate(ptr.segmentOff)
+                               fn.memsize += int64(seg.Len())
+                       default:
+                               fn.segments[ptr.segmentIdx] = seg.Slice(0, ptr.segmentOff)
+                       }
+               }
+               fn.fileinfo.size = size
+               return nil
        }
-       return *e.collection.ModifiedAt
+       for size > fn.fileinfo.size {
+               grow := size - fn.fileinfo.size
+               var seg *memSegment
+               var ok bool
+               if len(fn.segments) == 0 {
+                       seg = &memSegment{}
+                       fn.segments = append(fn.segments, seg)
+               } else if seg, ok = fn.segments[len(fn.segments)-1].(*memSegment); !ok || seg.Len() >= maxBlockSize {
+                       seg = &memSegment{}
+                       fn.segments = append(fn.segments, seg)
+               }
+               if maxgrow := int64(maxBlockSize - seg.Len()); maxgrow < grow {
+                       grow = maxgrow
+               }
+               seg.Truncate(seg.Len() + int(grow))
+               fn.fileinfo.size += grow
+               fn.memsize += grow
+       }
+       return nil
 }
 
-// Mode implements os.FileInfo.
-func (e collectionDirent) Mode() os.FileMode {
-       if e.isDir {
-               return 0555
+// Write writes data from p to the file, starting at startPtr,
+// extending the file size if necessary. Caller must have Lock.
+func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
+       if startPtr.off > fn.fileinfo.size {
+               if err = fn.truncate(startPtr.off); err != nil {
+                       return 0, startPtr, err
+               }
+       }
+       ptr = fn.seek(startPtr)
+       if ptr.off < 0 {
+               err = ErrNegativeOffset
+               return
+       }
+       for len(p) > 0 && err == nil {
+               cando := p
+               if len(cando) > maxBlockSize {
+                       cando = cando[:maxBlockSize]
+               }
+               // Rearrange/grow fn.segments (and shrink cando if
+               // needed) such that cando can be copied to
+               // fn.segments[ptr.segmentIdx] at offset
+               // ptr.segmentOff.
+               cur := ptr.segmentIdx
+               prev := ptr.segmentIdx - 1
+               var curWritable bool
+               if cur < len(fn.segments) {
+                       _, curWritable = fn.segments[cur].(*memSegment)
+               }
+               var prevAppendable bool
+               if prev >= 0 && fn.segments[prev].Len() < maxBlockSize {
+                       _, prevAppendable = fn.segments[prev].(*memSegment)
+               }
+               if ptr.segmentOff > 0 && !curWritable {
+                       // Split a non-writable block.
+                       if max := fn.segments[cur].Len() - ptr.segmentOff; max <= len(cando) {
+                               // Truncate cur, and insert a new
+                               // segment after it.
+                               cando = cando[:max]
+                               fn.segments = append(fn.segments, nil)
+                               copy(fn.segments[cur+1:], fn.segments[cur:])
+                       } else {
+                               // Split cur into two copies, truncate
+                               // the one on the left, shift the one
+                               // on the right, and insert a new
+                               // segment between them.
+                               fn.segments = append(fn.segments, nil, nil)
+                               copy(fn.segments[cur+2:], fn.segments[cur:])
+                               fn.segments[cur+2] = fn.segments[cur+2].Slice(ptr.segmentOff+len(cando), -1)
+                       }
+                       cur++
+                       prev++
+                       seg := &memSegment{}
+                       seg.Truncate(len(cando))
+                       fn.memsize += int64(len(cando))
+                       fn.segments[cur] = seg
+                       fn.segments[prev] = fn.segments[prev].Slice(0, ptr.segmentOff)
+                       ptr.segmentIdx++
+                       ptr.segmentOff = 0
+                       fn.repacked++
+                       ptr.repacked++
+               } else if curWritable {
+                       if fit := int(fn.segments[cur].Len()) - ptr.segmentOff; fit < len(cando) {
+                               cando = cando[:fit]
+                       }
+               } else {
+                       if prevAppendable {
+                               // Shrink cando if needed to fit in
+                               // prev segment.
+                               if cangrow := maxBlockSize - fn.segments[prev].Len(); cangrow < len(cando) {
+                                       cando = cando[:cangrow]
+                               }
+                       }
+
+                       if cur == len(fn.segments) {
+                               // ptr is at EOF, filesize is changing.
+                               fn.fileinfo.size += int64(len(cando))
+                       } else if el := fn.segments[cur].Len(); el <= len(cando) {
+                               // cando is long enough that we won't
+                               // need cur any more. shrink cando to
+                               // be exactly as long as cur
+                               // (otherwise we'd accidentally shift
+                               // the effective position of all
+                               // segments after cur).
+                               cando = cando[:el]
+                               copy(fn.segments[cur:], fn.segments[cur+1:])
+                               fn.segments = fn.segments[:len(fn.segments)-1]
+                       } else {
+                               // shrink cur by the same #bytes we're growing prev
+                               fn.segments[cur] = fn.segments[cur].Slice(len(cando), -1)
+                       }
+
+                       if prevAppendable {
+                               // Grow prev.
+                               ptr.segmentIdx--
+                               ptr.segmentOff = fn.segments[prev].Len()
+                               fn.segments[prev].(*memSegment).Truncate(ptr.segmentOff + len(cando))
+                               fn.memsize += int64(len(cando))
+                               ptr.repacked++
+                               fn.repacked++
+                       } else {
+                               // Insert a segment between prev and
+                               // cur, and advance prev/cur.
+                               fn.segments = append(fn.segments, nil)
+                               if cur < len(fn.segments) {
+                                       copy(fn.segments[cur+1:], fn.segments[cur:])
+                                       ptr.repacked++
+                                       fn.repacked++
+                               } else {
+                                       // appending a new segment does
+                                       // not invalidate any ptrs
+                               }
+                               seg := &memSegment{}
+                               seg.Truncate(len(cando))
+                               fn.memsize += int64(len(cando))
+                               fn.segments[cur] = seg
+                               cur++
+                               prev++
+                       }
+               }
+
+               // Finally we can copy bytes from cando to the current segment.
+               fn.segments[ptr.segmentIdx].(*memSegment).WriteAt(cando, ptr.segmentOff)
+               n += len(cando)
+               p = p[len(cando):]
+
+               ptr.off += int64(len(cando))
+               ptr.segmentOff += len(cando)
+               if ptr.segmentOff >= maxBlockSize {
+                       fn.pruneMemSegments()
+               }
+               if fn.segments[ptr.segmentIdx].Len() == ptr.segmentOff {
+                       ptr.segmentOff = 0
+                       ptr.segmentIdx++
+               }
+
+               fn.fileinfo.modTime = time.Now()
+       }
+       return
+}
+
+// Write some data out to disk to reduce memory use. Caller must have
+// write lock.
+func (fn *filenode) pruneMemSegments() {
+       // TODO: async (don't hold Lock() while waiting for Keep)
+       // TODO: share code with (*dirnode)sync()
+       // TODO: pack/flush small blocks too, when fragmented
+       for idx, seg := range fn.segments {
+               seg, ok := seg.(*memSegment)
+               if !ok || seg.Len() < maxBlockSize {
+                       continue
+               }
+               locator, _, err := fn.parent.kc.PutB(seg.buf)
+               if err != nil {
+                       // TODO: stall (or return errors from)
+                       // subsequent writes until flushing
+                       // starts to succeed
+                       continue
+               }
+               fn.memsize -= int64(seg.Len())
+               fn.segments[idx] = storedSegment{
+                       kc:      fn.parent.kc,
+                       locator: locator,
+                       size:    seg.Len(),
+                       offset:  0,
+                       length:  seg.Len(),
+               }
+       }
+}
+
+// FileSystem returns a CollectionFileSystem for the collection.
+func (c *Collection) FileSystem(client *Client, kc keepClient) (CollectionFileSystem, error) {
+       var modTime time.Time
+       if c.ModifiedAt == nil {
+               modTime = time.Now()
        } else {
-               return 0444
+               modTime = *c.ModifiedAt
+       }
+       fs := &fileSystem{dirnode: dirnode{
+               client: client,
+               kc:     kc,
+               fileinfo: fileinfo{
+                       name:    ".",
+                       mode:    os.ModeDir | 0755,
+                       modTime: modTime,
+               },
+               parent: nil,
+               inodes: make(map[string]inode),
+       }}
+       fs.dirnode.parent = &fs.dirnode
+       if err := fs.dirnode.loadManifest(c.ManifestText); err != nil {
+               return nil, err
        }
+       return fs, nil
 }
 
-// IsDir implements os.FileInfo.
-func (e collectionDirent) IsDir() bool {
-       return e.isDir
+type filehandle struct {
+       inode
+       ptr        filenodePtr
+       append     bool
+       readable   bool
+       writable   bool
+       unreaddirs []os.FileInfo
 }
 
-// Size implements os.FileInfo.
-func (e collectionDirent) Size() int64 {
-       return e.size
+func (f *filehandle) Read(p []byte) (n int, err error) {
+       if !f.readable {
+               return 0, ErrWriteOnlyMode
+       }
+       f.inode.RLock()
+       defer f.inode.RUnlock()
+       n, f.ptr, err = f.inode.Read(p, f.ptr)
+       return
 }
 
-// Sys implements os.FileInfo.
-func (e collectionDirent) Sys() interface{} {
-       return nil
+func (f *filehandle) Seek(off int64, whence int) (pos int64, err error) {
+       size := f.inode.Size()
+       ptr := f.ptr
+       switch whence {
+       case io.SeekStart:
+               ptr.off = off
+       case io.SeekCurrent:
+               ptr.off += off
+       case io.SeekEnd:
+               ptr.off = size + off
+       }
+       if ptr.off < 0 {
+               return f.ptr.off, ErrNegativeOffset
+       }
+       if ptr.off != f.ptr.off {
+               f.ptr = ptr
+               // force filenode to recompute f.ptr fields on next
+               // use
+               f.ptr.repacked = -1
+       }
+       return f.ptr.off, nil
 }
 
-// A CollectionFileSystem is an http.Filesystem with an added Stat() method.
-type CollectionFileSystem interface {
-       http.FileSystem
-       Stat(name string) (os.FileInfo, error)
+func (f *filehandle) Truncate(size int64) error {
+       return f.inode.Truncate(size)
 }
 
-// collectionFS implements CollectionFileSystem.
-type collectionFS struct {
-       collection *Collection
-       client     *Client
-       kc         keepClient
-       sizes      map[string]int64
-       sizesOnce  sync.Once
+func (f *filehandle) Write(p []byte) (n int, err error) {
+       if !f.writable {
+               return 0, ErrReadOnlyFile
+       }
+       f.inode.Lock()
+       defer f.inode.Unlock()
+       if fn, ok := f.inode.(*filenode); ok && f.append {
+               f.ptr = filenodePtr{
+                       off:        fn.fileinfo.size,
+                       segmentIdx: len(fn.segments),
+                       segmentOff: 0,
+                       repacked:   fn.repacked,
+               }
+       }
+       n, f.ptr, err = f.inode.Write(p, f.ptr)
+       return
 }
 
-// FileSystem returns a CollectionFileSystem for the collection.
-func (c *Collection) FileSystem(client *Client, kc keepClient) CollectionFileSystem {
-       return &collectionFS{
-               collection: c,
-               client:     client,
-               kc:         kc,
-       }
-}
-
-func (c *collectionFS) Stat(name string) (os.FileInfo, error) {
-       name = canonicalName(name)
-       if name == "." {
-               return collectionDirent{
-                       collection: c.collection,
-                       name:       "/",
-                       isDir:      true,
-               }, nil
-       }
-       if size, ok := c.fileSizes()[name]; ok {
-               return collectionDirent{
-                       collection: c.collection,
-                       name:       path.Base(name),
-                       size:       size,
-                       isDir:      false,
-               }, nil
-       }
-       for fnm := range c.fileSizes() {
-               if !strings.HasPrefix(fnm, name+"/") {
+func (f *filehandle) Readdir(count int) ([]os.FileInfo, error) {
+       if !f.inode.Stat().IsDir() {
+               return nil, ErrInvalidOperation
+       }
+       if count <= 0 {
+               return f.inode.Readdir(), nil
+       }
+       if f.unreaddirs == nil {
+               f.unreaddirs = f.inode.Readdir()
+       }
+       if len(f.unreaddirs) == 0 {
+               return nil, io.EOF
+       }
+       if count > len(f.unreaddirs) {
+               count = len(f.unreaddirs)
+       }
+       ret := f.unreaddirs[:count]
+       f.unreaddirs = f.unreaddirs[count:]
+       return ret, nil
+}
+
+func (f *filehandle) Stat() (os.FileInfo, error) {
+       return f.inode.Stat(), nil
+}
+
+func (f *filehandle) Close() error {
+       return nil
+}
+
+type dirnode struct {
+       fileinfo fileinfo
+       parent   *dirnode
+       client   *Client
+       kc       keepClient
+       inodes   map[string]inode
+       sync.RWMutex
+}
+
+// sync flushes in-memory data (for all files in the tree rooted at
+// dn) to persistent storage. Caller must hold dn.Lock().
+func (dn *dirnode) sync() error {
+       type shortBlock struct {
+               fn  *filenode
+               idx int
+       }
+       var pending []shortBlock
+       var pendingLen int
+
+       flush := func(sbs []shortBlock) error {
+               if len(sbs) == 0 {
+                       return nil
+               }
+               block := make([]byte, 0, maxBlockSize)
+               for _, sb := range sbs {
+                       block = append(block, sb.fn.segments[sb.idx].(*memSegment).buf...)
+               }
+               locator, _, err := dn.kc.PutB(block)
+               if err != nil {
+                       return err
+               }
+               off := 0
+               for _, sb := range sbs {
+                       data := sb.fn.segments[sb.idx].(*memSegment).buf
+                       sb.fn.segments[sb.idx] = storedSegment{
+                               kc:      dn.kc,
+                               locator: locator,
+                               size:    len(block),
+                               offset:  off,
+                               length:  len(data),
+                       }
+                       off += len(data)
+                       sb.fn.memsize -= int64(len(data))
+               }
+               return nil
+       }
+
+       names := make([]string, 0, len(dn.inodes))
+       for name := range dn.inodes {
+               names = append(names, name)
+       }
+       sort.Strings(names)
+
+       for _, name := range names {
+               fn, ok := dn.inodes[name].(*filenode)
+               if !ok {
                        continue
                }
-               return collectionDirent{
-                       collection: c.collection,
-                       name:       path.Base(name),
-                       isDir:      true,
-               }, nil
+               fn.Lock()
+               defer fn.Unlock()
+               for idx, seg := range fn.segments {
+                       seg, ok := seg.(*memSegment)
+                       if !ok {
+                               continue
+                       }
+                       if seg.Len() > maxBlockSize/2 {
+                               if err := flush([]shortBlock{{fn, idx}}); err != nil {
+                                       return err
+                               }
+                               continue
+                       }
+                       if pendingLen+seg.Len() > maxBlockSize {
+                               if err := flush(pending); err != nil {
+                                       return err
+                               }
+                               pending = nil
+                               pendingLen = 0
+                       }
+                       pending = append(pending, shortBlock{fn, idx})
+                       pendingLen += seg.Len()
+               }
        }
-       return nil, os.ErrNotExist
+       return flush(pending)
 }
 
-func (c *collectionFS) Open(name string) (http.File, error) {
-       // Ensure name looks the way it does in a manifest.
-       name = canonicalName(name)
+func (dn *dirnode) MarshalManifest(prefix string) (string, error) {
+       dn.Lock()
+       defer dn.Unlock()
+       return dn.marshalManifest(prefix)
+}
 
-       m := manifest.Manifest{Text: c.collection.ManifestText}
+// caller must have read lock.
+func (dn *dirnode) marshalManifest(prefix string) (string, error) {
+       var streamLen int64
+       type filepart struct {
+               name   string
+               offset int64
+               length int64
+       }
+       var fileparts []filepart
+       var subdirs string
+       var blocks []string
 
-       // Return a file if it exists.
-       if size, ok := c.fileSizes()[name]; ok {
-               reader, err := c.kc.ManifestFileReader(m, name)
-               if err != nil {
-                       return nil, err
+       if err := dn.sync(); err != nil {
+               return "", err
+       }
+
+       names := make([]string, 0, len(dn.inodes))
+       for name, node := range dn.inodes {
+               names = append(names, name)
+               node.Lock()
+               defer node.Unlock()
+       }
+       sort.Strings(names)
+
+       for _, name := range names {
+               switch node := dn.inodes[name].(type) {
+               case *dirnode:
+                       subdir, err := node.marshalManifest(prefix + "/" + name)
+                       if err != nil {
+                               return "", err
+                       }
+                       subdirs = subdirs + subdir
+               case *filenode:
+                       if len(node.segments) == 0 {
+                               fileparts = append(fileparts, filepart{name: name})
+                               break
+                       }
+                       for _, seg := range node.segments {
+                               switch seg := seg.(type) {
+                               case storedSegment:
+                                       if len(blocks) > 0 && blocks[len(blocks)-1] == seg.locator {
+                                               streamLen -= int64(seg.size)
+                                       } else {
+                                               blocks = append(blocks, seg.locator)
+                                       }
+                                       next := filepart{
+                                               name:   name,
+                                               offset: streamLen + int64(seg.offset),
+                                               length: int64(seg.length),
+                                       }
+                                       if prev := len(fileparts) - 1; prev >= 0 &&
+                                               fileparts[prev].name == name &&
+                                               fileparts[prev].offset+fileparts[prev].length == next.offset {
+                                               fileparts[prev].length += next.length
+                                       } else {
+                                               fileparts = append(fileparts, next)
+                                       }
+                                       streamLen += int64(seg.size)
+                               default:
+                                       // This can't happen: we
+                                       // haven't unlocked since
+                                       // calling sync().
+                                       panic(fmt.Sprintf("can't marshal segment type %T", seg))
+                               }
+                       }
+               default:
+                       panic(fmt.Sprintf("can't marshal inode type %T", node))
+               }
+       }
+       var filetokens []string
+       for _, s := range fileparts {
+               filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, manifestEscape(s.name)))
+       }
+       if len(filetokens) == 0 {
+               return subdirs, nil
+       } else if len(blocks) == 0 {
+               blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
+       }
+       return manifestEscape(prefix) + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n" + subdirs, nil
+}
+
+func (dn *dirnode) loadManifest(txt string) error {
+       var dirname string
+       streams := strings.Split(txt, "\n")
+       if streams[len(streams)-1] != "" {
+               return fmt.Errorf("line %d: no trailing newline", len(streams))
+       }
+       streams = streams[:len(streams)-1]
+       segments := []storedSegment{}
+       for i, stream := range streams {
+               lineno := i + 1
+               var anyFileTokens bool
+               var pos int64
+               var segIdx int
+               segments = segments[:0]
+               for i, token := range strings.Split(stream, " ") {
+                       if i == 0 {
+                               dirname = manifestUnescape(token)
+                               continue
+                       }
+                       if !strings.Contains(token, ":") {
+                               if anyFileTokens {
+                                       return fmt.Errorf("line %d: bad file segment %q", lineno, token)
+                               }
+                               toks := strings.SplitN(token, "+", 3)
+                               if len(toks) < 2 {
+                                       return fmt.Errorf("line %d: bad locator %q", lineno, token)
+                               }
+                               length, err := strconv.ParseInt(toks[1], 10, 32)
+                               if err != nil || length < 0 {
+                                       return fmt.Errorf("line %d: bad locator %q", lineno, token)
+                               }
+                               segments = append(segments, storedSegment{
+                                       locator: token,
+                                       size:    int(length),
+                                       offset:  0,
+                                       length:  int(length),
+                               })
+                               continue
+                       } else if len(segments) == 0 {
+                               return fmt.Errorf("line %d: bad locator %q", lineno, token)
+                       }
+
+                       toks := strings.Split(token, ":")
+                       if len(toks) != 3 {
+                               return fmt.Errorf("line %d: bad file segment %q", lineno, token)
+                       }
+                       anyFileTokens = true
+
+                       offset, err := strconv.ParseInt(toks[0], 10, 64)
+                       if err != nil || offset < 0 {
+                               return fmt.Errorf("line %d: bad file segment %q", lineno, token)
+                       }
+                       length, err := strconv.ParseInt(toks[1], 10, 64)
+                       if err != nil || length < 0 {
+                               return fmt.Errorf("line %d: bad file segment %q", lineno, token)
+                       }
+                       name := dirname + "/" + manifestUnescape(toks[2])
+                       fnode, err := dn.createFileAndParents(name)
+                       if err != nil {
+                               return fmt.Errorf("line %d: cannot use path %q: %s", lineno, name, err)
+                       }
+                       // Map the stream offset/range coordinates to
+                       // block/offset/range coordinates and add
+                       // corresponding storedSegments to the filenode
+                       if pos > offset {
+                               // Can't continue where we left off.
+                               // TODO: binary search instead of
+                               // rewinding all the way (but this
+                               // situation might be rare anyway)
+                               segIdx, pos = 0, 0
+                       }
+                       for next := int64(0); segIdx < len(segments); segIdx++ {
+                               seg := segments[segIdx]
+                               next = pos + int64(seg.Len())
+                               if next <= offset || seg.Len() == 0 {
+                                       pos = next
+                                       continue
+                               }
+                               if pos >= offset+length {
+                                       break
+                               }
+                               var blkOff int
+                               if pos < offset {
+                                       blkOff = int(offset - pos)
+                               }
+                               blkLen := seg.Len() - blkOff
+                               if pos+int64(blkOff+blkLen) > offset+length {
+                                       blkLen = int(offset + length - pos - int64(blkOff))
+                               }
+                               fnode.appendSegment(storedSegment{
+                                       kc:      dn.kc,
+                                       locator: seg.locator,
+                                       size:    seg.size,
+                                       offset:  blkOff,
+                                       length:  blkLen,
+                               })
+                               if next > offset+length {
+                                       break
+                               } else {
+                                       pos = next
+                               }
+                       }
+                       if segIdx == len(segments) && pos < offset+length {
+                               return fmt.Errorf("line %d: invalid segment in %d-byte stream: %q", lineno, pos, token)
+                       }
+               }
+               if !anyFileTokens {
+                       return fmt.Errorf("line %d: no file segments", lineno)
+               } else if len(segments) == 0 {
+                       return fmt.Errorf("line %d: no locators", lineno)
+               } else if dirname == "" {
+                       return fmt.Errorf("line %d: no stream name", lineno)
+               }
+       }
+       return nil
+}
+
+// only safe to call from loadManifest -- no locking
+func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
+       names := strings.Split(path, "/")
+       basename := names[len(names)-1]
+       if basename == "" || basename == "." || basename == ".." {
+               err = fmt.Errorf("invalid filename")
+               return
+       }
+       for _, name := range names[:len(names)-1] {
+               switch name {
+               case "", ".":
+               case "..":
+                       dn = dn.parent
+               default:
+                       switch node := dn.inodes[name].(type) {
+                       case nil:
+                               dn = dn.newDirnode(name, 0755, dn.fileinfo.modTime)
+                       case *dirnode:
+                               dn = node
+                       case *filenode:
+                               err = ErrFileExists
+                               return
+                       }
+               }
+       }
+       switch node := dn.inodes[basename].(type) {
+       case nil:
+               fn = dn.newFilenode(basename, 0755, dn.fileinfo.modTime)
+       case *filenode:
+               fn = node
+       case *dirnode:
+               err = ErrIsDirectory
+       }
+       return
+}
+
+func (dn *dirnode) mkdir(name string) (*filehandle, error) {
+       return dn.OpenFile(name, os.O_CREATE|os.O_EXCL, os.ModeDir|0755)
+}
+
+func (dn *dirnode) Mkdir(name string, perm os.FileMode) error {
+       f, err := dn.mkdir(name)
+       if err == nil {
+               err = f.Close()
+       }
+       return err
+}
+
+func (dn *dirnode) Remove(name string) error {
+       return dn.remove(strings.TrimRight(name, "/"), false)
+}
+
+func (dn *dirnode) RemoveAll(name string) error {
+       err := dn.remove(strings.TrimRight(name, "/"), true)
+       if os.IsNotExist(err) {
+               // "If the path does not exist, RemoveAll returns
+               // nil." (see "os" pkg)
+               err = nil
+       }
+       return err
+}
+
+func (dn *dirnode) remove(name string, recursive bool) error {
+       dirname, name := path.Split(name)
+       if name == "" || name == "." || name == ".." {
+               return ErrInvalidArgument
+       }
+       dn, ok := dn.lookupPath(dirname).(*dirnode)
+       if !ok {
+               return os.ErrNotExist
+       }
+       dn.Lock()
+       defer dn.Unlock()
+       switch node := dn.inodes[name].(type) {
+       case nil:
+               return os.ErrNotExist
+       case *dirnode:
+               node.RLock()
+               defer node.RUnlock()
+               if !recursive && len(node.inodes) > 0 {
+                       return ErrDirectoryNotEmpty
                }
-               return &collectionFile{
-                       File:       reader,
-                       collection: c.collection,
-                       name:       path.Base(name),
-                       size:       size,
-               }, nil
        }
+       delete(dn.inodes, name)
+       return nil
+}
+
+func (dn *dirnode) Rename(oldname, newname string) error {
+       olddir, oldname := path.Split(oldname)
+       if oldname == "" || oldname == "." || oldname == ".." {
+               return ErrInvalidArgument
+       }
+       olddirf, err := dn.OpenFile(olddir+".", os.O_RDONLY, 0)
+       if err != nil {
+               return fmt.Errorf("%q: %s", olddir, err)
+       }
+       defer olddirf.Close()
+       newdir, newname := path.Split(newname)
+       if newname == "." || newname == ".." {
+               return ErrInvalidArgument
+       } else if newname == "" {
+               // Rename("a/b", "c/") means Rename("a/b", "c/b")
+               newname = oldname
+       }
+       newdirf, err := dn.OpenFile(newdir+".", os.O_RDONLY, 0)
+       if err != nil {
+               return fmt.Errorf("%q: %s", newdir, err)
+       }
+       defer newdirf.Close()
+
+       // When acquiring locks on multiple nodes, all common
+       // ancestors must be locked first in order to avoid
+       // deadlock. This is assured by locking the path from root to
+       // newdir, then locking the path from root to olddir, skipping
+       // any already-locked nodes.
+       needLock := []sync.Locker{}
+       for _, f := range []*filehandle{olddirf, newdirf} {
+               node := f.inode
+               needLock = append(needLock, node)
+               for node.Parent() != node {
+                       node = node.Parent()
+                       needLock = append(needLock, node)
+               }
+       }
+       locked := map[sync.Locker]bool{}
+       for i := len(needLock) - 1; i >= 0; i-- {
+               if n := needLock[i]; !locked[n] {
+                       n.Lock()
+                       defer n.Unlock()
+                       locked[n] = true
+               }
+       }
+
+       olddn := olddirf.inode.(*dirnode)
+       newdn := newdirf.inode.(*dirnode)
+       oldinode, ok := olddn.inodes[oldname]
+       if !ok {
+               return os.ErrNotExist
+       }
+       if existing, ok := newdn.inodes[newname]; ok {
+               // overwriting an existing file or dir
+               if dn, ok := existing.(*dirnode); ok {
+                       if !oldinode.Stat().IsDir() {
+                               return ErrIsDirectory
+                       }
+                       dn.RLock()
+                       defer dn.RUnlock()
+                       if len(dn.inodes) > 0 {
+                               return ErrDirectoryNotEmpty
+                       }
+               }
+       } else {
+               if newdn.inodes == nil {
+                       newdn.inodes = make(map[string]inode)
+               }
+               newdn.fileinfo.size++
+       }
+       newdn.inodes[newname] = oldinode
+       switch n := oldinode.(type) {
+       case *dirnode:
+               n.parent = newdn
+       case *filenode:
+               n.parent = newdn
+       default:
+               panic(fmt.Sprintf("bad inode type %T", n))
+       }
+       delete(olddn.inodes, oldname)
+       olddn.fileinfo.size--
+       return nil
+}
+
+func (dn *dirnode) Parent() inode {
+       dn.RLock()
+       defer dn.RUnlock()
+       return dn.parent
+}
+
+func (dn *dirnode) Readdir() (fi []os.FileInfo) {
+       dn.RLock()
+       defer dn.RUnlock()
+       fi = make([]os.FileInfo, 0, len(dn.inodes))
+       for _, inode := range dn.inodes {
+               fi = append(fi, inode.Stat())
+       }
+       return
+}
 
-       // Return a directory if it's the root dir or there are file
-       // entries below it.
-       children := map[string]collectionDirent{}
-       for fnm, size := range c.fileSizes() {
-               if !strings.HasPrefix(fnm, name+"/") {
+func (dn *dirnode) Read(p []byte, ptr filenodePtr) (int, filenodePtr, error) {
+       return 0, ptr, ErrInvalidOperation
+}
+
+func (dn *dirnode) Write(p []byte, ptr filenodePtr) (int, filenodePtr, error) {
+       return 0, ptr, ErrInvalidOperation
+}
+
+func (dn *dirnode) Size() int64 {
+       dn.RLock()
+       defer dn.RUnlock()
+       return dn.fileinfo.Size()
+}
+
+func (dn *dirnode) Stat() os.FileInfo {
+       dn.RLock()
+       defer dn.RUnlock()
+       return dn.fileinfo
+}
+
+func (dn *dirnode) Truncate(int64) error {
+       return ErrInvalidOperation
+}
+
+// lookupPath returns the inode for the file/directory with the given
+// name (which may contain "/" separators), along with its parent
+// node. If no such file/directory exists, the returned node is nil.
+func (dn *dirnode) lookupPath(path string) (node inode) {
+       node = dn
+       for _, name := range strings.Split(path, "/") {
+               dn, ok := node.(*dirnode)
+               if !ok {
+                       return nil
+               }
+               if name == "." || name == "" {
                        continue
                }
-               isDir := false
-               ent := fnm[len(name)+1:]
-               if i := strings.Index(ent, "/"); i >= 0 {
-                       ent = ent[:i]
-                       isDir = true
+               if name == ".." {
+                       node = node.Parent()
+                       continue
                }
-               e := children[ent]
-               e.collection = c.collection
-               e.isDir = isDir
-               e.name = ent
-               e.size = size
-               children[ent] = e
+               dn.RLock()
+               node = dn.inodes[name]
+               dn.RUnlock()
+       }
+       return
+}
+
+func (dn *dirnode) newDirnode(name string, perm os.FileMode, modTime time.Time) *dirnode {
+       child := &dirnode{
+               parent: dn,
+               client: dn.client,
+               kc:     dn.kc,
+               fileinfo: fileinfo{
+                       name:    name,
+                       mode:    os.ModeDir | perm,
+                       modTime: modTime,
+               },
+       }
+       if dn.inodes == nil {
+               dn.inodes = make(map[string]inode)
+       }
+       dn.inodes[name] = child
+       dn.fileinfo.size++
+       return child
+}
+
+func (dn *dirnode) newFilenode(name string, perm os.FileMode, modTime time.Time) *filenode {
+       child := &filenode{
+               parent: dn,
+               fileinfo: fileinfo{
+                       name:    name,
+                       mode:    perm,
+                       modTime: modTime,
+               },
+       }
+       if dn.inodes == nil {
+               dn.inodes = make(map[string]inode)
        }
-       if len(children) == 0 && name != "." {
+       dn.inodes[name] = child
+       dn.fileinfo.size++
+       return child
+}
+
+// OpenFile is analogous to os.OpenFile().
+func (dn *dirnode) OpenFile(name string, flag int, perm os.FileMode) (*filehandle, error) {
+       if flag&os.O_SYNC != 0 {
+               return nil, ErrSyncNotSupported
+       }
+       dirname, name := path.Split(name)
+       dn, ok := dn.lookupPath(dirname).(*dirnode)
+       if !ok {
                return nil, os.ErrNotExist
        }
-       dirents := make([]os.FileInfo, 0, len(children))
-       for _, ent := range children {
-               dirents = append(dirents, ent)
+       var readable, writable bool
+       switch flag & (os.O_RDWR | os.O_RDONLY | os.O_WRONLY) {
+       case os.O_RDWR:
+               readable = true
+               writable = true
+       case os.O_RDONLY:
+               readable = true
+       case os.O_WRONLY:
+               writable = true
+       default:
+               return nil, fmt.Errorf("invalid flags 0x%x", flag)
+       }
+       if !writable {
+               // A directory can be opened via "foo/", "foo/.", or
+               // "foo/..".
+               switch name {
+               case ".", "":
+                       return &filehandle{inode: dn}, nil
+               case "..":
+                       return &filehandle{inode: dn.Parent()}, nil
+               }
+       }
+       createMode := flag&os.O_CREATE != 0
+       if createMode {
+               dn.Lock()
+               defer dn.Unlock()
+       } else {
+               dn.RLock()
+               defer dn.RUnlock()
        }
-       return &collectionDir{
-               collection: c.collection,
-               stream:     name,
-               dirents:    dirents,
+       n, ok := dn.inodes[name]
+       if !ok {
+               if !createMode {
+                       return nil, os.ErrNotExist
+               }
+               if perm.IsDir() {
+                       n = dn.newDirnode(name, 0755, time.Now())
+               } else {
+                       n = dn.newFilenode(name, 0755, time.Now())
+               }
+       } else if flag&os.O_EXCL != 0 {
+               return nil, ErrFileExists
+       } else if flag&os.O_TRUNC != 0 {
+               if !writable {
+                       return nil, fmt.Errorf("invalid flag O_TRUNC in read-only mode")
+               } else if fn, ok := n.(*filenode); !ok {
+                       return nil, fmt.Errorf("invalid flag O_TRUNC when opening directory")
+               } else {
+                       fn.Truncate(0)
+               }
+       }
+       return &filehandle{
+               inode:    n,
+               append:   flag&os.O_APPEND != 0,
+               readable: readable,
+               writable: writable,
        }, nil
 }
 
-// fileSizes returns a map of files that can be opened. Each key
-// starts with "./".
-func (c *collectionFS) fileSizes() map[string]int64 {
-       c.sizesOnce.Do(func() {
-               c.sizes = map[string]int64{}
-               m := manifest.Manifest{Text: c.collection.ManifestText}
-               for ms := range m.StreamIter() {
-                       for _, fss := range ms.FileStreamSegments {
-                               c.sizes[ms.StreamName+"/"+fss.Name] += int64(fss.SegLen)
-                       }
+type segment interface {
+       io.ReaderAt
+       Len() int
+       // Return a new segment with a subsection of the data from this
+       // one. length<0 means length=Len()-off.
+       Slice(off int, length int) segment
+}
+
+type memSegment struct {
+       buf []byte
+}
+
+func (me *memSegment) Len() int {
+       return len(me.buf)
+}
+
+func (me *memSegment) Slice(off, length int) segment {
+       if length < 0 {
+               length = len(me.buf) - off
+       }
+       buf := make([]byte, length)
+       copy(buf, me.buf[off:])
+       return &memSegment{buf: buf}
+}
+
+func (me *memSegment) Truncate(n int) {
+       if n > cap(me.buf) {
+               newsize := 1024
+               for newsize < n {
+                       newsize = newsize << 2
+               }
+               newbuf := make([]byte, n, newsize)
+               copy(newbuf, me.buf)
+               me.buf = newbuf
+       } else {
+               // Zero unused part when shrinking, in case we grow
+               // and start using it again later.
+               for i := n; i < len(me.buf); i++ {
+                       me.buf[i] = 0
+               }
+       }
+       me.buf = me.buf[:n]
+}
+
+func (me *memSegment) WriteAt(p []byte, off int) {
+       if off+len(p) > len(me.buf) {
+               panic("overflowed segment")
+       }
+       copy(me.buf[off:], p)
+}
+
+func (me *memSegment) ReadAt(p []byte, off int64) (n int, err error) {
+       if off > int64(me.Len()) {
+               err = io.EOF
+               return
+       }
+       n = copy(p, me.buf[int(off):])
+       if n < len(p) {
+               err = io.EOF
+       }
+       return
+}
+
+type storedSegment struct {
+       kc      keepClient
+       locator string
+       size    int // size of stored block (also encoded in locator)
+       offset  int // position of segment within the stored block
+       length  int // bytes in this segment (offset + length <= size)
+}
+
+func (se storedSegment) Len() int {
+       return se.length
+}
+
+func (se storedSegment) Slice(n, size int) segment {
+       se.offset += n
+       se.length -= n
+       if size >= 0 && se.length > size {
+               se.length = size
+       }
+       return se
+}
+
+func (se storedSegment) ReadAt(p []byte, off int64) (n int, err error) {
+       if off > int64(se.length) {
+               return 0, io.EOF
+       }
+       maxlen := se.length - int(off)
+       if len(p) > maxlen {
+               p = p[:maxlen]
+               n, err = se.kc.ReadAt(se.locator, p, int(off)+se.offset)
+               if err == nil {
+                       err = io.EOF
                }
-       })
-       return c.sizes
+               return
+       }
+       return se.kc.ReadAt(se.locator, p, int(off)+se.offset)
 }
 
 func canonicalName(name string) string {
@@ -278,3 +1388,31 @@ func canonicalName(name string) string {
        }
        return name
 }
+
+var manifestEscapeSeq = regexp.MustCompile(`\\([0-7]{3}|\\)`)
+
+func manifestUnescapeFunc(seq string) string {
+       if seq == `\\` {
+               return `\`
+       }
+       i, err := strconv.ParseUint(seq[1:], 8, 8)
+       if err != nil {
+               // Invalid escape sequence: can't unescape.
+               return seq
+       }
+       return string([]byte{byte(i)})
+}
+
+func manifestUnescape(s string) string {
+       return manifestEscapeSeq.ReplaceAllStringFunc(s, manifestUnescapeFunc)
+}
+
+var manifestEscapedChar = regexp.MustCompile(`[\000-\040:\s\\]`)
+
+func manifestEscapeFunc(seq string) string {
+       return fmt.Sprintf("\\%03o", byte(seq[0]))
+}
+
+func manifestEscape(s string) string {
+       return manifestEscapedChar.ReplaceAllStringFunc(s, manifestEscapeFunc)
+}
index f51d1eb3dd1d58d80b278f3859676addb9b0cab5..f1a34754f732bd9d94a2589e72aef164424827b1 100644 (file)
@@ -5,10 +5,20 @@
 package arvados
 
 import (
+       "bytes"
+       "crypto/md5"
+       "errors"
+       "fmt"
        "io"
+       "io/ioutil"
+       "math/rand"
        "net/http"
        "os"
+       "regexp"
+       "runtime"
+       "sync"
        "testing"
+       "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
        check "gopkg.in/check.v1"
@@ -16,17 +26,55 @@ import (
 
 var _ = check.Suite(&CollectionFSSuite{})
 
+type keepClientStub struct {
+       blocks map[string][]byte
+       sync.RWMutex
+}
+
+var errStub404 = errors.New("404 block not found")
+
+func (kcs *keepClientStub) ReadAt(locator string, p []byte, off int) (int, error) {
+       kcs.RLock()
+       defer kcs.RUnlock()
+       buf := kcs.blocks[locator[:32]]
+       if buf == nil {
+               return 0, errStub404
+       }
+       return copy(p, buf[off:]), nil
+}
+
+func (kcs *keepClientStub) PutB(p []byte) (string, int, error) {
+       locator := fmt.Sprintf("%x+%d+A12345@abcde", md5.Sum(p), len(p))
+       buf := make([]byte, len(p))
+       copy(buf, p)
+       kcs.Lock()
+       defer kcs.Unlock()
+       kcs.blocks[locator[:32]] = buf
+       return locator, 1, nil
+}
+
 type CollectionFSSuite struct {
        client *Client
        coll   Collection
-       fs     http.FileSystem
+       fs     CollectionFileSystem
+       kc     keepClient
 }
 
 func (s *CollectionFSSuite) SetUpTest(c *check.C) {
        s.client = NewClientFromEnv()
        err := s.client.RequestAndDecode(&s.coll, "GET", "arvados/v1/collections/"+arvadostest.FooAndBarFilesInDirUUID, nil, nil)
        c.Assert(err, check.IsNil)
-       s.fs = s.coll.FileSystem(s.client, nil)
+       s.kc = &keepClientStub{
+               blocks: map[string][]byte{
+                       "3858f62230ac3c915f300c664312c63f": []byte("foobar"),
+               }}
+       s.fs, err = s.coll.FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+}
+
+func (s *CollectionFSSuite) TestHttpFileSystemInterface(c *check.C) {
+       _, ok := s.fs.(http.FileSystem)
+       c.Check(ok, check.Equals, true)
 }
 
 func (s *CollectionFSSuite) TestReaddirFull(c *check.C) {
@@ -58,7 +106,7 @@ func (s *CollectionFSSuite) TestReaddirLimited(c *check.C) {
        }
 
        fis, err = f.Readdir(1)
-       c.Check(err, check.Equals, io.EOF)
+       c.Check(err, check.IsNil)
        c.Check(len(fis), check.Equals, 1)
        if len(fis) > 0 {
                c.Check(fis[0].Size(), check.Equals, int64(3))
@@ -76,7 +124,7 @@ func (s *CollectionFSSuite) TestReaddirLimited(c *check.C) {
        c.Assert(err, check.IsNil)
        fis, err = f.Readdir(2)
        c.Check(len(fis), check.Equals, 1)
-       c.Assert(err, check.Equals, io.EOF)
+       c.Assert(err, check.IsNil)
        fis, err = f.Readdir(2)
        c.Check(len(fis), check.Equals, 0)
        c.Assert(err, check.Equals, io.EOF)
@@ -113,14 +161,856 @@ func (s *CollectionFSSuite) TestNotExist(c *check.C) {
        }
 }
 
-func (s *CollectionFSSuite) TestOpenFile(c *check.C) {
-       c.Skip("cannot test files with nil keepclient")
+func (s *CollectionFSSuite) TestReadOnlyFile(c *check.C) {
+       f, err := s.fs.OpenFile("/dir1/foo", os.O_RDONLY, 0)
+       c.Assert(err, check.IsNil)
+       st, err := f.Stat()
+       c.Assert(err, check.IsNil)
+       c.Check(st.Size(), check.Equals, int64(3))
+       n, err := f.Write([]byte("bar"))
+       c.Check(n, check.Equals, 0)
+       c.Check(err, check.Equals, ErrReadOnlyFile)
+}
+
+func (s *CollectionFSSuite) TestCreateFile(c *check.C) {
+       f, err := s.fs.OpenFile("/new-file 1", os.O_RDWR|os.O_CREATE, 0)
+       c.Assert(err, check.IsNil)
+       st, err := f.Stat()
+       c.Assert(err, check.IsNil)
+       c.Check(st.Size(), check.Equals, int64(0))
+
+       n, err := f.Write([]byte("bar"))
+       c.Check(n, check.Equals, 3)
+       c.Check(err, check.IsNil)
+
+       c.Check(f.Close(), check.IsNil)
+
+       f, err = s.fs.OpenFile("/new-file 1", os.O_RDWR|os.O_CREATE|os.O_EXCL, 0)
+       c.Check(f, check.IsNil)
+       c.Assert(err, check.NotNil)
+
+       f, err = s.fs.OpenFile("/new-file 1", os.O_RDWR, 0)
+       c.Assert(err, check.IsNil)
+       st, err = f.Stat()
+       c.Assert(err, check.IsNil)
+       c.Check(st.Size(), check.Equals, int64(3))
+
+       c.Check(f.Close(), check.IsNil)
+
+       m, err := s.fs.MarshalManifest(".")
+       c.Check(m, check.Matches, `. 37b51d194a7513e45b56f6524f2d51f2\+3\+\S+ 0:3:new-file\\0401\n./dir1 .* 3:3:bar 0:3:foo\n`)
+}
+
+func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
+       maxBlockSize = 8
+       defer func() { maxBlockSize = 2 << 26 }()
 
-       f, err := s.fs.Open("/foo.txt")
+       f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
        c.Assert(err, check.IsNil)
+       defer f.Close()
        st, err := f.Stat()
        c.Assert(err, check.IsNil)
        c.Check(st.Size(), check.Equals, int64(3))
+
+       f2, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
+       c.Assert(err, check.IsNil)
+       defer f2.Close()
+
+       buf := make([]byte, 64)
+       n, err := f.Read(buf)
+       c.Check(n, check.Equals, 3)
+       c.Check(err, check.Equals, io.EOF)
+       c.Check(string(buf[:3]), check.DeepEquals, "foo")
+
+       pos, err := f.Seek(-2, io.SeekCurrent)
+       c.Check(pos, check.Equals, int64(1))
+       c.Check(err, check.IsNil)
+
+       // Split a storedExtent in two, and insert a memExtent
+       n, err = f.Write([]byte("*"))
+       c.Check(n, check.Equals, 1)
+       c.Check(err, check.IsNil)
+
+       pos, err = f.Seek(0, io.SeekCurrent)
+       c.Check(pos, check.Equals, int64(2))
+       c.Check(err, check.IsNil)
+
+       pos, err = f.Seek(0, io.SeekStart)
+       c.Check(pos, check.Equals, int64(0))
+       c.Check(err, check.IsNil)
+
+       rbuf, err := ioutil.ReadAll(f)
+       c.Check(len(rbuf), check.Equals, 3)
+       c.Check(err, check.IsNil)
+       c.Check(string(rbuf), check.Equals, "f*o")
+
+       // Write multiple blocks in one call
+       f.Seek(1, io.SeekStart)
+       n, err = f.Write([]byte("0123456789abcdefg"))
+       c.Check(n, check.Equals, 17)
+       c.Check(err, check.IsNil)
+       pos, err = f.Seek(0, io.SeekCurrent)
+       c.Check(pos, check.Equals, int64(18))
+       pos, err = f.Seek(-18, io.SeekCurrent)
+       c.Check(err, check.IsNil)
+       n, err = io.ReadFull(f, buf)
+       c.Check(n, check.Equals, 18)
+       c.Check(err, check.Equals, io.ErrUnexpectedEOF)
+       c.Check(string(buf[:n]), check.Equals, "f0123456789abcdefg")
+
+       buf2, err := ioutil.ReadAll(f2)
+       c.Check(err, check.IsNil)
+       c.Check(string(buf2), check.Equals, "f0123456789abcdefg")
+
+       // truncate to current size
+       err = f.Truncate(18)
+       f2.Seek(0, io.SeekStart)
+       buf2, err = ioutil.ReadAll(f2)
+       c.Check(err, check.IsNil)
+       c.Check(string(buf2), check.Equals, "f0123456789abcdefg")
+
+       // shrink to zero some data
+       f.Truncate(15)
+       f2.Seek(0, io.SeekStart)
+       buf2, err = ioutil.ReadAll(f2)
+       c.Check(err, check.IsNil)
+       c.Check(string(buf2), check.Equals, "f0123456789abcd")
+
+       // grow to partial block/extent
+       f.Truncate(20)
+       f2.Seek(0, io.SeekStart)
+       buf2, err = ioutil.ReadAll(f2)
+       c.Check(err, check.IsNil)
+       c.Check(string(buf2), check.Equals, "f0123456789abcd\x00\x00\x00\x00\x00")
+
+       f.Truncate(0)
+       f2.Seek(0, io.SeekStart)
+       f2.Write([]byte("12345678abcdefghijkl"))
+
+       // grow to block/extent boundary
+       f.Truncate(64)
+       f2.Seek(0, io.SeekStart)
+       buf2, err = ioutil.ReadAll(f2)
+       c.Check(err, check.IsNil)
+       c.Check(len(buf2), check.Equals, 64)
+       c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 8)
+
+       // shrink to block/extent boundary
+       err = f.Truncate(32)
+       f2.Seek(0, io.SeekStart)
+       buf2, err = ioutil.ReadAll(f2)
+       c.Check(err, check.IsNil)
+       c.Check(len(buf2), check.Equals, 32)
+       c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 4)
+
+       // shrink to partial block/extent
+       err = f.Truncate(15)
+       f2.Seek(0, io.SeekStart)
+       buf2, err = ioutil.ReadAll(f2)
+       c.Check(err, check.IsNil)
+       c.Check(string(buf2), check.Equals, "12345678abcdefg")
+       c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 2)
+
+       // Force flush to ensure the block "12345678" gets stored, so
+       // we know what to expect in the final manifest below.
+       _, err = s.fs.MarshalManifest(".")
+       c.Check(err, check.IsNil)
+
+       // Truncate to size=3 while f2's ptr is at 15
+       err = f.Truncate(3)
+       c.Check(err, check.IsNil)
+       buf2, err = ioutil.ReadAll(f2)
+       c.Check(err, check.IsNil)
+       c.Check(string(buf2), check.Equals, "")
+       f2.Seek(0, io.SeekStart)
+       buf2, err = ioutil.ReadAll(f2)
+       c.Check(err, check.IsNil)
+       c.Check(string(buf2), check.Equals, "123")
+       c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 1)
+
+       m, err := s.fs.MarshalManifest(".")
+       c.Check(err, check.IsNil)
+       m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
+       c.Check(m, check.Equals, "./dir1 3858f62230ac3c915f300c664312c63f+6 25d55ad283aa400af464c76d713c07ad+8 3:3:bar 6:3:foo\n")
+}
+
+func (s *CollectionFSSuite) TestSeekSparse(c *check.C) {
+       fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+       f, err := fs.OpenFile("test", os.O_CREATE|os.O_RDWR, 0755)
+       c.Assert(err, check.IsNil)
+       defer f.Close()
+
+       checkSize := func(size int64) {
+               fi, err := f.Stat()
+               c.Check(fi.Size(), check.Equals, size)
+
+               f, err := fs.OpenFile("test", os.O_CREATE|os.O_RDWR, 0755)
+               c.Assert(err, check.IsNil)
+               defer f.Close()
+               fi, err = f.Stat()
+               c.Check(fi.Size(), check.Equals, size)
+               pos, err := f.Seek(0, io.SeekEnd)
+               c.Check(pos, check.Equals, size)
+       }
+
+       f.Seek(2, io.SeekEnd)
+       checkSize(0)
+       f.Write([]byte{1})
+       checkSize(3)
+
+       f.Seek(2, io.SeekCurrent)
+       checkSize(3)
+       f.Write([]byte{})
+       checkSize(5)
+
+       f.Seek(8, io.SeekStart)
+       checkSize(5)
+       n, err := f.Read(make([]byte, 1))
+       c.Check(n, check.Equals, 0)
+       c.Check(err, check.Equals, io.EOF)
+       checkSize(5)
+       f.Write([]byte{1, 2, 3})
+       checkSize(11)
+}
+
+func (s *CollectionFSSuite) TestMarshalSmallBlocks(c *check.C) {
+       maxBlockSize = 8
+       defer func() { maxBlockSize = 2 << 26 }()
+
+       var err error
+       s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+       for _, name := range []string{"foo", "bar", "baz"} {
+               f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
+               c.Assert(err, check.IsNil)
+               f.Write([]byte(name))
+               f.Close()
+       }
+
+       m, err := s.fs.MarshalManifest(".")
+       c.Check(err, check.IsNil)
+       m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
+       c.Check(m, check.Equals, ". c3c23db5285662ef7172373df0003206+6 acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:bar 3:3:baz 6:3:foo\n")
+}
+
+func (s *CollectionFSSuite) TestMkdir(c *check.C) {
+       err := s.fs.Mkdir("foo/bar", 0755)
+       c.Check(err, check.Equals, os.ErrNotExist)
+
+       f, err := s.fs.OpenFile("foo/bar", os.O_CREATE, 0)
+       c.Check(err, check.Equals, os.ErrNotExist)
+
+       err = s.fs.Mkdir("foo", 0755)
+       c.Check(err, check.IsNil)
+
+       f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_WRONLY, 0)
+       c.Check(err, check.IsNil)
+       if err == nil {
+               defer f.Close()
+               f.Write([]byte("foo"))
+       }
+
+       // mkdir fails if a file already exists with that name
+       err = s.fs.Mkdir("foo/bar", 0755)
+       c.Check(err, check.NotNil)
+
+       err = s.fs.Remove("foo/bar")
+       c.Check(err, check.IsNil)
+
+       // mkdir succeds after the file is deleted
+       err = s.fs.Mkdir("foo/bar", 0755)
+       c.Check(err, check.IsNil)
+
+       // creating a file in a nonexistent subdir should still fail
+       f, err = s.fs.OpenFile("foo/bar/baz/foo.txt", os.O_CREATE|os.O_WRONLY, 0)
+       c.Check(err, check.Equals, os.ErrNotExist)
+
+       f, err = s.fs.OpenFile("foo/bar/foo.txt", os.O_CREATE|os.O_WRONLY, 0)
+       c.Check(err, check.IsNil)
+       if err == nil {
+               defer f.Close()
+               f.Write([]byte("foo"))
+       }
+
+       // creating foo/bar as a regular file should fail
+       f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_EXCL, 0)
+       c.Check(err, check.NotNil)
+
+       // creating foo/bar as a directory should fail
+       f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_EXCL, os.ModeDir)
+       c.Check(err, check.NotNil)
+       err = s.fs.Mkdir("foo/bar", 0755)
+       c.Check(err, check.NotNil)
+
+       m, err := s.fs.MarshalManifest(".")
+       c.Check(err, check.IsNil)
+       m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
+       c.Check(m, check.Equals, "./dir1 3858f62230ac3c915f300c664312c63f+6 3:3:bar 0:3:foo\n./foo/bar acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n")
+}
+
+func (s *CollectionFSSuite) TestConcurrentWriters(c *check.C) {
+       maxBlockSize = 8
+       defer func() { maxBlockSize = 2 << 26 }()
+
+       var wg sync.WaitGroup
+       for n := 0; n < 128; n++ {
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
+                       c.Assert(err, check.IsNil)
+                       defer f.Close()
+                       for i := 0; i < 6502; i++ {
+                               switch rand.Int() & 3 {
+                               case 0:
+                                       f.Truncate(int64(rand.Intn(64)))
+                               case 1:
+                                       f.Seek(int64(rand.Intn(64)), io.SeekStart)
+                               case 2:
+                                       _, err := f.Write([]byte("beep boop"))
+                                       c.Check(err, check.IsNil)
+                               case 3:
+                                       _, err := ioutil.ReadAll(f)
+                                       c.Check(err, check.IsNil)
+                               }
+                       }
+               }()
+       }
+       wg.Wait()
+
+       f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
+       c.Assert(err, check.IsNil)
+       defer f.Close()
+       buf, err := ioutil.ReadAll(f)
+       c.Check(err, check.IsNil)
+       c.Logf("after lots of random r/w/seek/trunc, buf is %q", buf)
+}
+
+func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
+       maxBlockSize = 40
+       defer func() { maxBlockSize = 2 << 26 }()
+
+       var err error
+       s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+
+       const nfiles = 256
+       const ngoroutines = 256
+
+       var wg sync.WaitGroup
+       for n := 0; n < nfiles; n++ {
+               wg.Add(1)
+               go func(n int) {
+                       defer wg.Done()
+                       expect := make([]byte, 0, 64)
+                       wbytes := []byte("there's no simple explanation for anything important that any of us do")
+                       f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDWR|os.O_CREATE|os.O_EXCL, 0)
+                       c.Assert(err, check.IsNil)
+                       defer f.Close()
+                       for i := 0; i < ngoroutines; i++ {
+                               trunc := rand.Intn(65)
+                               woff := rand.Intn(trunc + 1)
+                               wbytes = wbytes[:rand.Intn(64-woff+1)]
+                               for buf, i := expect[:cap(expect)], len(expect); i < trunc; i++ {
+                                       buf[i] = 0
+                               }
+                               expect = expect[:trunc]
+                               if trunc < woff+len(wbytes) {
+                                       expect = expect[:woff+len(wbytes)]
+                               }
+                               copy(expect[woff:], wbytes)
+                               f.Truncate(int64(trunc))
+                               pos, err := f.Seek(int64(woff), io.SeekStart)
+                               c.Check(pos, check.Equals, int64(woff))
+                               c.Check(err, check.IsNil)
+                               n, err := f.Write(wbytes)
+                               c.Check(n, check.Equals, len(wbytes))
+                               c.Check(err, check.IsNil)
+                               pos, err = f.Seek(0, io.SeekStart)
+                               c.Check(pos, check.Equals, int64(0))
+                               c.Check(err, check.IsNil)
+                               buf, err := ioutil.ReadAll(f)
+                               c.Check(string(buf), check.Equals, string(expect))
+                               c.Check(err, check.IsNil)
+                       }
+                       s.checkMemSize(c, f)
+               }(n)
+       }
+       wg.Wait()
+
+       root, err := s.fs.Open("/")
+       c.Assert(err, check.IsNil)
+       defer root.Close()
+       fi, err := root.Readdir(-1)
+       c.Check(err, check.IsNil)
+       c.Check(len(fi), check.Equals, nfiles)
+
+       _, err = s.fs.MarshalManifest(".")
+       c.Check(err, check.IsNil)
+       // TODO: check manifest content
+}
+
+func (s *CollectionFSSuite) TestRemove(c *check.C) {
+       fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+       err = fs.Mkdir("dir0", 0755)
+       c.Assert(err, check.IsNil)
+       err = fs.Mkdir("dir1", 0755)
+       c.Assert(err, check.IsNil)
+       err = fs.Mkdir("dir1/dir2", 0755)
+       c.Assert(err, check.IsNil)
+       err = fs.Mkdir("dir1/dir3", 0755)
+       c.Assert(err, check.IsNil)
+
+       err = fs.Remove("dir0")
+       c.Check(err, check.IsNil)
+       err = fs.Remove("dir0")
+       c.Check(err, check.Equals, os.ErrNotExist)
+
+       err = fs.Remove("dir1/dir2/.")
+       c.Check(err, check.Equals, ErrInvalidArgument)
+       err = fs.Remove("dir1/dir2/..")
+       c.Check(err, check.Equals, ErrInvalidArgument)
+       err = fs.Remove("dir1")
+       c.Check(err, check.Equals, ErrDirectoryNotEmpty)
+       err = fs.Remove("dir1/dir2/../../../dir1")
+       c.Check(err, check.Equals, ErrDirectoryNotEmpty)
+       err = fs.Remove("dir1/dir3/")
+       c.Check(err, check.IsNil)
+       err = fs.RemoveAll("dir1")
+       c.Check(err, check.IsNil)
+       err = fs.RemoveAll("dir1")
+       c.Check(err, check.IsNil)
+}
+
+func (s *CollectionFSSuite) TestRename(c *check.C) {
+       fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+       const (
+               outer = 16
+               inner = 16
+       )
+       for i := 0; i < outer; i++ {
+               err = fs.Mkdir(fmt.Sprintf("dir%d", i), 0755)
+               c.Assert(err, check.IsNil)
+               for j := 0; j < inner; j++ {
+                       err = fs.Mkdir(fmt.Sprintf("dir%d/dir%d", i, j), 0755)
+                       c.Assert(err, check.IsNil)
+                       for _, fnm := range []string{
+                               fmt.Sprintf("dir%d/file%d", i, j),
+                               fmt.Sprintf("dir%d/dir%d/file%d", i, j, j),
+                       } {
+                               f, err := fs.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0755)
+                               c.Assert(err, check.IsNil)
+                               _, err = f.Write([]byte("beep"))
+                               c.Assert(err, check.IsNil)
+                               f.Close()
+                       }
+               }
+       }
+       var wg sync.WaitGroup
+       for i := 0; i < outer; i++ {
+               for j := 0; j < inner; j++ {
+                       wg.Add(1)
+                       go func(i, j int) {
+                               defer wg.Done()
+                               oldname := fmt.Sprintf("dir%d/dir%d/file%d", i, j, j)
+                               newname := fmt.Sprintf("dir%d/newfile%d", i, inner-j-1)
+                               _, err := fs.Open(newname)
+                               c.Check(err, check.Equals, os.ErrNotExist)
+                               err = fs.Rename(oldname, newname)
+                               c.Check(err, check.IsNil)
+                               f, err := fs.Open(newname)
+                               c.Check(err, check.IsNil)
+                               f.Close()
+                       }(i, j)
+
+                       wg.Add(1)
+                       go func(i, j int) {
+                               defer wg.Done()
+                               // oldname does not exist
+                               err := fs.Rename(
+                                       fmt.Sprintf("dir%d/dir%d/missing", i, j),
+                                       fmt.Sprintf("dir%d/dir%d/file%d", outer-i-1, j, j))
+                               c.Check(err, check.ErrorMatches, `.*does not exist`)
+
+                               // newname parent dir does not exist
+                               err = fs.Rename(
+                                       fmt.Sprintf("dir%d/dir%d", i, j),
+                                       fmt.Sprintf("dir%d/missing/irrelevant", outer-i-1))
+                               c.Check(err, check.ErrorMatches, `.*does not exist`)
+
+                               // oldname parent dir is a file
+                               err = fs.Rename(
+                                       fmt.Sprintf("dir%d/file%d/patherror", i, j),
+                                       fmt.Sprintf("dir%d/irrelevant", i))
+                               c.Check(err, check.ErrorMatches, `.*does not exist`)
+
+                               // newname parent dir is a file
+                               err = fs.Rename(
+                                       fmt.Sprintf("dir%d/dir%d/file%d", i, j, j),
+                                       fmt.Sprintf("dir%d/file%d/patherror", i, inner-j-1))
+                               c.Check(err, check.ErrorMatches, `.*does not exist`)
+                       }(i, j)
+               }
+       }
+       wg.Wait()
+
+       f, err := fs.OpenFile("dir1/newfile3", 0, 0)
+       c.Assert(err, check.IsNil)
+       c.Check(f.Size(), check.Equals, int64(4))
+       buf, err := ioutil.ReadAll(f)
+       c.Check(buf, check.DeepEquals, []byte("beep"))
+       c.Check(err, check.IsNil)
+       _, err = fs.Open("dir1/dir1/file1")
+       c.Check(err, check.Equals, os.ErrNotExist)
+}
+
+func (s *CollectionFSSuite) TestPersist(c *check.C) {
+       maxBlockSize = 1024
+       defer func() { maxBlockSize = 2 << 26 }()
+
+       var err error
+       s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+       err = s.fs.Mkdir("d:r", 0755)
+       c.Assert(err, check.IsNil)
+
+       expect := map[string][]byte{}
+
+       var wg sync.WaitGroup
+       for _, name := range []string{"random 1", "random:2", "random\\3", "d:r/random4"} {
+               buf := make([]byte, 500)
+               rand.Read(buf)
+               expect[name] = buf
+
+               f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
+               c.Assert(err, check.IsNil)
+               // Note: we don't close the file until after the test
+               // is done. Writes to unclosed files should persist.
+               defer f.Close()
+
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       for i := 0; i < len(buf); i += 5 {
+                               _, err := f.Write(buf[i : i+5])
+                               c.Assert(err, check.IsNil)
+                       }
+               }()
+       }
+       wg.Wait()
+
+       m, err := s.fs.MarshalManifest(".")
+       c.Check(err, check.IsNil)
+       c.Logf("%q", m)
+
+       root, err := s.fs.Open("/")
+       c.Assert(err, check.IsNil)
+       defer root.Close()
+       fi, err := root.Readdir(-1)
+       c.Check(err, check.IsNil)
+       c.Check(len(fi), check.Equals, 4)
+
+       persisted, err := (&Collection{ManifestText: m}).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+
+       root, err = persisted.Open("/")
+       c.Assert(err, check.IsNil)
+       defer root.Close()
+       fi, err = root.Readdir(-1)
+       c.Check(err, check.IsNil)
+       c.Check(len(fi), check.Equals, 4)
+
+       for name, content := range expect {
+               c.Logf("read %q", name)
+               f, err := persisted.Open(name)
+               c.Assert(err, check.IsNil)
+               defer f.Close()
+               buf, err := ioutil.ReadAll(f)
+               c.Check(err, check.IsNil)
+               c.Check(buf, check.DeepEquals, content)
+       }
+}
+
+func (s *CollectionFSSuite) TestPersistEmptyFiles(c *check.C) {
+       var err error
+       s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+       for _, name := range []string{"dir", "dir/zerodir", "zero", "zero/zero"} {
+               err = s.fs.Mkdir(name, 0755)
+               c.Assert(err, check.IsNil)
+       }
+
+       expect := map[string][]byte{
+               "0":                nil,
+               "00":               []byte{},
+               "one":              []byte{1},
+               "dir/0":            nil,
+               "dir/two":          []byte{1, 2},
+               "dir/zero":         nil,
+               "dir/zerodir/zero": nil,
+               "zero/zero/zero":   nil,
+       }
+       for name, data := range expect {
+               f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
+               c.Assert(err, check.IsNil)
+               if data != nil {
+                       _, err := f.Write(data)
+                       c.Assert(err, check.IsNil)
+               }
+               f.Close()
+       }
+
+       m, err := s.fs.MarshalManifest(".")
+       c.Check(err, check.IsNil)
+       c.Logf("%q", m)
+
+       persisted, err := (&Collection{ManifestText: m}).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+
+       for name, data := range expect {
+               f, err := persisted.Open("bogus-" + name)
+               c.Check(err, check.NotNil)
+
+               f, err = persisted.Open(name)
+               c.Assert(err, check.IsNil)
+
+               if data == nil {
+                       data = []byte{}
+               }
+               buf, err := ioutil.ReadAll(f)
+               c.Check(err, check.IsNil)
+               c.Check(buf, check.DeepEquals, data)
+       }
+}
+
+func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
+       fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+
+       f, err := fs.OpenFile("missing", os.O_WRONLY, 0)
+       c.Check(f, check.IsNil)
+       c.Check(err, check.ErrorMatches, `file does not exist`)
+
+       f, err = fs.OpenFile("new", os.O_CREATE|os.O_RDONLY, 0)
+       c.Assert(err, check.IsNil)
+       defer f.Close()
+       n, err := f.Write([]byte{1, 2, 3})
+       c.Check(n, check.Equals, 0)
+       c.Check(err, check.ErrorMatches, `read-only file`)
+       n, err = f.Read(make([]byte, 1))
+       c.Check(n, check.Equals, 0)
+       c.Check(err, check.Equals, io.EOF)
+       f, err = fs.OpenFile("new", os.O_RDWR, 0)
+       c.Assert(err, check.IsNil)
+       defer f.Close()
+       _, err = f.Write([]byte{4, 5, 6})
+       c.Check(err, check.IsNil)
+       fi, err := f.Stat()
+       c.Assert(err, check.IsNil)
+       c.Check(fi.Size(), check.Equals, int64(3))
+
+       f, err = fs.OpenFile("new", os.O_TRUNC|os.O_RDWR, 0)
+       c.Assert(err, check.IsNil)
+       defer f.Close()
+       pos, err := f.Seek(0, io.SeekEnd)
+       c.Check(pos, check.Equals, int64(0))
+       c.Check(err, check.IsNil)
+       fi, err = f.Stat()
+       c.Assert(err, check.IsNil)
+       c.Check(fi.Size(), check.Equals, int64(0))
+       fs.Remove("new")
+
+       buf := make([]byte, 64)
+       f, err = fs.OpenFile("append", os.O_EXCL|os.O_CREATE|os.O_RDWR|os.O_APPEND, 0)
+       c.Assert(err, check.IsNil)
+       f.Write([]byte{1, 2, 3})
+       f.Seek(0, io.SeekStart)
+       n, _ = f.Read(buf[:1])
+       c.Check(n, check.Equals, 1)
+       c.Check(buf[:1], check.DeepEquals, []byte{1})
+       pos, err = f.Seek(0, io.SeekCurrent)
+       c.Check(pos, check.Equals, int64(1))
+       f.Write([]byte{4, 5, 6})
+       pos, err = f.Seek(0, io.SeekCurrent)
+       c.Check(pos, check.Equals, int64(6))
+       f.Seek(0, io.SeekStart)
+       n, err = f.Read(buf)
+       c.Check(buf[:n], check.DeepEquals, []byte{1, 2, 3, 4, 5, 6})
+       c.Check(err, check.Equals, io.EOF)
+       f.Close()
+
+       f, err = fs.OpenFile("append", os.O_RDWR|os.O_APPEND, 0)
+       c.Assert(err, check.IsNil)
+       pos, err = f.Seek(0, io.SeekCurrent)
+       c.Check(pos, check.Equals, int64(0))
+       c.Check(err, check.IsNil)
+       f.Read(buf[:3])
+       pos, _ = f.Seek(0, io.SeekCurrent)
+       c.Check(pos, check.Equals, int64(3))
+       f.Write([]byte{7, 8, 9})
+       pos, err = f.Seek(0, io.SeekCurrent)
+       c.Check(pos, check.Equals, int64(9))
+       f.Close()
+
+       f, err = fs.OpenFile("wronly", os.O_CREATE|os.O_WRONLY, 0)
+       c.Assert(err, check.IsNil)
+       n, err = f.Write([]byte{3, 2, 1})
+       c.Check(n, check.Equals, 3)
+       c.Check(err, check.IsNil)
+       pos, _ = f.Seek(0, io.SeekCurrent)
+       c.Check(pos, check.Equals, int64(3))
+       pos, _ = f.Seek(0, io.SeekStart)
+       c.Check(pos, check.Equals, int64(0))
+       n, err = f.Read(buf)
+       c.Check(n, check.Equals, 0)
+       c.Check(err, check.ErrorMatches, `.*O_WRONLY.*`)
+       f, err = fs.OpenFile("wronly", os.O_RDONLY, 0)
+       c.Assert(err, check.IsNil)
+       n, _ = f.Read(buf)
+       c.Check(buf[:n], check.DeepEquals, []byte{3, 2, 1})
+
+       f, err = fs.OpenFile("unsupported", os.O_CREATE|os.O_SYNC, 0)
+       c.Check(f, check.IsNil)
+       c.Check(err, check.NotNil)
+
+       f, err = fs.OpenFile("append", os.O_RDWR|os.O_WRONLY, 0)
+       c.Check(f, check.IsNil)
+       c.Check(err, check.ErrorMatches, `invalid flag.*`)
+}
+
+func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) {
+       maxBlockSize = 1024
+       defer func() { maxBlockSize = 2 << 26 }()
+
+       fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+       c.Assert(err, check.IsNil)
+       f, err := fs.OpenFile("50K", os.O_WRONLY|os.O_CREATE, 0)
+       c.Assert(err, check.IsNil)
+       defer f.Close()
+
+       data := make([]byte, 500)
+       rand.Read(data)
+
+       for i := 0; i < 100; i++ {
+               n, err := f.Write(data)
+               c.Assert(n, check.Equals, len(data))
+               c.Assert(err, check.IsNil)
+       }
+
+       currentMemExtents := func() (memExtents []int) {
+               for idx, e := range f.(*filehandle).inode.(*filenode).segments {
+                       switch e.(type) {
+                       case *memSegment:
+                               memExtents = append(memExtents, idx)
+                       }
+               }
+               return
+       }
+       c.Check(currentMemExtents(), check.HasLen, 1)
+
+       m, err := fs.MarshalManifest(".")
+       c.Check(m, check.Matches, `[^:]* 0:50000:50K\n`)
+       c.Check(err, check.IsNil)
+       c.Check(currentMemExtents(), check.HasLen, 0)
+}
+
+func (s *CollectionFSSuite) TestBrokenManifests(c *check.C) {
+       for _, txt := range []string{
+               "\n",
+               ".\n",
+               ". \n",
+               ". d41d8cd98f00b204e9800998ecf8427e+0\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+0 \n",
+               ". 0:0:foo\n",
+               ".  0:0:foo\n",
+               ". 0:0:foo 0:0:bar\n",
+               ". d41d8cd98f00b204e9800998ecf8427e 0:0:foo\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo:bar\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+0 foo:0:foo\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+0 0:foo:foo\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:foo 1:1:bar\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n",
+               "./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n. d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n",
+       } {
+               c.Logf("<-%q", txt)
+               fs, err := (&Collection{ManifestText: txt}).FileSystem(s.client, s.kc)
+               c.Check(fs, check.IsNil)
+               c.Logf("-> %s", err)
+               c.Check(err, check.NotNil)
+       }
+}
+
+func (s *CollectionFSSuite) TestEdgeCaseManifests(c *check.C) {
+       for _, txt := range []string{
+               "",
+               ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo 0:0:foo 0:0:bar\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo 0:0:foo 0:0:bar\n",
+               ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/bar\n./foo d41d8cd98f00b204e9800998ecf8427e+0 0:0:bar\n",
+       } {
+               c.Logf("<-%q", txt)
+               fs, err := (&Collection{ManifestText: txt}).FileSystem(s.client, s.kc)
+               c.Check(err, check.IsNil)
+               c.Check(fs, check.NotNil)
+       }
+}
+
+func (s *CollectionFSSuite) checkMemSize(c *check.C, f File) {
+       fn := f.(*filehandle).inode.(*filenode)
+       var memsize int64
+       for _, seg := range fn.segments {
+               if e, ok := seg.(*memSegment); ok {
+                       memsize += int64(len(e.buf))
+               }
+       }
+       c.Check(fn.memsize, check.Equals, memsize)
+}
+
+type CollectionFSUnitSuite struct{}
+
+var _ = check.Suite(&CollectionFSUnitSuite{})
+
+// expect ~2 seconds to load a manifest with 256K files
+func (s *CollectionFSUnitSuite) TestLargeManifest(c *check.C) {
+       const (
+               dirCount  = 512
+               fileCount = 512
+       )
+
+       mb := bytes.NewBuffer(make([]byte, 0, 40000000))
+       for i := 0; i < dirCount; i++ {
+               fmt.Fprintf(mb, "./dir%d", i)
+               for j := 0; j <= fileCount; j++ {
+                       fmt.Fprintf(mb, " %032x+42+A%040x@%08x", j, j, j)
+               }
+               for j := 0; j < fileCount; j++ {
+                       fmt.Fprintf(mb, " %d:%d:dir%d/file%d", j*42+21, 42, j, j)
+               }
+               mb.Write([]byte{'\n'})
+       }
+       coll := Collection{ManifestText: mb.String()}
+       c.Logf("%s built", time.Now())
+
+       var memstats runtime.MemStats
+       runtime.ReadMemStats(&memstats)
+       c.Logf("%s Alloc=%d Sys=%d", time.Now(), memstats.Alloc, memstats.Sys)
+
+       f, err := coll.FileSystem(nil, nil)
+       c.Check(err, check.IsNil)
+       c.Logf("%s loaded", time.Now())
+
+       for i := 0; i < dirCount; i++ {
+               for j := 0; j < fileCount; j++ {
+                       f.Stat(fmt.Sprintf("./dir%d/dir%d/file%d", i, j, j))
+               }
+       }
+       c.Logf("%s Stat() x %d", time.Now(), dirCount*fileCount)
+
+       runtime.ReadMemStats(&memstats)
+       c.Logf("%s Alloc=%d Sys=%d", time.Now(), memstats.Alloc, memstats.Sys)
 }
 
 // Gocheck boilerplate
index 773a2e6f9c7d787406511f85a6a5585596153738..9a04855784a76eee88bb4119b1483fa3505b67f2 100644 (file)
@@ -17,7 +17,7 @@ type TransactionError struct {
        URL        url.URL
        StatusCode int
        Status     string
-       errors     []string
+       Errors     []string
 }
 
 func (e TransactionError) Error() (s string) {
@@ -25,8 +25,8 @@ func (e TransactionError) Error() (s string) {
        if e.Status != "" {
                s = s + ": " + e.Status
        }
-       if len(e.errors) > 0 {
-               s = s + ": " + strings.Join(e.errors, "; ")
+       if len(e.Errors) > 0 {
+               s = s + ": " + strings.Join(e.Errors, "; ")
        }
        return
 }
@@ -35,7 +35,7 @@ func newTransactionError(req *http.Request, resp *http.Response, buf []byte) *Tr
        var e TransactionError
        if json.Unmarshal(buf, &e) != nil {
                // No JSON-formatted error response
-               e.errors = nil
+               e.Errors = nil
        }
        e.Method = req.Method
        e.URL = *req.URL
index 18fd91f560227ee1c40fec2bce64126bc0c3b827..d2c3a41f2108e2bc852f56119b747a7ec9423e7a 100644 (file)
@@ -5,6 +5,8 @@
 package httpserver
 
 import (
+       "math/rand"
+       "net/http"
        "strconv"
        "sync"
        "time"
@@ -17,19 +19,34 @@ type IDGenerator struct {
        // Prefix is prepended to each returned ID.
        Prefix string
 
-       lastID int64
-       mtx    sync.Mutex
+       mtx sync.Mutex
+       src rand.Source
 }
 
 // Next returns a new ID string. It is safe to call Next from multiple
 // goroutines.
 func (g *IDGenerator) Next() string {
-       id := time.Now().UnixNano()
        g.mtx.Lock()
-       if id <= g.lastID {
-               id = g.lastID + 1
+       defer g.mtx.Unlock()
+       if g.src == nil {
+               g.src = rand.NewSource(time.Now().UnixNano())
        }
-       g.lastID = id
-       g.mtx.Unlock()
-       return g.Prefix + strconv.FormatInt(id, 36)
+       a, b := g.src.Int63(), g.src.Int63()
+       id := strconv.FormatInt(a, 36) + strconv.FormatInt(b, 36)
+       for len(id) > 20 {
+               id = id[:20]
+       }
+       return g.Prefix + id
+}
+
+// AddRequestIDs wraps an http.Handler, adding an X-Request-Id header
+// to each request that doesn't already have one.
+func AddRequestIDs(h http.Handler) http.Handler {
+       gen := &IDGenerator{Prefix: "req-"}
+       return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+               if req.Header.Get("X-Request-Id") == "" {
+                       req.Header.Set("X-Request-Id", gen.Next())
+               }
+               h.ServeHTTP(w, req)
+       })
 }
diff --git a/sdk/go/httpserver/logger.go b/sdk/go/httpserver/logger.go
new file mode 100644 (file)
index 0000000..decb2ff
--- /dev/null
@@ -0,0 +1,82 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package httpserver
+
+import (
+       "context"
+       "net/http"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/stats"
+       log "github.com/Sirupsen/logrus"
+)
+
+type contextKey struct {
+       name string
+}
+
+var requestTimeContextKey = contextKey{"requestTime"}
+
+// LogRequests wraps an http.Handler, logging each request and
+// response via logrus.
+func LogRequests(h http.Handler) http.Handler {
+       return http.HandlerFunc(func(wrapped http.ResponseWriter, req *http.Request) {
+               w := &responseTimer{ResponseWriter: WrapResponseWriter(wrapped)}
+               req = req.WithContext(context.WithValue(req.Context(), &requestTimeContextKey, time.Now()))
+               lgr := log.WithFields(log.Fields{
+                       "RequestID":       req.Header.Get("X-Request-Id"),
+                       "remoteAddr":      req.RemoteAddr,
+                       "reqForwardedFor": req.Header.Get("X-Forwarded-For"),
+                       "reqMethod":       req.Method,
+                       "reqPath":         req.URL.Path[1:],
+                       "reqBytes":        req.ContentLength,
+               })
+               logRequest(w, req, lgr)
+               defer logResponse(w, req, lgr)
+               h.ServeHTTP(w, req)
+       })
+}
+
+func logRequest(w *responseTimer, req *http.Request, lgr *log.Entry) {
+       lgr.Info("request")
+}
+
+func logResponse(w *responseTimer, req *http.Request, lgr *log.Entry) {
+       if tStart, ok := req.Context().Value(&requestTimeContextKey).(time.Time); ok {
+               tDone := time.Now()
+               lgr = lgr.WithFields(log.Fields{
+                       "timeTotal":     stats.Duration(tDone.Sub(tStart)),
+                       "timeToStatus":  stats.Duration(w.writeTime.Sub(tStart)),
+                       "timeWriteBody": stats.Duration(tDone.Sub(w.writeTime)),
+               })
+       }
+       lgr.WithFields(log.Fields{
+               "respStatusCode": w.WroteStatus(),
+               "respStatus":     http.StatusText(w.WroteStatus()),
+               "respBytes":      w.WroteBodyBytes(),
+       }).Info("response")
+}
+
+type responseTimer struct {
+       ResponseWriter
+       wrote     bool
+       writeTime time.Time
+}
+
+func (rt *responseTimer) WriteHeader(code int) {
+       if !rt.wrote {
+               rt.wrote = true
+               rt.writeTime = time.Now()
+       }
+       rt.ResponseWriter.WriteHeader(code)
+}
+
+func (rt *responseTimer) Write(p []byte) (int, error) {
+       if !rt.wrote {
+               rt.wrote = true
+               rt.writeTime = time.Now()
+       }
+       return rt.ResponseWriter.Write(p)
+}
diff --git a/sdk/go/httpserver/logger_test.go b/sdk/go/httpserver/logger_test.go
new file mode 100644 (file)
index 0000000..bbcafa1
--- /dev/null
@@ -0,0 +1,68 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package httpserver
+
+import (
+       "bytes"
+       "encoding/json"
+       "net/http"
+       "net/http/httptest"
+       "os"
+       "testing"
+       "time"
+
+       log "github.com/Sirupsen/logrus"
+       check "gopkg.in/check.v1"
+)
+
+func Test(t *testing.T) {
+       check.TestingT(t)
+}
+
+var _ = check.Suite(&Suite{})
+
+type Suite struct{}
+
+func (s *Suite) TestLogRequests(c *check.C) {
+       defer log.SetOutput(os.Stdout)
+       captured := &bytes.Buffer{}
+       log.SetOutput(captured)
+       log.SetFormatter(&log.JSONFormatter{
+               TimestampFormat: time.RFC3339Nano,
+       })
+       h := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+               w.Write([]byte("hello world"))
+       })
+       req, err := http.NewRequest("GET", "https://foo.example/bar", nil)
+       req.Header.Set("X-Forwarded-For", "1.2.3.4:12345")
+       c.Assert(err, check.IsNil)
+       resp := httptest.NewRecorder()
+       AddRequestIDs(LogRequests(h)).ServeHTTP(resp, req)
+
+       dec := json.NewDecoder(captured)
+
+       gotReq := make(map[string]interface{})
+       err = dec.Decode(&gotReq)
+       c.Logf("%#v", gotReq)
+       c.Check(gotReq["RequestID"], check.Matches, "req-[a-z0-9]{20}")
+       c.Check(gotReq["reqForwardedFor"], check.Equals, "1.2.3.4:12345")
+       c.Check(gotReq["msg"], check.Equals, "request")
+
+       gotResp := make(map[string]interface{})
+       err = dec.Decode(&gotResp)
+       c.Logf("%#v", gotResp)
+       c.Check(gotResp["RequestID"], check.Equals, gotReq["RequestID"])
+       c.Check(gotResp["reqForwardedFor"], check.Equals, "1.2.3.4:12345")
+       c.Check(gotResp["msg"], check.Equals, "response")
+
+       c.Assert(gotResp["time"], check.FitsTypeOf, "")
+       _, err = time.Parse(time.RFC3339Nano, gotResp["time"].(string))
+       c.Check(err, check.IsNil)
+
+       for _, key := range []string{"timeToStatus", "timeWriteBody", "timeTotal"} {
+               c.Assert(gotResp[key], check.FitsTypeOf, float64(0))
+               c.Check(gotResp[key].(float64), check.Not(check.Equals), float64(0))
+       }
+}
index e6ac4ca7a8be42d996e32c7fbc647389cc5248a4..f17bc820a98f5c242ffc24122d1176cc5c21a57e 100644 (file)
@@ -8,40 +8,46 @@ import (
        "net/http"
 )
 
-// ResponseWriter wraps http.ResponseWriter and exposes the status
+type ResponseWriter interface {
+       http.ResponseWriter
+       WroteStatus() int
+       WroteBodyBytes() int
+}
+
+// responseWriter wraps http.ResponseWriter and exposes the status
 // sent, the number of bytes sent to the client, and the last write
 // error.
-type ResponseWriter struct {
+type responseWriter struct {
        http.ResponseWriter
-       wroteStatus    *int   // Last status given to WriteHeader()
-       wroteBodyBytes *int   // Bytes successfully written
-       err            *error // Last error returned from Write()
+       wroteStatus    int   // Last status given to WriteHeader()
+       wroteBodyBytes int   // Bytes successfully written
+       err            error // Last error returned from Write()
 }
 
 func WrapResponseWriter(orig http.ResponseWriter) ResponseWriter {
-       return ResponseWriter{orig, new(int), new(int), new(error)}
+       return &responseWriter{ResponseWriter: orig}
 }
 
-func (w ResponseWriter) WriteHeader(s int) {
-       *w.wroteStatus = s
+func (w *responseWriter) WriteHeader(s int) {
+       w.wroteStatus = s
        w.ResponseWriter.WriteHeader(s)
 }
 
-func (w ResponseWriter) Write(data []byte) (n int, err error) {
+func (w *responseWriter) Write(data []byte) (n int, err error) {
        n, err = w.ResponseWriter.Write(data)
-       *w.wroteBodyBytes += n
-       *w.err = err
+       w.wroteBodyBytes += n
+       w.err = err
        return
 }
 
-func (w ResponseWriter) WroteStatus() int {
-       return *w.wroteStatus
+func (w *responseWriter) WroteStatus() int {
+       return w.wroteStatus
 }
 
-func (w ResponseWriter) WroteBodyBytes() int {
-       return *w.wroteBodyBytes
+func (w *responseWriter) WroteBodyBytes() int {
+       return w.wroteBodyBytes
 }
 
-func (w ResponseWriter) Err() error {
-       return *w.err
+func (w *responseWriter) Err() error {
+       return w.err
 }
index e841a00fa1e1f3493bff67890b2aee70181a3bca..bac4a24fd5a037d9cdcafb663612677304184c97 100644 (file)
@@ -7,6 +7,8 @@ package keepclient
 import (
        "io"
        "sort"
+       "strconv"
+       "strings"
        "sync"
        "time"
 )
@@ -49,10 +51,30 @@ func (c *BlockCache) Sweep() {
        }
 }
 
+// ReadAt returns data from the cache, first retrieving it from Keep if
+// necessary.
+func (c *BlockCache) ReadAt(kc *KeepClient, locator string, p []byte, off int) (int, error) {
+       buf, err := c.Get(kc, locator)
+       if err != nil {
+               return 0, err
+       }
+       if off > len(buf) {
+               return 0, io.ErrUnexpectedEOF
+       }
+       return copy(p, buf[off:]), nil
+}
+
 // Get returns data from the cache, first retrieving it from Keep if
 // necessary.
 func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) {
        cacheKey := locator[:32]
+       bufsize := BLOCKSIZE
+       if parts := strings.SplitN(locator, "+", 3); len(parts) >= 2 {
+               datasize, err := strconv.ParseInt(parts[1], 10, 32)
+               if err == nil && datasize >= 0 {
+                       bufsize = int(datasize)
+               }
+       }
        c.mtx.Lock()
        if c.cache == nil {
                c.cache = make(map[string]*cacheBlock)
@@ -68,7 +90,7 @@ func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) {
                        rdr, size, _, err := kc.Get(locator)
                        var data []byte
                        if err == nil {
-                               data = make([]byte, size, BLOCKSIZE)
+                               data = make([]byte, size, bufsize)
                                _, err = io.ReadFull(rdr, data)
                                err2 := rdr.Close()
                                if err == nil {
index 57829aadebb0f3c4aff32c604db08e6e481a1a3b..fa309f65535c270ace4b4d0e2219092b7e49b5e1 100644 (file)
@@ -6,25 +6,12 @@ package keepclient
 
 import (
        "errors"
-       "fmt"
-       "io"
        "os"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/manifest"
 )
 
-const (
-       // After reading a data block from Keep, cfReader slices it up
-       // and sends the slices to a buffered channel to be consumed
-       // by the caller via Read().
-       //
-       // dataSliceSize is the maximum size of the slices, and
-       // therefore the maximum number of bytes that will be returned
-       // by a single call to Read().
-       dataSliceSize = 1 << 20
-)
-
 // ErrNoManifest indicates the given collection has no manifest
 // information (e.g., manifest_text was excluded by a "select"
 // parameter when retrieving the collection record).
@@ -38,141 +25,17 @@ func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, fi
        if !ok {
                return nil, ErrNoManifest
        }
-       m := manifest.Manifest{Text: mText}
-       return kc.ManifestFileReader(m, filename)
-}
-
-func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error) {
-       f := &file{
-               kc: kc,
-       }
-       err := f.load(m, filename)
+       fs, err := (&arvados.Collection{ManifestText: mText}).FileSystem(nil, kc)
        if err != nil {
                return nil, err
        }
-       return f, nil
-}
-
-type file struct {
-       kc       *KeepClient
-       segments []*manifest.FileSegment
-       size     int64 // total file size
-       offset   int64 // current read offset
-
-       // current/latest segment accessed -- might or might not match pos
-       seg           *manifest.FileSegment
-       segStart      int64 // position of segment relative to file
-       segData       []byte
-       segNext       []*manifest.FileSegment
-       readaheadDone bool
-}
-
-// Close implements io.Closer.
-func (f *file) Close() error {
-       f.kc = nil
-       f.segments = nil
-       f.segData = nil
-       return nil
+       return fs.OpenFile(filename, os.O_RDONLY, 0)
 }
 
-// Read implements io.Reader.
-func (f *file) Read(buf []byte) (int, error) {
-       if f.seg == nil || f.offset < f.segStart || f.offset >= f.segStart+int64(f.seg.Len) {
-               // f.seg does not cover the current read offset
-               // (f.pos).  Iterate over f.segments to find the one
-               // that does.
-               f.seg = nil
-               f.segStart = 0
-               f.segData = nil
-               f.segNext = f.segments
-               for len(f.segNext) > 0 {
-                       seg := f.segNext[0]
-                       f.segNext = f.segNext[1:]
-                       segEnd := f.segStart + int64(seg.Len)
-                       if segEnd > f.offset {
-                               f.seg = seg
-                               break
-                       }
-                       f.segStart = segEnd
-               }
-               f.readaheadDone = false
-       }
-       if f.seg == nil {
-               return 0, io.EOF
-       }
-       if f.segData == nil {
-               data, err := f.kc.cache().Get(f.kc, f.seg.Locator)
-               if err != nil {
-                       return 0, err
-               }
-               if len(data) < f.seg.Offset+f.seg.Len {
-                       return 0, fmt.Errorf("invalid segment (offset %d len %d) in %d-byte block %s", f.seg.Offset, f.seg.Len, len(data), f.seg.Locator)
-               }
-               f.segData = data[f.seg.Offset : f.seg.Offset+f.seg.Len]
-       }
-       // dataOff and dataLen denote a portion of f.segData
-       // corresponding to a portion of the file at f.offset.
-       dataOff := int(f.offset - f.segStart)
-       dataLen := f.seg.Len - dataOff
-
-       if !f.readaheadDone && len(f.segNext) > 0 && f.offset >= 1048576 && dataOff+dataLen > len(f.segData)/16 {
-               // If we have already read more than just the first
-               // few bytes of this file, and we have already
-               // consumed a noticeable portion of this segment, and
-               // there's more data for this file in the next segment
-               // ... then there's a good chance we are going to need
-               // the data for that next segment soon. Start getting
-               // it into the cache now.
-               go f.kc.cache().Get(f.kc, f.segNext[0].Locator)
-               f.readaheadDone = true
-       }
-
-       n := len(buf)
-       if n > dataLen {
-               n = dataLen
-       }
-       copy(buf[:n], f.segData[dataOff:dataOff+n])
-       f.offset += int64(n)
-       return n, nil
-}
-
-// Seek implements io.Seeker.
-func (f *file) Seek(offset int64, whence int) (int64, error) {
-       var want int64
-       switch whence {
-       case io.SeekStart:
-               want = offset
-       case io.SeekCurrent:
-               want = f.offset + offset
-       case io.SeekEnd:
-               want = f.size + offset
-       default:
-               return f.offset, fmt.Errorf("invalid whence %d", whence)
-       }
-       if want < 0 {
-               return f.offset, fmt.Errorf("attempted seek to %d", want)
-       }
-       if want > f.size {
-               want = f.size
-       }
-       f.offset = want
-       return f.offset, nil
-}
-
-// Size returns the file size in bytes.
-func (f *file) Size() int64 {
-       return f.size
-}
-
-func (f *file) load(m manifest.Manifest, path string) error {
-       f.segments = nil
-       f.size = 0
-       for seg := range m.FileSegmentIterByName(path) {
-               f.segments = append(f.segments, seg)
-               f.size += int64(seg.Len)
-       }
-       if f.segments == nil {
-               return os.ErrNotExist
+func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error) {
+       fs, err := (&arvados.Collection{ManifestText: m.Text}).FileSystem(nil, kc)
+       if err != nil {
+               return nil, err
        }
-       return nil
+       return fs.OpenFile(filename, os.O_RDONLY, 0)
 }
index df8bcb39dce2eaedf0a03c411bc4043ef8f0fe29..5d1e2a15332f31eaf5bd4f03bdd1343d746ada4f 100644 (file)
@@ -166,11 +166,11 @@ func (s *CollectionReaderUnit) TestCollectionReaderContent(c *check.C) {
 
 func (s *CollectionReaderUnit) TestCollectionReaderManyBlocks(c *check.C) {
        h := md5.New()
-       var testdata []byte
        buf := make([]byte, 4096)
        locs := make([]string, len(buf))
+       testdata := make([]byte, 0, len(buf)*len(buf))
        filesize := 0
-       for i := 0; i < len(locs); i++ {
+       for i := range locs {
                _, err := rand.Read(buf[:i])
                h.Write(buf[:i])
                locs[i], _, err = s.kc.PutB(buf[:i])
@@ -219,11 +219,12 @@ func (s *CollectionReaderUnit) TestCollectionReaderManyBlocks(c *check.C) {
        c.Check(md5.Sum(buf), check.DeepEquals, md5.Sum(testdata))
        c.Check(buf[:1000], check.DeepEquals, testdata[:1000])
 
+       expectPos := curPos + size + 12345
        curPos, err = rdr.Seek(size+12345, io.SeekCurrent)
        c.Check(err, check.IsNil)
-       c.Check(curPos, check.Equals, size)
+       c.Check(curPos, check.Equals, expectPos)
 
-       curPos, err = rdr.Seek(8-size, io.SeekCurrent)
+       curPos, err = rdr.Seek(8-curPos, io.SeekCurrent)
        c.Check(err, check.IsNil)
        c.Check(curPos, check.Equals, int64(8))
 
index 37d651e31fbd971defa3217d6c11e883c4a073cc..54a4a374b991b44c5a5e51878be980a1b78f9609 100644 (file)
@@ -293,6 +293,12 @@ func (kc *KeepClient) Get(locator string) (io.ReadCloser, int64, string, error)
        return kc.getOrHead("GET", locator)
 }
 
+// ReadAt() retrieves a portion of block from the cache if it's
+// present, otherwise from the network.
+func (kc *KeepClient) ReadAt(locator string, p []byte, off int) (int, error) {
+       return kc.cache().ReadAt(kc, locator, p, off)
+}
+
 // Ask() verifies that a block with the given hash is available and
 // readable, according to at least one Keep service. Unlike Get, it
 // does not retrieve the data or verify that the data content matches
index 83765fb1dc5571b4454ff8143a1545321f34c91c..466adc0eefb573a08b18dff2d5280102ef448210 100644 (file)
@@ -31,6 +31,7 @@ class Container < ArvadosModel
   after_validation :assign_auth
   before_save :sort_serialized_attrs
   after_save :handle_completed
+  after_save :propagate_priority
 
   has_many :container_requests, :foreign_key => :container_uuid, :class_name => 'ContainerRequest', :primary_key => :uuid
   belongs_to :auth, :class_name => 'ApiClientAuthorization', :foreign_key => :auth_uuid, :primary_key => :uuid
@@ -93,6 +94,20 @@ class Container < ArvadosModel
     end
   end
 
+  def propagate_priority
+    if self.priority_changed?
+      act_as_system_user do
+         # Update the priority of child container requests to match new priority
+         # of the parent container.
+         ContainerRequest.where(requesting_container_uuid: self.uuid,
+                                state: ContainerRequest::Committed).each do |cr|
+           cr.priority = self.priority
+           cr.save
+         end
+       end
+    end
+  end
+
   # Create a new container (or find an existing one) to satisfy the
   # given container request.
   def self.resolve(req)
index e751d6158cdc2eb1b26fd3d80fa277839d7ad590..cecf7b818e7004f74fb4544033d1a6e636e55069 100644 (file)
@@ -293,6 +293,35 @@ class ContainerRequestTest < ActiveSupport::TestCase
     assert_equal 0, c2.priority
   end
 
+
+  test "Container makes container request, then changes priority" do
+    set_user_from_auth :active
+    cr = create_minimal_req!(priority: 5, state: "Committed", container_count_max: 1)
+
+    c = Container.find_by_uuid cr.container_uuid
+    assert_equal 5, c.priority
+
+    cr2 = create_minimal_req!
+    cr2.update_attributes!(priority: 5, state: "Committed", requesting_container_uuid: c.uuid, command: ["echo", "foo2"], container_count_max: 1)
+    cr2.reload
+
+    c2 = Container.find_by_uuid cr2.container_uuid
+    assert_equal 5, c2.priority
+
+    act_as_system_user do
+      c.priority = 10
+      c.save!
+    end
+
+    cr.reload
+
+    cr2.reload
+    assert_equal 10, cr2.priority
+
+    c2.reload
+    assert_equal 10, c2.priority
+  end
+
   [
     ['running_container_auth', 'zzzzz-dz642-runningcontainr'],
     ['active_no_prefs', nil],
index b7373b5c1ec5d1c5d5c94b3c135825f7a43a24ad..617c73282f633ac6ddbca83c2094c1acfe8f3f18 100644 (file)
@@ -188,5 +188,5 @@ func (h *authHandler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
        }
        r.URL.Path = rewrittenPath
 
-       h.handler.ServeHTTP(&w, r)
+       h.handler.ServeHTTP(w, r)
 }
index 79a3eb3f7b85de9667c04fe1b24f2a5217db5772..74ac7ae55eea05aa396f9ff337a8dbed74505079 100644 (file)
@@ -7,6 +7,7 @@ package main
 import (
        "encoding/json"
        "flag"
+       "fmt"
        "log"
        "os"
        "regexp"
@@ -16,6 +17,8 @@ import (
        "github.com/coreos/go-systemd/daemon"
 )
 
+var version = "dev"
+
 // Server configuration
 type Config struct {
        Client          arvados.Client
@@ -50,6 +53,7 @@ func main() {
 
        cfgPath := flag.String("config", defaultCfgPath, "Configuration file `path`.")
        dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit (useful for migrating from command line flags to config file)")
+       getVersion := flag.Bool("version", false, "print version information and exit.")
 
        flag.StringVar(&theConfig.ManagementToken, "management-token", theConfig.ManagementToken,
                "Authorization token to be included in all health check requests.")
@@ -57,6 +61,12 @@ func main() {
        flag.Usage = usage
        flag.Parse()
 
+       // Print version information if requested
+       if *getVersion {
+               fmt.Printf("arv-git-httpd %s\n", version)
+               return
+       }
+
        err := config.LoadFile(theConfig, *cfgPath)
        if err != nil {
                h := os.Getenv("ARVADOS_API_HOST")
@@ -84,6 +94,7 @@ func main() {
        if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
                log.Printf("Error notifying init daemon: %v", err)
        }
+       log.Printf("arv-git-httpd %s started", version)
        log.Println("Listening at", srv.Addr)
        log.Println("Repository root", theConfig.RepoRoot)
        if err := srv.Wait(); err != nil {
index 888a2148c1797aa330167d6ca4b44c6a914e5000..279327ba18811ba8ad6339600cc124460f2fc35c 100644 (file)
@@ -9,6 +9,7 @@ package main
 import (
        "context"
        "flag"
+       "fmt"
        "log"
        "os"
        "os/exec"
@@ -22,6 +23,8 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/dispatch"
 )
 
+var version = "dev"
+
 func main() {
        err := doMain()
        if err != nil {
@@ -49,9 +52,22 @@ func doMain() error {
                "/usr/bin/crunch-run",
                "Crunch command to run container")
 
+       getVersion := flags.Bool(
+               "version",
+               false,
+               "Print version information and exit.")
+
        // Parse args; omit the first arg which is the command name
        flags.Parse(os.Args[1:])
 
+       // Print version information if requested
+       if *getVersion {
+               fmt.Printf("crunch-dispatch-local %s\n", version)
+               return nil
+       }
+
+       log.Printf("crunch-dispatch-local %s started", version)
+
        runningCmds = make(map[string]*exec.Cmd)
 
        arv, err := arvadosclient.MakeArvadosClient()
index 30cbb79dc186de45366d648e2a26e42266ee7de3..d322b0f3f6dcf53735df8e920f07d0329397bbbf 100644 (file)
@@ -26,6 +26,8 @@ import (
        "github.com/coreos/go-systemd/daemon"
 )
 
+var version = "dev"
+
 // Config used by crunch-dispatch-slurm
 type Config struct {
        Client arvados.Client
@@ -69,10 +71,21 @@ func doMain() error {
                "dump-config",
                false,
                "write current configuration to stdout and exit")
-
+       getVersion := flags.Bool(
+               "version",
+               false,
+               "Print version information and exit.")
        // Parse args; omit the first arg which is the command name
        flags.Parse(os.Args[1:])
 
+       // Print version information if requested
+       if *getVersion {
+               fmt.Printf("crunch-dispatch-slurm %s\n", version)
+               return nil
+       }
+
+       log.Printf("crunch-dispatch-slurm %s started", version)
+
        err := readConfig(&theConfig, *configPath)
        if err != nil {
                return err
index 27a548aa61e94c5bb9e555a6737cd5bcdf6b1c9f..f3f754b59d227c3d410ad1255a9a38da1dd9400b 100644 (file)
@@ -19,6 +19,7 @@ import (
        "os/signal"
        "path"
        "path/filepath"
+       "regexp"
        "runtime"
        "runtime/pprof"
        "sort"
@@ -39,6 +40,8 @@ import (
        dockerclient "github.com/docker/docker/client"
 )
 
+var version = "dev"
+
 // IArvadosClient is the minimal Arvados API methods used by crunch-run.
 type IArvadosClient interface {
        Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
@@ -228,6 +231,35 @@ func (runner *ContainerRunner) stopSignals() {
        }
 }
 
+var errorBlacklist = []string{
+       "(?ms).*[Cc]annot connect to the Docker daemon.*",
+       "(?ms).*oci runtime error.*starting container process.*container init.*mounting.*to rootfs.*no such file or directory.*",
+}
+var brokenNodeHook *string = flag.String("broken-node-hook", "", "Script to run if node is detected to be broken (for example, Docker daemon is not running)")
+
+func (runner *ContainerRunner) checkBrokenNode(goterr error) bool {
+       for _, d := range errorBlacklist {
+               if m, e := regexp.MatchString(d, goterr.Error()); m && e == nil {
+                       runner.CrunchLog.Printf("Error suggests node is unable to run containers: %v", goterr)
+                       if *brokenNodeHook == "" {
+                               runner.CrunchLog.Printf("No broken node hook provided, cannot mark node as broken.")
+                       } else {
+                               runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook)
+                               // run killme script
+                               c := exec.Command(*brokenNodeHook)
+                               c.Stdout = runner.CrunchLog
+                               c.Stderr = runner.CrunchLog
+                               err := c.Run()
+                               if err != nil {
+                                       runner.CrunchLog.Printf("Error running broken node hook: %v", err)
+                               }
+                       }
+                       return true
+               }
+       }
+       return false
+}
+
 // LoadImage determines the docker image id from the container record and
 // checks if it is available in the local Docker image store.  If not, it loads
 // the image from Keep.
@@ -616,7 +648,7 @@ type infoCommand struct {
        cmd   []string
 }
 
-// Gather node information and store it on the log for debugging
+// LogNodeInfo gathers node information and store it on the log for debugging
 // purposes.
 func (runner *ContainerRunner) LogNodeInfo() (err error) {
        w := runner.NewLogWriter("node-info")
@@ -666,7 +698,7 @@ func (runner *ContainerRunner) LogNodeInfo() (err error) {
        return nil
 }
 
-// Get and save the raw JSON container record from the API server
+// LogContainerRecord gets and saves the raw JSON container record from the API server
 func (runner *ContainerRunner) LogContainerRecord() (err error) {
        w := &ArvLogWriter{
                ArvClient:     runner.ArvClient,
@@ -889,7 +921,7 @@ func (runner *ContainerRunner) StartContainer() error {
                dockertypes.ContainerStartOptions{})
        if err != nil {
                var advice string
-               if strings.Contains(err.Error(), "no such file or directory") {
+               if m, e := regexp.MatchString("(?ms).*(exec|System error).*(no such file or directory|file not found).*", err.Error()); m && e == nil {
                        advice = fmt.Sprintf("\nPossible causes: command %q is missing, the interpreter given in #! is missing, or script has Windows line endings.", runner.Container.Command[0])
                }
                return fmt.Errorf("could not start container: %v%s", err, advice)
@@ -1418,6 +1450,7 @@ func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
 
 // Run the full container lifecycle.
 func (runner *ContainerRunner) Run() (err error) {
+       runner.CrunchLog.Printf("crunch-run %s started", version)
        runner.CrunchLog.Printf("Executing container '%s'", runner.Container.UUID)
 
        hostname, hosterr := os.Hostname()
@@ -1487,7 +1520,11 @@ func (runner *ContainerRunner) Run() (err error) {
        // check for and/or load image
        err = runner.LoadImage()
        if err != nil {
-               runner.finalState = "Cancelled"
+               if !runner.checkBrokenNode(err) {
+                       // Failed to load image but not due to a "broken node"
+                       // condition, probably user error.
+                       runner.finalState = "Cancelled"
+               }
                err = fmt.Errorf("While loading container image: %v", err)
                return
        }
@@ -1516,8 +1553,6 @@ func (runner *ContainerRunner) Run() (err error) {
                return
        }
 
-       runner.StartCrunchstat()
-
        if runner.IsCancelled() {
                return
        }
@@ -1528,8 +1563,11 @@ func (runner *ContainerRunner) Run() (err error) {
        }
        runner.finalState = "Cancelled"
 
+       runner.StartCrunchstat()
+
        err = runner.StartContainer()
        if err != nil {
+               runner.checkBrokenNode(err)
                return
        }
 
@@ -1593,8 +1631,17 @@ func main() {
                `Set networking mode for container.  Corresponds to Docker network mode (--net).
        `)
        memprofile := flag.String("memprofile", "", "write memory profile to `file` after running container")
+       getVersion := flag.Bool("version", false, "Print version information and exit.")
        flag.Parse()
 
+       // Print version information if requested
+       if *getVersion {
+               fmt.Printf("crunch-run %s\n", version)
+               return
+       }
+
+       log.Printf("crunch-run %s started", version)
+
        containerId := flag.Arg(0)
 
        if *caCertsPath != "" {
@@ -1607,25 +1654,27 @@ func main() {
        }
        api.Retries = 8
 
-       var kc *keepclient.KeepClient
-       kc, err = keepclient.MakeKeepClient(api)
-       if err != nil {
-               log.Fatalf("%s: %v", containerId, err)
+       kc, kcerr := keepclient.MakeKeepClient(api)
+       if kcerr != nil {
+               log.Fatalf("%s: %v", containerId, kcerr)
        }
        kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
        kc.Retries = 4
 
-       var docker *dockerclient.Client
        // API version 1.21 corresponds to Docker 1.9, which is currently the
        // minimum version we want to support.
-       docker, err = dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
-       if err != nil {
-               log.Fatalf("%s: %v", containerId, err)
-       }
-
+       docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
        dockerClientProxy := ThinDockerClientProxy{Docker: docker}
 
        cr := NewContainerRunner(api, kc, dockerClientProxy, containerId)
+
+       if dockererr != nil {
+               cr.CrunchLog.Printf("%s: %v", containerId, dockererr)
+               cr.checkBrokenNode(dockererr)
+               cr.CrunchLog.Close()
+               os.Exit(1)
+       }
+
        cr.statInterval = *statInterval
        cr.cgroupRoot = *cgroupRoot
        cr.expectCgroupParent = *cgroupParent
index bc0b3125c9e699414a3d08f32cb88868769a346d..e1d9fed730ea5ace0393b07e8b1fd8f1eaff65e4 100644 (file)
@@ -130,6 +130,19 @@ func (t *TestDockerClient) ContainerCreate(ctx context.Context, config *dockerco
 }
 
 func (t *TestDockerClient) ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error {
+       if t.finish == 3 {
+               return errors.New(`Error response from daemon: oci runtime error: container_linux.go:247: starting container process caused "process_linux.go:359: container init caused \"rootfs_linux.go:54: mounting \\\"/tmp/keep453790790/by_id/99999999999999999999999999999999+99999/myGenome\\\" to rootfs \\\"/tmp/docker/overlay2/9999999999999999999999999999999999999999999999999999999999999999/merged\\\" at \\\"/tmp/docker/overlay2/9999999999999999999999999999999999999999999999999999999999999999/merged/keep/99999999999999999999999999999999+99999/myGenome\\\" caused \\\"no such file or directory\\\"\""`)
+       }
+       if t.finish == 4 {
+               return errors.New(`panic: standard_init_linux.go:175: exec user process caused "no such file or directory"`)
+       }
+       if t.finish == 5 {
+               return errors.New(`Error response from daemon: Cannot start container 41f26cbc43bcc1280f4323efb1830a394ba8660c9d1c2b564ba42bf7f7694845: [8] System error: no such file or directory`)
+       }
+       if t.finish == 6 {
+               return errors.New(`Error response from daemon: Cannot start container 58099cd76c834f3dc2a4fb76c8028f049ae6d4fdf0ec373e1f2cfea030670c2d: [8] System error: exec: "foobar": executable file not found in $PATH`)
+       }
+
        if container == "abcde" {
                // t.fn gets executed in ContainerWait
                return nil
@@ -156,6 +169,10 @@ func (t *TestDockerClient) ContainerWait(ctx context.Context, container string,
 }
 
 func (t *TestDockerClient) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) {
+       if t.finish == 2 {
+               return dockertypes.ImageInspect{}, nil, fmt.Errorf("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?")
+       }
+
        if t.imageLoaded == image {
                return dockertypes.ImageInspect{}, nil, nil
        } else {
@@ -164,6 +181,9 @@ func (t *TestDockerClient) ImageInspectWithRaw(ctx context.Context, image string
 }
 
 func (t *TestDockerClient) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) {
+       if t.finish == 2 {
+               return dockertypes.ImageLoadResponse{}, fmt.Errorf("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?")
+       }
        _, err := io.Copy(ioutil.Discard, input)
        if err != nil {
                return dockertypes.ImageLoadResponse{}, err
@@ -321,11 +341,27 @@ type FileWrapper struct {
        len int64
 }
 
+func (fw FileWrapper) Readdir(n int) ([]os.FileInfo, error) {
+       return nil, errors.New("not implemented")
+}
+
+func (fw FileWrapper) Seek(int64, int) (int64, error) {
+       return 0, errors.New("not implemented")
+}
+
 func (fw FileWrapper) Size() int64 {
        return fw.len
 }
 
-func (fw FileWrapper) Seek(int64, int) (int64, error) {
+func (fw FileWrapper) Stat() (os.FileInfo, error) {
+       return nil, errors.New("not implemented")
+}
+
+func (fw FileWrapper) Truncate(int64) error {
+       return errors.New("not implemented")
+}
+
+func (fw FileWrapper) Write([]byte) (int, error) {
        return 0, errors.New("not implemented")
 }
 
@@ -433,20 +469,14 @@ func (KeepReadErrorTestClient) PutHB(hash string, buf []byte) (string, int, erro
 func (KeepReadErrorTestClient) ClearBlockCache() {
 }
 
-type ErrorReader struct{}
+type ErrorReader struct {
+       FileWrapper
+}
 
 func (ErrorReader) Read(p []byte) (n int, err error) {
        return 0, errors.New("ErrorReader")
 }
 
-func (ErrorReader) Close() error {
-       return nil
-}
-
-func (ErrorReader) Size() int64 {
-       return 0
-}
-
 func (ErrorReader) Seek(int64, int) (int64, error) {
        return 0, errors.New("ErrorReader")
 }
@@ -668,9 +698,10 @@ func FullRunHelper(c *C, record string, extraMounts []string, exitCode int, fn f
        if api.CalledWith("container.state", "Complete") != nil {
                c.Check(err, IsNil)
        }
-       c.Check(api.WasSetRunning, Equals, true)
-
-       c.Check(api.Content[api.Calls-2]["container"].(arvadosclient.Dict)["log"], NotNil)
+       if exitCode != 2 {
+               c.Check(api.WasSetRunning, Equals, true)
+               c.Check(api.Content[api.Calls-2]["container"].(arvadosclient.Dict)["log"], NotNil)
+       }
 
        if err != nil {
                for k, v := range api.Logs {
@@ -1759,3 +1790,149 @@ func (s *TestSuite) TestEvalSymlinkDir(c *C) {
        _, err = cr.UploadOutputFile(realTemp+"/"+v, info, err, []string{}, nil, "", "", 0)
        c.Assert(err, NotNil)
 }
+
+func (s *TestSuite) TestFullBrokenDocker1(c *C) {
+       tf, err := ioutil.TempFile("", "brokenNodeHook-")
+       c.Assert(err, IsNil)
+       defer os.Remove(tf.Name())
+
+       tf.Write([]byte(`#!/bin/sh
+exec echo killme
+`))
+       tf.Close()
+       os.Chmod(tf.Name(), 0700)
+
+       ech := tf.Name()
+       brokenNodeHook = &ech
+
+       api, _, _ := FullRunHelper(c, `{
+    "command": ["echo", "hello world"],
+    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+    "cwd": ".",
+    "environment": {},
+    "mounts": {"/tmp": {"kind": "tmp"} },
+    "output_path": "/tmp",
+    "priority": 1,
+    "runtime_constraints": {}
+}`, nil, 2, func(t *TestDockerClient) {
+               t.logWriter.Write(dockerLog(1, "hello world\n"))
+               t.logWriter.Close()
+       })
+
+       c.Check(api.CalledWith("container.state", "Queued"), NotNil)
+       c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*")
+       c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*Running broken node hook.*")
+       c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*killme.*")
+
+}
+
+func (s *TestSuite) TestFullBrokenDocker2(c *C) {
+       ech := ""
+       brokenNodeHook = &ech
+
+       api, _, _ := FullRunHelper(c, `{
+    "command": ["echo", "hello world"],
+    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+    "cwd": ".",
+    "environment": {},
+    "mounts": {"/tmp": {"kind": "tmp"} },
+    "output_path": "/tmp",
+    "priority": 1,
+    "runtime_constraints": {}
+}`, nil, 2, func(t *TestDockerClient) {
+               t.logWriter.Write(dockerLog(1, "hello world\n"))
+               t.logWriter.Close()
+       })
+
+       c.Check(api.CalledWith("container.state", "Queued"), NotNil)
+       c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*")
+       c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*No broken node hook.*")
+}
+
+func (s *TestSuite) TestFullBrokenDocker3(c *C) {
+       ech := ""
+       brokenNodeHook = &ech
+
+       api, _, _ := FullRunHelper(c, `{
+    "command": ["echo", "hello world"],
+    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+    "cwd": ".",
+    "environment": {},
+    "mounts": {"/tmp": {"kind": "tmp"} },
+    "output_path": "/tmp",
+    "priority": 1,
+    "runtime_constraints": {}
+}`, nil, 3, func(t *TestDockerClient) {
+               t.logWriter.Write(dockerLog(1, "hello world\n"))
+               t.logWriter.Close()
+       })
+
+       c.Check(api.CalledWith("container.state", "Cancelled"), NotNil)
+       c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*")
+}
+
+func (s *TestSuite) TestBadCommand1(c *C) {
+       ech := ""
+       brokenNodeHook = &ech
+
+       api, _, _ := FullRunHelper(c, `{
+    "command": ["echo", "hello world"],
+    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+    "cwd": ".",
+    "environment": {},
+    "mounts": {"/tmp": {"kind": "tmp"} },
+    "output_path": "/tmp",
+    "priority": 1,
+    "runtime_constraints": {}
+}`, nil, 4, func(t *TestDockerClient) {
+               t.logWriter.Write(dockerLog(1, "hello world\n"))
+               t.logWriter.Close()
+       })
+
+       c.Check(api.CalledWith("container.state", "Cancelled"), NotNil)
+       c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*Possible causes:.*is missing.*")
+}
+
+func (s *TestSuite) TestBadCommand2(c *C) {
+       ech := ""
+       brokenNodeHook = &ech
+
+       api, _, _ := FullRunHelper(c, `{
+    "command": ["echo", "hello world"],
+    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+    "cwd": ".",
+    "environment": {},
+    "mounts": {"/tmp": {"kind": "tmp"} },
+    "output_path": "/tmp",
+    "priority": 1,
+    "runtime_constraints": {}
+}`, nil, 5, func(t *TestDockerClient) {
+               t.logWriter.Write(dockerLog(1, "hello world\n"))
+               t.logWriter.Close()
+       })
+
+       c.Check(api.CalledWith("container.state", "Cancelled"), NotNil)
+       c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*Possible causes:.*is missing.*")
+}
+
+func (s *TestSuite) TestBadCommand3(c *C) {
+       ech := ""
+       brokenNodeHook = &ech
+
+       api, _, _ := FullRunHelper(c, `{
+    "command": ["echo", "hello world"],
+    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+    "cwd": ".",
+    "environment": {},
+    "mounts": {"/tmp": {"kind": "tmp"} },
+    "output_path": "/tmp",
+    "priority": 1,
+    "runtime_constraints": {}
+}`, nil, 6, func(t *TestDockerClient) {
+               t.logWriter.Write(dockerLog(1, "hello world\n"))
+               t.logWriter.Close()
+       })
+
+       c.Check(api.CalledWith("container.state", "Cancelled"), NotNil)
+       c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*Possible causes:.*is missing.*")
+}
index cd84770e54e637bd9b195362d7dba7f14ff49601..ad433bb3b532fea36610e975a14b6ca750f5b353 100644 (file)
@@ -7,6 +7,7 @@ package main
 import (
        "bufio"
        "flag"
+       "fmt"
        "io"
        "log"
        "os"
@@ -23,6 +24,7 @@ const MaxLogLine = 1 << 14 // Child stderr lines >16KiB will be split
 var (
        signalOnDeadPPID  int = 15
        ppidCheckInterval     = time.Second
+       version               = "dev"
 )
 
 func main() {
@@ -36,9 +38,18 @@ func main() {
        flag.IntVar(&signalOnDeadPPID, "signal-on-dead-ppid", signalOnDeadPPID, "Signal to send child if crunchstat's parent process disappears (0 to disable)")
        flag.DurationVar(&ppidCheckInterval, "ppid-check-interval", ppidCheckInterval, "Time between checks for parent process disappearance")
        pollMsec := flag.Int64("poll", 1000, "Reporting interval, in milliseconds")
+       getVersion := flag.Bool("version", false, "Print version information and exit.")
 
        flag.Parse()
 
+       // Print version information if requested
+       if *getVersion {
+               fmt.Printf("crunchstat %s\n", version)
+               return
+       }
+
+       reporter.Logger.Printf("crunchstat %s started", version)
+
        if reporter.CgroupRoot == "" {
                reporter.Logger.Fatal("error: must provide -cgroup-root")
        } else if signalOnDeadPPID < 0 {
index 4dad90c86758edb118d7ab4b04958417533b9653..f174d1bb02c7ef1264a8d52a2079c11bb06778a7 100644 (file)
@@ -202,6 +202,7 @@ class Mount(object):
             logging.getLogger('arvados.collection').setLevel(logging.DEBUG)
             self.logger.debug("arv-mount debugging enabled")
 
+        self.logger.info("%s %s started", sys.argv[0], __version__)
         self.logger.info("enable write is %s", self.args.enable_write)
 
     def _setup_api(self):
index b6358deefcf8d333971435b2a9ede4021bc68f54..496fb884d433a5eea350d27818d5e72629e9242f 100644 (file)
@@ -2,6 +2,7 @@ package main
 
 import (
        "flag"
+       "fmt"
        "net/http"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
@@ -10,13 +11,24 @@ import (
        log "github.com/Sirupsen/logrus"
 )
 
+var version = "dev"
+
 func main() {
        configFile := flag.String("config", arvados.DefaultConfigFile, "`path` to arvados configuration file")
+       getVersion := flag.Bool("version", false, "Print version information and exit.")
        flag.Parse()
 
+       // Print version information if requested
+       if *getVersion {
+               fmt.Printf("arvados-health %s\n", version)
+               return
+       }
+
        log.SetFormatter(&log.JSONFormatter{
                TimestampFormat: "2006-01-02T15:04:05.000000000Z07:00",
        })
+       log.Printf("arvados-health %s started", version)
+
        cfg, err := arvados.GetConfig(*configFile)
        if err != nil {
                log.Fatal(err)
index 8a938ccf5308393311a121835c1eaef631f194ca..947033564df01e479d05682617fc041417e5d54f 100644 (file)
@@ -7,6 +7,7 @@ package main
 import (
        "encoding/json"
        "flag"
+       "fmt"
        "log"
        "os"
        "os/signal"
@@ -17,6 +18,8 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/config"
 )
 
+var version = "dev"
+
 const defaultConfigPath = "/etc/arvados/keep-balance/keep-balance.yml"
 
 // Config specifies site configuration, like API credentials and the
@@ -85,9 +88,16 @@ func main() {
        dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit")
        dumpFlag := flag.Bool("dump", false, "dump details for each block to stdout")
        debugFlag := flag.Bool("debug", false, "enable debug messages")
+       getVersion := flag.Bool("version", false, "Print version information and exit.")
        flag.Usage = usage
        flag.Parse()
 
+       // Print version information if requested
+       if *getVersion {
+               fmt.Printf("keep-balance %s\n", version)
+               return
+       }
+
        mustReadConfig(&cfg, *configPath)
        if *serviceListPath != "" {
                mustReadConfig(&cfg.KeepServiceList, *serviceListPath)
@@ -97,6 +107,8 @@ func main() {
                log.Fatal(config.DumpAndExit(cfg))
        }
 
+       log.Printf("keep-balance %s started", version)
+
        if *debugFlag {
                debugf = log.Printf
                if j, err := json.Marshal(cfg); err != nil {
index ce1168acd2c1d07bcd6e8623c5421bcbe905f04c..9ee99903c8d1e537d487a67d1c77d848fc93c807 100644 (file)
@@ -86,6 +86,29 @@ func (c *cache) Stats() cacheStats {
        }
 }
 
+// Update saves a modified version (fs) to an existing collection
+// (coll) and, if successful, updates the relevant cache entries so
+// subsequent calls to Get() reflect the modifications.
+func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error {
+       c.setupOnce.Do(c.setup)
+
+       if m, err := fs.MarshalManifest("."); err != nil || m == coll.ManifestText {
+               return err
+       } else {
+               coll.ManifestText = m
+       }
+       var updated arvados.Collection
+       defer c.pdhs.Remove(coll.UUID)
+       err := client.RequestAndDecode(&updated, "PATCH", "/arvados/v1/collections/"+coll.UUID, client.UpdateBody(coll), nil)
+       if err == nil {
+               c.collections.Add(client.AuthToken+"\000"+coll.PortableDataHash, &cachedCollection{
+                       expire:     time.Now().Add(time.Duration(c.TTL)),
+                       collection: &updated,
+               })
+       }
+       return err
+}
+
 func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
        c.setupOnce.Do(c.setup)
 
index 87a712f04c4f2d381dfcd3dccb5cbdc93680eaf1..d4a89c844b567a4d2fb8d3a94f8138e365a672d7 100644 (file)
@@ -7,42 +7,202 @@ package main
 import (
        "bytes"
        "io"
+       "io/ioutil"
+       "net/url"
+       "os"
        "os/exec"
+       "strings"
+       "time"
 
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
        check "gopkg.in/check.v1"
 )
 
 func (s *IntegrationSuite) TestWebdavWithCadaver(c *check.C) {
-       basePath := "/c=" + arvadostest.FooAndBarFilesInDirUUID + "/t=" + arvadostest.ActiveToken + "/"
+       testdata := []byte("the human tragedy consists in the necessity of living with the consequences of actions performed under the pressure of compulsions we do not understand")
+
+       localfile, err := ioutil.TempFile("", "localfile")
+       c.Assert(err, check.IsNil)
+       defer os.Remove(localfile.Name())
+       localfile.Write(testdata)
+
+       emptyfile, err := ioutil.TempFile("", "emptyfile")
+       c.Assert(err, check.IsNil)
+       defer os.Remove(emptyfile.Name())
+
+       checkfile, err := ioutil.TempFile("", "checkfile")
+       c.Assert(err, check.IsNil)
+       defer os.Remove(checkfile.Name())
+
+       var newCollection arvados.Collection
+       arv := arvados.NewClientFromEnv()
+       arv.AuthToken = arvadostest.ActiveToken
+       err = arv.RequestAndDecode(&newCollection, "POST", "/arvados/v1/collections", bytes.NewBufferString(url.Values{"collection": {"{}"}}.Encode()), nil)
+       c.Assert(err, check.IsNil)
+       writePath := "/c=" + newCollection.UUID + "/t=" + arv.AuthToken + "/"
+
+       pdhPath := "/c=" + strings.Replace(arvadostest.FooAndBarFilesInDirPDH, "+", "-", -1) + "/t=" + arv.AuthToken + "/"
+
+       matchToday := time.Now().Format("Jan +2")
+
+       readPath := "/c=" + arvadostest.FooAndBarFilesInDirUUID + "/t=" + arvadostest.ActiveToken + "/"
        type testcase struct {
                path  string
                cmd   string
                match string
+               data  []byte
        }
        for _, trial := range []testcase{
                {
-                       path:  basePath,
+                       path:  readPath,
                        cmd:   "ls\n",
                        match: `(?ms).*dir1 *0 .*`,
                },
                {
-                       path:  basePath,
+                       path:  readPath,
                        cmd:   "ls dir1\n",
                        match: `(?ms).*bar *3.*foo *3 .*`,
                },
                {
-                       path:  basePath + "_/dir1",
+                       path:  readPath + "_/dir1",
                        cmd:   "ls\n",
                        match: `(?ms).*bar *3.*foo *3 .*`,
                },
                {
-                       path:  basePath + "dir1/",
+                       path:  readPath + "dir1/",
                        cmd:   "ls\n",
-                       match: `(?ms).*bar *3.*foo *3 .*`,
+                       match: `(?ms).*bar *3.*foo +3 +Feb +\d+ +2014.*`,
+               },
+               {
+                       path:  writePath,
+                       cmd:   "get emptyfile '" + checkfile.Name() + "'\n",
+                       match: `(?ms).*Not Found.*`,
+               },
+               {
+                       path:  writePath,
+                       cmd:   "put '" + emptyfile.Name() + "' emptyfile\n",
+                       match: `(?ms).*Uploading .* succeeded.*`,
+               },
+               {
+                       path:  writePath,
+                       cmd:   "get emptyfile '" + checkfile.Name() + "'\n",
+                       match: `(?ms).*Downloading .* succeeded.*`,
+                       data:  []byte{},
+               },
+               {
+                       path:  writePath,
+                       cmd:   "put '" + localfile.Name() + "' testfile\n",
+                       match: `(?ms).*Uploading .* succeeded.*`,
+               },
+               {
+                       path:  writePath,
+                       cmd:   "get testfile '" + checkfile.Name() + "'\n",
+                       match: `(?ms).*succeeded.*`,
+                       data:  testdata,
+               },
+               {
+                       path:  writePath,
+                       cmd:   "move testfile newdir0/\n",
+                       match: `(?ms).*Moving .* succeeded.*`,
+               },
+               {
+                       path:  writePath,
+                       cmd:   "move testfile newdir0/\n",
+                       match: `(?ms).*Moving .* failed.*`,
+               },
+               {
+                       path:  writePath,
+                       cmd:   "ls\n",
+                       match: `(?ms).*newdir0.* 0 +` + matchToday + ` \d+:\d+\n.*`,
+               },
+               {
+                       path:  writePath,
+                       cmd:   "move newdir0/testfile emptyfile/bogus/\n",
+                       match: `(?ms).*Moving .* failed.*`,
+               },
+               {
+                       path:  writePath,
+                       cmd:   "mkcol newdir1\n",
+                       match: `(?ms).*Creating .* succeeded.*`,
+               },
+               {
+                       path:  writePath,
+                       cmd:   "move newdir0/testfile newdir1/\n",
+                       match: `(?ms).*Moving .* succeeded.*`,
+               },
+               {
+                       path:  writePath,
+                       cmd:   "put '" + localfile.Name() + "' newdir1/testfile1\n",
+                       match: `(?ms).*Uploading .* succeeded.*`,
+               },
+               {
+                       path:  writePath,
+                       cmd:   "mkcol newdir2\n",
+                       match: `(?ms).*Creating .* succeeded.*`,
+               },
+               {
+                       path:  writePath,
+                       cmd:   "put '" + localfile.Name() + "' newdir2/testfile2\n",
+                       match: `(?ms).*Uploading .* succeeded.*`,
+               },
+               {
+                       path:  writePath,
+                       cmd:   "copy newdir2/testfile2 testfile3\n",
+                       match: `(?ms).*succeeded.*`,
+               },
+               {
+                       path:  writePath,
+                       cmd:   "get testfile3 '" + checkfile.Name() + "'\n",
+                       match: `(?ms).*succeeded.*`,
+                       data:  testdata,
+               },
+               {
+                       path:  writePath,
+                       cmd:   "get newdir2/testfile2 '" + checkfile.Name() + "'\n",
+                       match: `(?ms).*succeeded.*`,
+                       data:  testdata,
+               },
+               {
+                       path:  writePath,
+                       cmd:   "rmcol newdir2\n",
+                       match: `(?ms).*Deleting collection .* succeeded.*`,
+               },
+               {
+                       path:  writePath,
+                       cmd:   "get newdir2/testfile2 '" + checkfile.Name() + "'\n",
+                       match: `(?ms).*Downloading .* failed.*`,
+               },
+               {
+                       path:  "/c=" + arvadostest.UserAgreementCollection + "/t=" + arv.AuthToken + "/",
+                       cmd:   "put '" + localfile.Name() + "' foo\n",
+                       match: `(?ms).*Uploading .* failed:.*403 Forbidden.*`,
+               },
+               {
+                       path:  pdhPath,
+                       cmd:   "put '" + localfile.Name() + "' foo\n",
+                       match: `(?ms).*Uploading .* failed:.*405 Method Not Allowed.*`,
+               },
+               {
+                       path:  pdhPath,
+                       cmd:   "move foo bar\n",
+                       match: `(?ms).*Moving .* failed:.*405 Method Not Allowed.*`,
+               },
+               {
+                       path:  pdhPath,
+                       cmd:   "copy foo bar\n",
+                       match: `(?ms).*Copying .* failed:.*405 Method Not Allowed.*`,
+               },
+               {
+                       path:  pdhPath,
+                       cmd:   "delete foo\n",
+                       match: `(?ms).*Deleting .* failed:.*405 Method Not Allowed.*`,
                },
        } {
-               c.Logf("%s %#v", "http://"+s.testServer.Addr, trial)
+               c.Logf("%s %+v", "http://"+s.testServer.Addr, trial)
+
+               os.Remove(checkfile.Name())
+
                cmd := exec.Command("cadaver", "http://"+s.testServer.Addr+trial.path)
                cmd.Stdin = bytes.NewBufferString(trial.cmd)
                stdout, err := cmd.StdoutPipe()
@@ -56,5 +216,15 @@ func (s *IntegrationSuite) TestWebdavWithCadaver(c *check.C) {
                err = cmd.Wait()
                c.Check(err, check.Equals, nil)
                c.Check(buf.String(), check.Matches, trial.match)
+
+               if trial.data == nil {
+                       continue
+               }
+               checkfile, err = os.Open(checkfile.Name())
+               c.Assert(err, check.IsNil)
+               checkfile.Seek(0, os.SEEK_SET)
+               got, err := ioutil.ReadAll(checkfile)
+               c.Check(got, check.DeepEquals, trial.data)
+               c.Check(err, check.IsNil)
        }
 }
index 598fabcd37eddb2853119999145a760227e17060..b7da3b0e5ad2df7642319f16a97015bb3e45de63 100644 (file)
@@ -2,11 +2,11 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-// Keep-web provides read-only HTTP access to files stored in Keep. It
-// serves public data to anonymous and unauthenticated clients, and
-// serves private data to clients that supply Arvados API tokens. It
-// can be installed anywhere with access to Keep services, typically
-// behind a web proxy that supports TLS.
+// Keep-web provides read/write HTTP (WebDAV) access to files stored
+// in Keep. It serves public data to anonymous and unauthenticated
+// clients, and serves private data to clients that supply Arvados API
+// tokens. It can be installed anywhere with access to Keep services,
+// typically behind a web proxy that supports TLS.
 //
 // See http://doc.arvados.org/install/install-keep-web.html.
 //
@@ -40,7 +40,7 @@
 //
 // Proxy configuration
 //
-// Keep-web does not support SSL natively. Typically, it is installed
+// Keep-web does not support TLS natively. Typically, it is installed
 // behind a proxy like nginx.
 //
 // Here is an example nginx configuration.
index fd36218bc1c7a3c96e9d8917e8ecc60a1641ba55..a1476d3a8eb1b62fad8ea519702ec290e7f3472c 100644 (file)
@@ -10,6 +10,7 @@ import (
        "html"
        "html/template"
        "io"
+       "log"
        "net/http"
        "net/url"
        "os"
@@ -90,16 +91,72 @@ func (h *handler) setup() {
 func (h *handler) serveStatus(w http.ResponseWriter, r *http.Request) {
        status := struct {
                cacheStats
+               Version string
        }{
                cacheStats: h.Config.Cache.Stats(),
+               Version:    version,
        }
        json.NewEncoder(w).Encode(status)
 }
 
+// updateOnSuccess wraps httpserver.ResponseWriter. If the handler
+// sends an HTTP header indicating success, updateOnSuccess first
+// calls the provided update func. If the update func fails, a 500
+// response is sent, and the status code and body sent by the handler
+// are ignored (all response writes return the update error).
+type updateOnSuccess struct {
+       httpserver.ResponseWriter
+       update     func() error
+       sentHeader bool
+       err        error
+}
+
+func (uos *updateOnSuccess) Write(p []byte) (int, error) {
+       if uos.err != nil {
+               return 0, uos.err
+       }
+       if !uos.sentHeader {
+               uos.WriteHeader(http.StatusOK)
+       }
+       return uos.ResponseWriter.Write(p)
+}
+
+func (uos *updateOnSuccess) WriteHeader(code int) {
+       if !uos.sentHeader {
+               uos.sentHeader = true
+               if code >= 200 && code < 400 {
+                       if uos.err = uos.update(); uos.err != nil {
+                               code := http.StatusInternalServerError
+                               if err, ok := uos.err.(*arvados.TransactionError); ok {
+                                       code = err.StatusCode
+                               }
+                               log.Printf("update() changes response to HTTP %d: %T %q", code, uos.err, uos.err)
+                               http.Error(uos.ResponseWriter, uos.err.Error(), code)
+                               return
+                       }
+               }
+       }
+       uos.ResponseWriter.WriteHeader(code)
+}
+
 var (
+       writeMethod = map[string]bool{
+               "COPY":   true,
+               "DELETE": true,
+               "MKCOL":  true,
+               "MOVE":   true,
+               "PUT":    true,
+               "RMCOL":  true,
+       }
        webdavMethod = map[string]bool{
+               "COPY":     true,
+               "DELETE":   true,
+               "MKCOL":    true,
+               "MOVE":     true,
                "OPTIONS":  true,
                "PROPFIND": true,
+               "PUT":      true,
+               "RMCOL":    true,
        }
        browserMethod = map[string]bool{
                "GET":  true,
@@ -147,7 +204,7 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
                        return
                }
                w.Header().Set("Access-Control-Allow-Headers", "Authorization, Content-Type, Range")
-               w.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS, PROPFIND")
+               w.Header().Set("Access-Control-Allow-Methods", "COPY, DELETE, GET, MKCOL, MOVE, OPTIONS, POST, PROPFIND, PUT, RMCOL")
                w.Header().Set("Access-Control-Allow-Origin", "*")
                w.Header().Set("Access-Control-Max-Age", "86400")
                statusCode = http.StatusOK
@@ -352,21 +409,45 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
        }
        applyContentDispositionHdr(w, r, basename, attachment)
 
-       fs := collection.FileSystem(&arvados.Client{
+       client := &arvados.Client{
                APIHost:   arv.ApiServer,
                AuthToken: arv.ApiToken,
                Insecure:  arv.ApiInsecure,
-       }, kc)
+       }
+       fs, err := collection.FileSystem(client, kc)
+       if err != nil {
+               statusCode, statusText = http.StatusInternalServerError, err.Error()
+               return
+       }
+
+       targetIsPDH := arvadosclient.PDHMatch(targetID)
+       if targetIsPDH && writeMethod[r.Method] {
+               statusCode, statusText = http.StatusMethodNotAllowed, errReadOnly.Error()
+               return
+       }
+
        if webdavMethod[r.Method] {
+               if writeMethod[r.Method] {
+                       // Save the collection only if/when all
+                       // webdav->filesystem operations succeed --
+                       // and send a 500 error if the modified
+                       // collection can't be saved.
+                       w = &updateOnSuccess{
+                               ResponseWriter: w,
+                               update: func() error {
+                                       return h.Config.Cache.Update(client, *collection, fs)
+                               }}
+               }
                h := webdav.Handler{
-                       Prefix:     "/" + strings.Join(pathParts[:stripParts], "/"),
-                       FileSystem: &webdavFS{collfs: fs},
+                       Prefix: "/" + strings.Join(pathParts[:stripParts], "/"),
+                       FileSystem: &webdavFS{
+                               collfs:  fs,
+                               writing: writeMethod[r.Method],
+                       },
                        LockSystem: h.webdavLS,
                        Logger: func(_ *http.Request, err error) {
-                               if os.IsNotExist(err) {
-                                       statusCode, statusText = http.StatusNotFound, err.Error()
-                               } else if err != nil {
-                                       statusCode, statusText = http.StatusInternalServerError, err.Error()
+                               if err != nil {
+                                       log.Printf("error from webdav handler: %q", err)
                                }
                        },
                }
index 6bd34d71130aaeefd4984e835f8285c01c6899ad..21e47c8dc7c3e0a64f8d320e24b5cd9041fe7117 100644 (file)
@@ -45,12 +45,12 @@ func (s *UnitSuite) TestCORSPreflight(c *check.C) {
        c.Check(resp.Code, check.Equals, http.StatusOK)
        c.Check(resp.Body.String(), check.Equals, "")
        c.Check(resp.Header().Get("Access-Control-Allow-Origin"), check.Equals, "*")
-       c.Check(resp.Header().Get("Access-Control-Allow-Methods"), check.Equals, "GET, POST, OPTIONS, PROPFIND")
+       c.Check(resp.Header().Get("Access-Control-Allow-Methods"), check.Equals, "COPY, DELETE, GET, MKCOL, MOVE, OPTIONS, POST, PROPFIND, PUT, RMCOL")
        c.Check(resp.Header().Get("Access-Control-Allow-Headers"), check.Equals, "Authorization, Content-Type, Range")
 
        // Check preflight for a disallowed request
        resp = httptest.NewRecorder()
-       req.Header.Set("Access-Control-Request-Method", "DELETE")
+       req.Header.Set("Access-Control-Request-Method", "MAKE-COFFEE")
        h.ServeHTTP(resp, req)
        c.Check(resp.Body.String(), check.Equals, "")
        c.Check(resp.Code, check.Equals, http.StatusMethodNotAllowed)
index 27ceb48c78bfebbc40aa9d3732cbcbba6ad63155..724af27c7e0e746b44218f5269d23b71228e6655 100644 (file)
@@ -6,6 +6,7 @@ package main
 
 import (
        "flag"
+       "fmt"
        "log"
        "os"
        "time"
@@ -17,6 +18,7 @@ import (
 
 var (
        defaultConfigPath = "/etc/arvados/keep-web/keep-web.yml"
+       version           = "dev"
 )
 
 // Config specifies server configuration.
@@ -85,9 +87,17 @@ func main() {
 
        dumpConfig := flag.Bool("dump-config", false,
                "write current configuration to stdout and exit")
+       getVersion := flag.Bool("version", false,
+               "print version information and exit.")
        flag.Usage = usage
        flag.Parse()
 
+       // Print version information if requested
+       if *getVersion {
+               fmt.Printf("keep-web %s\n", version)
+               return
+       }
+
        if err := config.LoadFile(cfg, configPath); err != nil {
                if h := os.Getenv("ARVADOS_API_HOST"); h != "" && configPath == defaultConfigPath {
                        log.Printf("DEPRECATED: Using ARVADOS_API_HOST environment variable. Use config file instead.")
@@ -105,6 +115,8 @@ func main() {
                log.Fatal(config.DumpAndExit(cfg))
        }
 
+       log.Printf("keep-web %s started", version)
+
        os.Setenv("ARVADOS_API_HOST", cfg.Client.APIHost)
        srv := &server{Config: cfg}
        if err := srv.Start(); err != nil {
index 5f2d44cbe4d7ce4c12f86417763d02c28f0c5782..0a2b9eb988dce96c4791f7cbbaf0c602d5b16980 100644 (file)
@@ -31,6 +31,7 @@ func (s *UnitSuite) TestStatus(c *check.C) {
        err := json.NewDecoder(resp.Body).Decode(&status)
        c.Check(err, check.IsNil)
        c.Check(status["Cache.Requests"], check.Equals, float64(0))
+       c.Check(status["Version"], check.Not(check.Equals), "")
 }
 
 func (s *IntegrationSuite) TestNoStatusFromVHost(c *check.C) {
index 57f3f53a99ef6a73944c6cb600c1672c67cb6696..3ceb0ed5c9ea6c71f06e5ad857ad1223bb277433 100644 (file)
@@ -9,9 +9,9 @@ import (
        "errors"
        "fmt"
        prand "math/rand"
-       "net/http"
        "os"
-       "sync"
+       "path"
+       "strings"
        "sync/atomic"
        "time"
 
@@ -27,103 +27,83 @@ var (
        errReadOnly           = errors.New("read-only filesystem")
 )
 
-// webdavFS implements a read-only webdav.FileSystem by wrapping an
+// webdavFS implements a webdav.FileSystem by wrapping an
 // arvados.CollectionFilesystem.
+//
+// Collections don't preserve empty directories, so Mkdir is
+// effectively a no-op, and we need to make parent dirs spring into
+// existence automatically so sequences like "mkcol foo; put foo/bar"
+// work as expected.
 type webdavFS struct {
-       collfs arvados.CollectionFileSystem
+       collfs  arvados.CollectionFileSystem
+       writing bool
 }
 
-var _ webdav.FileSystem = &webdavFS{}
+func (fs *webdavFS) makeparents(name string) {
+       dir, name := path.Split(name)
+       if dir == "" || dir == "/" {
+               return
+       }
+       dir = dir[:len(dir)-1]
+       fs.makeparents(dir)
+       fs.collfs.Mkdir(dir, 0755)
+}
 
 func (fs *webdavFS) Mkdir(ctx context.Context, name string, perm os.FileMode) error {
-       return errReadOnly
+       if !fs.writing {
+               return errReadOnly
+       }
+       name = strings.TrimRight(name, "/")
+       fs.makeparents(name)
+       return fs.collfs.Mkdir(name, 0755)
 }
 
-func (fs *webdavFS) OpenFile(ctx context.Context, name string, flag int, perm os.FileMode) (webdav.File, error) {
-       fi, err := fs.collfs.Stat(name)
-       if err != nil {
-               return nil, err
+func (fs *webdavFS) OpenFile(ctx context.Context, name string, flag int, perm os.FileMode) (f webdav.File, err error) {
+       writing := flag&(os.O_WRONLY|os.O_RDWR) != 0
+       if writing {
+               fs.makeparents(name)
+       }
+       f, err = fs.collfs.OpenFile(name, flag, perm)
+       if !fs.writing {
+               // webdav module returns 404 on all OpenFile errors,
+               // but returns 405 Method Not Allowed if OpenFile()
+               // succeeds but Write() or Close() fails. We'd rather
+               // have 405.
+               f = writeFailer{File: f, err: errReadOnly}
        }
-       return &webdavFile{collfs: fs.collfs, fileInfo: fi, name: name}, nil
+       return
 }
 
 func (fs *webdavFS) RemoveAll(ctx context.Context, name string) error {
-       return errReadOnly
+       return fs.collfs.RemoveAll(name)
 }
 
 func (fs *webdavFS) Rename(ctx context.Context, oldName, newName string) error {
-       return errReadOnly
-}
-
-func (fs *webdavFS) Stat(ctx context.Context, name string) (os.FileInfo, error) {
-       return fs.collfs.Stat(name)
-}
-
-// webdavFile implements a read-only webdav.File by wrapping
-// http.File.
-//
-// The http.File is opened from an arvados.CollectionFileSystem, but
-// not until Seek, Read, or Readdir is called. This deferred-open
-// strategy makes webdav's OpenFile-Stat-Close cycle fast even though
-// the collfs's Open method is slow. This is relevant because webdav
-// does OpenFile-Stat-Close on each file when preparing directory
-// listings.
-//
-// Writes to a webdavFile always fail.
-type webdavFile struct {
-       // fields populated by (*webdavFS).OpenFile()
-       collfs   http.FileSystem
-       fileInfo os.FileInfo
-       name     string
-
-       // internal fields
-       file     http.File
-       loadOnce sync.Once
-       err      error
-}
-
-func (f *webdavFile) load() {
-       f.file, f.err = f.collfs.Open(f.name)
-}
-
-func (f *webdavFile) Write([]byte) (int, error) {
-       return 0, errReadOnly
-}
-
-func (f *webdavFile) Seek(offset int64, whence int) (int64, error) {
-       f.loadOnce.Do(f.load)
-       if f.err != nil {
-               return 0, f.err
+       if !fs.writing {
+               return errReadOnly
        }
-       return f.file.Seek(offset, whence)
+       fs.makeparents(newName)
+       return fs.collfs.Rename(oldName, newName)
 }
 
-func (f *webdavFile) Read(buf []byte) (int, error) {
-       f.loadOnce.Do(f.load)
-       if f.err != nil {
-               return 0, f.err
+func (fs *webdavFS) Stat(ctx context.Context, name string) (os.FileInfo, error) {
+       if fs.writing {
+               fs.makeparents(name)
        }
-       return f.file.Read(buf)
+       return fs.collfs.Stat(name)
 }
 
-func (f *webdavFile) Close() error {
-       if f.file == nil {
-               // We never called load(), or load() failed
-               return f.err
-       }
-       return f.file.Close()
+type writeFailer struct {
+       webdav.File
+       err error
 }
 
-func (f *webdavFile) Readdir(n int) ([]os.FileInfo, error) {
-       f.loadOnce.Do(f.load)
-       if f.err != nil {
-               return nil, f.err
-       }
-       return f.file.Readdir(n)
+func (wf writeFailer) Write([]byte) (int, error) {
+       return 0, wf.err
 }
 
-func (f *webdavFile) Stat() (os.FileInfo, error) {
-       return f.fileInfo, nil
+func (wf writeFailer) Close() error {
+       return wf.err
 }
 
 // noLockSystem implements webdav.LockSystem by returning success for
diff --git a/services/keep-web/webdav_test.go b/services/keep-web/webdav_test.go
new file mode 100644 (file)
index 0000000..52db776
--- /dev/null
@@ -0,0 +1,5 @@
+package main
+
+import "golang.org/x/net/webdav"
+
+var _ webdav.FileSystem = &webdavFS{}
index 3d1b4476255d0cadf2274ad12b9aae78f164f72f..145b39d4c3d1e643983c6f517eb31ff2c8d417fd 100644 (file)
@@ -10,7 +10,6 @@ import (
        "fmt"
        "io"
        "io/ioutil"
-       "log"
        "net"
        "net/http"
        "os"
@@ -25,12 +24,16 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/config"
        "git.curoverse.com/arvados.git/sdk/go/health"
+       "git.curoverse.com/arvados.git/sdk/go/httpserver"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       log "github.com/Sirupsen/logrus"
        "github.com/coreos/go-systemd/daemon"
        "github.com/ghodss/yaml"
        "github.com/gorilla/mux"
 )
 
+var version = "dev"
+
 type Config struct {
        Client          arvados.Client
        Listen          string
@@ -55,7 +58,13 @@ var (
        router   http.Handler
 )
 
+const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
+
 func main() {
+       log.SetFormatter(&log.JSONFormatter{
+               TimestampFormat: rfc3339NanoFixed,
+       })
+
        cfg := DefaultConfig()
 
        flagset := flag.NewFlagSet("keepproxy", flag.ExitOnError)
@@ -74,8 +83,15 @@ func main() {
        const defaultCfgPath = "/etc/arvados/keepproxy/keepproxy.yml"
        flagset.StringVar(&cfgPath, "config", defaultCfgPath, "Configuration file `path`")
        dumpConfig := flagset.Bool("dump-config", false, "write current configuration to stdout and exit")
+       getVersion := flagset.Bool("version", false, "Print version information and exit.")
        flagset.Parse(os.Args[1:])
 
+       // Print version information if requested
+       if *getVersion {
+               fmt.Printf("keepproxy %s\n", version)
+               return
+       }
+
        err := config.LoadFile(cfg, cfgPath)
        if err != nil {
                h := os.Getenv("ARVADOS_API_HOST")
@@ -99,6 +115,8 @@ func main() {
                log.Fatal(config.DumpAndExit(cfg))
        }
 
+       log.Printf("keepproxy %s started", version)
+
        arv, err := arvadosclient.New(&cfg.Client)
        if err != nil {
                log.Fatalf("Error setting up arvados client %s", err.Error())
@@ -164,7 +182,7 @@ func main() {
 
        // Start serving requests.
        router = MakeRESTRouter(!cfg.DisableGet, !cfg.DisablePut, kc, time.Duration(cfg.Timeout), cfg.ManagementToken)
-       http.Serve(listener, router)
+       http.Serve(listener, httpserver.AddRequestIDs(httpserver.LogRequests(router)))
 
        log.Println("shutting down")
 }
@@ -596,7 +614,8 @@ func (h *proxyHandler) makeKeepClient(req *http.Request) *keepclient.KeepClient
                        Timeout:   h.timeout,
                        Transport: h.transport,
                },
-               proto: req.Proto,
+               proto:     req.Proto,
+               requestID: req.Header.Get("X-Request-Id"),
        }
        return &kc
 }
index 23bb2bd216c1f1080d246e63a47252564e4d9bf4..a7b608b69c462fd4149f932c16dbabcaddbca6c6 100644 (file)
@@ -10,7 +10,6 @@ import (
        "errors"
        "fmt"
        "io/ioutil"
-       "log"
        "math/rand"
        "net/http"
        "net/http/httptest"
@@ -57,7 +56,7 @@ func waitForListener() {
                time.Sleep(ms * time.Millisecond)
        }
        if listener == nil {
-               log.Fatalf("Timed out waiting for listener to start")
+               panic("Timed out waiting for listener to start")
        }
 }
 
@@ -255,14 +254,14 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
        {
                _, _, err := kc.Ask(hash)
                c.Check(err, Equals, keepclient.BlockNotFound)
-               log.Print("Finished Ask (expected BlockNotFound)")
+               c.Log("Finished Ask (expected BlockNotFound)")
        }
 
        {
                reader, _, _, err := kc.Get(hash)
                c.Check(reader, Equals, nil)
                c.Check(err, Equals, keepclient.BlockNotFound)
-               log.Print("Finished Get (expected BlockNotFound)")
+               c.Log("Finished Get (expected BlockNotFound)")
        }
 
        // Note in bug #5309 among other errors keepproxy would set
@@ -281,14 +280,14 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
                c.Check(hash2, Matches, fmt.Sprintf(`^%s\+3(\+.+)?$`, hash))
                c.Check(rep, Equals, 2)
                c.Check(err, Equals, nil)
-               log.Print("Finished PutB (expected success)")
+               c.Log("Finished PutB (expected success)")
        }
 
        {
                blocklen, _, err := kc.Ask(hash2)
                c.Assert(err, Equals, nil)
                c.Check(blocklen, Equals, int64(3))
-               log.Print("Finished Ask (expected success)")
+               c.Log("Finished Ask (expected success)")
        }
 
        {
@@ -297,7 +296,7 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
                all, err := ioutil.ReadAll(reader)
                c.Check(all, DeepEquals, []byte("foo"))
                c.Check(blocklen, Equals, int64(3))
-               log.Print("Finished Get (expected success)")
+               c.Log("Finished Get (expected success)")
        }
 
        {
@@ -307,7 +306,7 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
                c.Check(hash2, Matches, `^d41d8cd98f00b204e9800998ecf8427e\+0(\+.+)?$`)
                c.Check(rep, Equals, 2)
                c.Check(err, Equals, nil)
-               log.Print("Finished PutB zero block")
+               c.Log("Finished PutB zero block")
        }
 
        {
@@ -316,7 +315,7 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
                all, err := ioutil.ReadAll(reader)
                c.Check(all, DeepEquals, []byte(""))
                c.Check(blocklen, Equals, int64(0))
-               log.Print("Finished Get zero block")
+               c.Log("Finished Get zero block")
        }
 }
 
@@ -331,7 +330,7 @@ func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) {
                errNotFound, _ := err.(keepclient.ErrNotFound)
                c.Check(errNotFound, NotNil)
                c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
-               log.Print("Ask 1")
+               c.Log("Ask 1")
        }
 
        {
@@ -339,7 +338,7 @@ func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) {
                c.Check(hash2, Equals, "")
                c.Check(rep, Equals, 0)
                c.Check(err, FitsTypeOf, keepclient.InsufficientReplicasError(errors.New("")))
-               log.Print("PutB")
+               c.Log("PutB")
        }
 
        {
@@ -348,7 +347,7 @@ func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) {
                c.Check(errNotFound, NotNil)
                c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
                c.Check(blocklen, Equals, int64(0))
-               log.Print("Ask 2")
+               c.Log("Ask 2")
        }
 
        {
@@ -357,7 +356,7 @@ func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) {
                c.Check(errNotFound, NotNil)
                c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
                c.Check(blocklen, Equals, int64(0))
-               log.Print("Get")
+               c.Log("Get")
        }
 }
 
@@ -372,7 +371,7 @@ func (s *ServerRequiredSuite) TestGetDisabled(c *C) {
                errNotFound, _ := err.(keepclient.ErrNotFound)
                c.Check(errNotFound, NotNil)
                c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
-               log.Print("Ask 1")
+               c.Log("Ask 1")
        }
 
        {
@@ -380,7 +379,7 @@ func (s *ServerRequiredSuite) TestGetDisabled(c *C) {
                c.Check(hash2, Matches, fmt.Sprintf(`^%s\+3(\+.+)?$`, hash))
                c.Check(rep, Equals, 2)
                c.Check(err, Equals, nil)
-               log.Print("PutB")
+               c.Log("PutB")
        }
 
        {
@@ -389,7 +388,7 @@ func (s *ServerRequiredSuite) TestGetDisabled(c *C) {
                c.Check(errNotFound, NotNil)
                c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
                c.Check(blocklen, Equals, int64(0))
-               log.Print("Ask 2")
+               c.Log("Ask 2")
        }
 
        {
@@ -398,7 +397,7 @@ func (s *ServerRequiredSuite) TestGetDisabled(c *C) {
                c.Check(errNotFound, NotNil)
                c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
                c.Check(blocklen, Equals, int64(0))
-               log.Print("Get")
+               c.Log("Get")
        }
 }
 
index 0faf4aea0e3c35354e30dc33f1e7005d491ab4d5..3fa2671df58331bb52c16651ded3924e9390ee90 100644 (file)
@@ -13,11 +13,13 @@ import (
 var viaAlias = "keepproxy"
 
 type proxyClient struct {
-       client keepclient.HTTPClient
-       proto  string
+       client    keepclient.HTTPClient
+       proto     string
+       requestID string
 }
 
 func (pc *proxyClient) Do(req *http.Request) (*http.Response, error) {
        req.Header.Add("Via", pc.proto+" "+viaAlias)
+       req.Header.Add("X-Request-Id", pc.requestID)
        return pc.client.Do(req)
 }
index 424910dfa8b2af4a0d22ad9e91bd4eab20076af7..4d042a70dd376ea1e04bdff16283ab80669dfd0f 100644 (file)
@@ -975,7 +975,7 @@ func TestGetHandlerClientDisconnect(t *testing.T) {
        ok := make(chan struct{})
        go func() {
                req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
-               (&LoggingRESTRouter{router: MakeRESTRouter()}).ServeHTTP(resp, req)
+               MakeRESTRouter().ServeHTTP(resp, req)
                ok <- struct{}{}
        }()
 
index 2d90aba14e4d36ddf41f035282b66100967d783b..daf4fc69ddff2702f0da43fd0af4d4927374bb2c 100644 (file)
@@ -311,6 +311,7 @@ type NodeStatus struct {
        TrashQueue      WorkQueueStatus
        RequestsCurrent int
        RequestsMax     int
+       Version         string
 }
 
 var st NodeStatus
@@ -346,6 +347,7 @@ func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
 
 // populate the given NodeStatus struct with current values.
 func (rtr *router) readNodeStatus(st *NodeStatus) {
+       st.Version = version
        vols := KeepVM.AllReadable()
        if cap(st.Volumes) < len(vols) {
                st.Volumes = make([]*volumeStatusEnt, len(vols))
index 921176dbbe93f481f497af476f176c2116ef3bce..b8a0ffb1cba46777ff1e2d1c745eb8102ea5fa61 100644 (file)
@@ -22,6 +22,8 @@ import (
        "github.com/coreos/go-systemd/daemon"
 )
 
+var version = "dev"
+
 // A Keep "block" is 64MB.
 const BlockSize = 64 * 1024 * 1024
 
@@ -89,6 +91,7 @@ func main() {
        deprecated.beforeFlagParse(theConfig)
 
        dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit (useful for migrating from command line flags to config file)")
+       getVersion := flag.Bool("version", false, "Print version information and exit.")
 
        defaultConfigPath := "/etc/arvados/keepstore/keepstore.yml"
        var configPath string
@@ -100,6 +103,12 @@ func main() {
        flag.Usage = usage
        flag.Parse()
 
+       // Print version information if requested
+       if *getVersion {
+               fmt.Printf("keepstore %s\n", version)
+               return
+       }
+
        deprecated.afterFlagParse(theConfig)
 
        err := config.LoadFile(theConfig, configPath)
@@ -111,6 +120,8 @@ func main() {
                log.Fatal(config.DumpAndExit(theConfig))
        }
 
+       log.Printf("keepstore %s started", version)
+
        err = theConfig.Start()
        if err != nil {
                log.Fatal(err)
@@ -147,11 +158,11 @@ func main() {
        // Start a round-robin VolumeManager with the volumes we have found.
        KeepVM = MakeRRVolumeManager(theConfig.Volumes)
 
-       // Middleware stack: logger, MaxRequests limiter, method handlers
+       // Middleware/handler stack
        router := MakeRESTRouter()
        limiter := httpserver.NewRequestLimiter(theConfig.MaxRequests, router)
        router.limiter = limiter
-       http.Handle("/", &LoggingRESTRouter{router: limiter})
+       http.Handle("/", httpserver.AddRequestIDs(httpserver.LogRequests(limiter)))
 
        // Set up a TCP listener.
        listener, err := net.Listen("tcp", theConfig.Listen)
diff --git a/services/keepstore/logging_router.go b/services/keepstore/logging_router.go
deleted file mode 100644 (file)
index 63c28a2..0000000
+++ /dev/null
@@ -1,111 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package main
-
-// LoggingRESTRouter
-// LoggingResponseWriter
-
-import (
-       "context"
-       "net/http"
-       "strings"
-       "time"
-
-       "git.curoverse.com/arvados.git/sdk/go/httpserver"
-       "git.curoverse.com/arvados.git/sdk/go/stats"
-       log "github.com/Sirupsen/logrus"
-)
-
-// LoggingResponseWriter has anonymous fields ResponseWriter and ResponseBody
-type LoggingResponseWriter struct {
-       Status int
-       Length int
-       http.ResponseWriter
-       ResponseBody string
-       sentHdr      time.Time
-}
-
-// CloseNotify implements http.CloseNotifier.
-func (resp *LoggingResponseWriter) CloseNotify() <-chan bool {
-       wrapped, ok := resp.ResponseWriter.(http.CloseNotifier)
-       if !ok {
-               // If upstream doesn't implement CloseNotifier, we can
-               // satisfy the interface by returning a channel that
-               // never sends anything (the interface doesn't
-               // guarantee that anything will ever be sent on the
-               // channel even if the client disconnects).
-               return nil
-       }
-       return wrapped.CloseNotify()
-}
-
-// WriteHeader writes header to ResponseWriter
-func (resp *LoggingResponseWriter) WriteHeader(code int) {
-       if resp.sentHdr == zeroTime {
-               resp.sentHdr = time.Now()
-       }
-       resp.Status = code
-       resp.ResponseWriter.WriteHeader(code)
-}
-
-var zeroTime time.Time
-
-func (resp *LoggingResponseWriter) Write(data []byte) (int, error) {
-       if resp.Length == 0 && len(data) > 0 && resp.sentHdr == zeroTime {
-               resp.sentHdr = time.Now()
-       }
-       resp.Length += len(data)
-       if resp.Status >= 400 {
-               resp.ResponseBody += string(data)
-       }
-       return resp.ResponseWriter.Write(data)
-}
-
-// LoggingRESTRouter is used to add logging capabilities to mux.Router
-type LoggingRESTRouter struct {
-       router      http.Handler
-       idGenerator httpserver.IDGenerator
-}
-
-func (loggingRouter *LoggingRESTRouter) ServeHTTP(wrappedResp http.ResponseWriter, req *http.Request) {
-       tStart := time.Now()
-
-       // Attach a requestID-aware logger to the request context.
-       lgr := log.WithField("RequestID", loggingRouter.idGenerator.Next())
-       ctx := context.WithValue(req.Context(), "logger", lgr)
-       req = req.WithContext(ctx)
-
-       lgr = lgr.WithFields(log.Fields{
-               "remoteAddr":      req.RemoteAddr,
-               "reqForwardedFor": req.Header.Get("X-Forwarded-For"),
-               "reqMethod":       req.Method,
-               "reqPath":         req.URL.Path[1:],
-               "reqBytes":        req.ContentLength,
-       })
-       lgr.Debug("request")
-
-       resp := LoggingResponseWriter{http.StatusOK, 0, wrappedResp, "", zeroTime}
-       loggingRouter.router.ServeHTTP(&resp, req)
-       tDone := time.Now()
-
-       statusText := http.StatusText(resp.Status)
-       if resp.Status >= 400 {
-               statusText = strings.Replace(resp.ResponseBody, "\n", "", -1)
-       }
-       if resp.sentHdr == zeroTime {
-               // Nobody changed status or wrote any data, i.e., we
-               // returned a 200 response with no body.
-               resp.sentHdr = tDone
-       }
-
-       lgr.WithFields(log.Fields{
-               "timeTotal":      stats.Duration(tDone.Sub(tStart)),
-               "timeToStatus":   stats.Duration(resp.sentHdr.Sub(tStart)),
-               "timeWriteBody":  stats.Duration(tDone.Sub(resp.sentHdr)),
-               "respStatusCode": resp.Status,
-               "respStatus":     statusText,
-               "respBytes":      resp.Length,
-       }).Info("response")
-}
diff --git a/services/keepstore/logging_router_test.go b/services/keepstore/logging_router_test.go
deleted file mode 100644 (file)
index 6ca48dc..0000000
+++ /dev/null
@@ -1,14 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package main
-
-import (
-       "net/http"
-       "testing"
-)
-
-func TestLoggingResponseWriterImplementsCloseNotifier(t *testing.T) {
-       http.ResponseWriter(&LoggingResponseWriter{}).(http.CloseNotifier).CloseNotify()
-}
index 9e547f30d0c1c29cd8f510c5b72c2ae6ceba4acf..7a8297039b513d62b2f8a38ab81ab0f9bc89f187 100644 (file)
@@ -271,6 +271,7 @@ func (s *PullWorkerTestSuite) performTest(testData PullWorkerTestData, c *C) {
 
        c.Check(getStatusItem("PullQueue", "InProgress"), Equals, float64(0))
        c.Check(getStatusItem("PullQueue", "Queued"), Equals, float64(0))
+       c.Check(getStatusItem("Version"), Not(Equals), "")
 
        response := IssueRequest(&testData.req)
        c.Assert(response.Code, Equals, testData.responseCode)
index e6a53d06c6c297ab343f697dd7c5f68a40e8355e..61e69f9096503eb88cf9328af5e85f129dba4226 100644 (file)
@@ -45,6 +45,8 @@ var (
        s3RaceWindow    time.Duration
 
        s3ACL = s3.Private
+
+       zeroTime time.Time
 )
 
 const (
index d85ef552c064ac0b9f4527c3b93194ede60c142f..888abf5a768d51cb34fe85b30ed9d1252b7dea4c 100644 (file)
@@ -125,7 +125,7 @@ def main(args=None):
 
     try:
         root_logger = setup_logging(config.get('Logging', 'file'), **config.log_levels())
-        root_logger.info("%s %s, libcloud %s", sys.argv[0], __version__, libcloud.__version__)
+        root_logger.info("%s %s started, libcloud %s", sys.argv[0], __version__, libcloud.__version__)
         node_setup, node_shutdown, node_update, node_monitor = \
             config.dispatch_classes()
         server_calculator = build_server_calculator(config)
index cfd611285cffc91933fefb15a2dee37e14995c7e..069bf168950ad4c33296876d1044b9eebea2a7a4 100644 (file)
@@ -11,6 +11,8 @@ import logging
 import socketserver
 import threading
 
+from ._version import __version__
+
 _logger = logging.getLogger('status.Handler')
 
 
@@ -74,10 +76,11 @@ class Tracker(object):
     def __init__(self):
         self._mtx = threading.Lock()
         self._latest = {}
+        self._version = {'Version' : __version__}
 
     def get_json(self):
         with self._mtx:
-            return json.dumps(self._latest)
+            return json.dumps(dict(self._latest, **self._version))
 
     def keys(self):
         with self._mtx:
index a236e4f0eecd1d2ba30c3e01adc227e245aba03e..23658667a61843147854bd97304107e416c0d568 100644 (file)
@@ -56,6 +56,7 @@ class StatusServerUpdates(unittest.TestCase):
                 resp = r.json()
                 self.assertEqual(n, resp['nodes_'+str(n)])
             self.assertEqual(1, resp['nodes_1'])
+            self.assertIn('Version', resp)
 
 
 class StatusServerDisabled(unittest.TestCase):
index db33cbfd004173ff787f9767eddd06a448744d63..a0006a4f8a8e0e70e7488f6ce4dee4ac4359984c 100644 (file)
@@ -13,15 +13,23 @@ import (
 )
 
 var logger = ctxlog.FromContext
+var version = "dev"
 
 func main() {
        log := logger(nil)
 
        configPath := flag.String("config", "/etc/arvados/ws/ws.yml", "`path` to config file")
        dumpConfig := flag.Bool("dump-config", false, "show current configuration and exit")
+       getVersion := flag.Bool("version", false, "Print version information and exit.")
        cfg := defaultConfig()
        flag.Parse()
 
+       // Print version information if requested
+       if *getVersion {
+               fmt.Printf("arvados-ws %s\n", version)
+               return
+       }
+
        err := config.LoadFile(&cfg, *configPath)
        if err != nil {
                log.Fatal(err)
@@ -39,7 +47,7 @@ func main() {
                return
        }
 
-       log.Info("started")
+       log.Printf("arvados-ws %s started", version)
        srv := &server{wsConfig: &cfg}
        log.Fatal(srv.Run())
 }
index 2b9bd66b1c466cfdee12d534cddc7eb11d4225e4..987c225eac8ba5a456aa0c2af2e7f56eede749be 100644 (file)
@@ -123,6 +123,7 @@ func (rtr *router) DebugStatus() interface{} {
 func (rtr *router) Status() interface{} {
        return map[string]interface{}{
                "Clients": atomic.LoadInt64(&rtr.status.ReqsActive),
+               "Version": version,
        }
 }
 
index c1caa2ad37d31d40f962fbc1af8d43f05f18a5a9..b1f943857a18f495f2777c24aa3627855aa0d9f6 100644 (file)
@@ -5,6 +5,7 @@
 package main
 
 import (
+       "encoding/json"
        "io/ioutil"
        "net/http"
        "sync"
@@ -90,6 +91,21 @@ func (s *serverSuite) TestHealth(c *check.C) {
        }
 }
 
+func (s *serverSuite) TestStatus(c *check.C) {
+       go s.srv.Run()
+       defer s.srv.Close()
+       s.srv.WaitReady()
+       req, err := http.NewRequest("GET", "http://"+s.srv.listener.Addr().String()+"/status.json", nil)
+       c.Assert(err, check.IsNil)
+       resp, err := http.DefaultClient.Do(req)
+       c.Check(err, check.IsNil)
+       c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+       var status map[string]interface{}
+       err = json.NewDecoder(resp.Body).Decode(&status)
+       c.Check(err, check.IsNil)
+       c.Check(status["Version"], check.Not(check.Equals), "")
+}
+
 func (s *serverSuite) TestHealthDisabled(c *check.C) {
        s.cfg.ManagementToken = ""
 
index d7efdefb6f68a791c1417c05357b5f2636227e84..6b4781c3549627f0f9874cc0be734b611b41c5dd 100644 (file)
@@ -19,6 +19,8 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvados"
 )
 
+var version = "dev"
+
 type resourceList interface {
        Len() int
        GetItems() []interface{}
@@ -150,6 +152,10 @@ func ParseFlags(config *ConfigParams) error {
                "verbose",
                false,
                "Log informational messages. Off by default.")
+       getVersion := flags.Bool(
+               "version",
+               false,
+               "Print version information and exit.")
        parentGroupUUID := flags.String(
                "parent-group-uuid",
                "",
@@ -158,6 +164,12 @@ func ParseFlags(config *ConfigParams) error {
        // Parse args; omit the first arg which is the command name
        flags.Parse(os.Args[1:])
 
+       // Print version information if requested
+       if *getVersion {
+               fmt.Printf("arv-sync-groups %s\n", version)
+               os.Exit(0)
+       }
+
        // Input file as a required positional argument
        if flags.NArg() == 0 {
                return fmt.Errorf("please provide a path to an input file")
@@ -276,7 +288,7 @@ func doMain(cfg *ConfigParams) error {
        }
        defer f.Close()
 
-       log.Printf("Group sync starting. Using %q as users id and parent group UUID %q", cfg.UserID, cfg.ParentGroupUUID)
+       log.Printf("arv-sync-groups %s started. Using %q as users id and parent group UUID %q", version, cfg.UserID, cfg.ParentGroupUUID)
 
        // Get the complete user list to minimize API Server requests
        allUsers := make(map[string]arvados.User)
index f87cccadbfac636f7fa4f9249363c842977f3446..d1c3037ca2b0adbe73f0019072fd7041659ebbe0 100644 (file)
@@ -4,4 +4,4 @@
 
 include agpl-3.0.txt
 include crunchstat_summary/dygraphs.js
-include crunchstat_summary/chartjs.js
+include crunchstat_summary/synchronizer.js
index 5c1d0994bf09c1376d5d6a4180503ffedc18334f..52e5534ef179f1124c90084e68596b8a43bf08e4 100644 (file)
@@ -72,6 +72,8 @@ window.onload = function() {
         });
     });
 
+    var sync = Dygraph.synchronize(Object.values(charts), {range: false});
+
     if (typeof window.debug === 'undefined')
         window.debug = {};
     window.debug.charts = charts;
index e72832eb5e005584969dd2982facb0b6b32c71cc..1314e9df35612817e260e6644212ef2e8a387bc3 100644 (file)
@@ -8,7 +8,7 @@ import crunchstat_summary.webchart
 class DygraphsChart(crunchstat_summary.webchart.WebChart):
     CSS = 'https://cdnjs.cloudflare.com/ajax/libs/dygraph/2.0.0/dygraph.min.css'
     JSLIB = 'https://cdnjs.cloudflare.com/ajax/libs/dygraph/2.0.0/dygraph.min.js'
-    JSASSET = 'dygraphs.js'
+    JSASSETS = ['synchronizer.js','dygraphs.js']
 
     def headHTML(self):
         return '<link rel="stylesheet" href="{}">\n'.format(self.CSS)
diff --git a/tools/crunchstat-summary/crunchstat_summary/synchronizer.js b/tools/crunchstat-summary/crunchstat_summary/synchronizer.js
new file mode 100644 (file)
index 0000000..78c8d42
--- /dev/null
@@ -0,0 +1,273 @@
+/**
+ * Synchronize zooming and/or selections between a set of dygraphs.
+ *
+ * Usage:
+ *
+ *   var g1 = new Dygraph(...),
+ *       g2 = new Dygraph(...),
+ *       ...;
+ *   var sync = Dygraph.synchronize(g1, g2, ...);
+ *   // charts are now synchronized
+ *   sync.detach();
+ *   // charts are no longer synchronized
+ *
+ * You can set options using the last parameter, for example:
+ *
+ *   var sync = Dygraph.synchronize(g1, g2, g3, {
+ *      selection: true,
+ *      zoom: true
+ *   });
+ *
+ * The default is to synchronize both of these.
+ *
+ * Instead of passing one Dygraph object as each parameter, you may also pass an
+ * array of dygraphs:
+ *
+ *   var sync = Dygraph.synchronize([g1, g2, g3], {
+ *      selection: false,
+ *      zoom: true
+ *   });
+ *
+ * You may also set `range: false` if you wish to only sync the x-axis.
+ * The `range` option has no effect unless `zoom` is true (the default).
+ *
+ * SPDX-License-Identifier: MIT
+ * Original source: https://github.com/danvk/dygraphs/blob/master/src/extras/synchronizer.js
+ * at commit b55a71d768d2f8de62877c32b3aec9e9975ac389
+ *
+ * Copyright (c) 2009 Dan Vanderkam
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following
+ * conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+(function() {
+/* global Dygraph:false */
+'use strict';
+
+var Dygraph;
+if (window.Dygraph) {
+  Dygraph = window.Dygraph;
+} else if (typeof(module) !== 'undefined') {
+  Dygraph = require('../dygraph');
+}
+
+var synchronize = function(/* dygraphs..., opts */) {
+  if (arguments.length === 0) {
+    throw 'Invalid invocation of Dygraph.synchronize(). Need >= 1 argument.';
+  }
+
+  var OPTIONS = ['selection', 'zoom', 'range'];
+  var opts = {
+    selection: true,
+    zoom: true,
+    range: true
+  };
+  var dygraphs = [];
+  var prevCallbacks = [];
+
+  var parseOpts = function(obj) {
+    if (!(obj instanceof Object)) {
+      throw 'Last argument must be either Dygraph or Object.';
+    } else {
+      for (var i = 0; i < OPTIONS.length; i++) {
+        var optName = OPTIONS[i];
+        if (obj.hasOwnProperty(optName)) opts[optName] = obj[optName];
+      }
+    }
+  };
+
+  if (arguments[0] instanceof Dygraph) {
+    // Arguments are Dygraph objects.
+    for (var i = 0; i < arguments.length; i++) {
+      if (arguments[i] instanceof Dygraph) {
+        dygraphs.push(arguments[i]);
+      } else {
+        break;
+      }
+    }
+    if (i < arguments.length - 1) {
+      throw 'Invalid invocation of Dygraph.synchronize(). ' +
+            'All but the last argument must be Dygraph objects.';
+    } else if (i == arguments.length - 1) {
+      parseOpts(arguments[arguments.length - 1]);
+    }
+  } else if (arguments[0].length) {
+    // Invoked w/ list of dygraphs, options
+    for (var i = 0; i < arguments[0].length; i++) {
+      dygraphs.push(arguments[0][i]);
+    }
+    if (arguments.length == 2) {
+      parseOpts(arguments[1]);
+    } else if (arguments.length > 2) {
+      throw 'Invalid invocation of Dygraph.synchronize(). ' +
+            'Expected two arguments: array and optional options argument.';
+    }  // otherwise arguments.length == 1, which is fine.
+  } else {
+    throw 'Invalid invocation of Dygraph.synchronize(). ' +
+          'First parameter must be either Dygraph or list of Dygraphs.';
+  }
+
+  if (dygraphs.length < 2) {
+    throw 'Invalid invocation of Dygraph.synchronize(). ' +
+          'Need two or more dygraphs to synchronize.';
+  }
+
+  var readycount = dygraphs.length;
+  for (var i = 0; i < dygraphs.length; i++) {
+    var g = dygraphs[i];
+    g.ready( function() {
+      if (--readycount == 0) {
+        // store original callbacks
+        var callBackTypes = ['drawCallback', 'highlightCallback', 'unhighlightCallback'];
+        for (var j = 0; j < dygraphs.length; j++) {
+          if (!prevCallbacks[j]) {
+            prevCallbacks[j] = {};
+          }
+          for (var k = callBackTypes.length - 1; k >= 0; k--) {
+            prevCallbacks[j][callBackTypes[k]] = dygraphs[j].getFunctionOption(callBackTypes[k]);
+          }
+        }
+
+        // Listen for draw, highlight, unhighlight callbacks.
+        if (opts.zoom) {
+          attachZoomHandlers(dygraphs, opts, prevCallbacks);
+        }
+
+        if (opts.selection) {
+          attachSelectionHandlers(dygraphs, prevCallbacks);
+        }
+      }
+    });
+  }
+
+  return {
+    detach: function() {
+      for (var i = 0; i < dygraphs.length; i++) {
+        var g = dygraphs[i];
+        if (opts.zoom) {
+          g.updateOptions({drawCallback: prevCallbacks[i].drawCallback});
+        }
+        if (opts.selection) {
+          g.updateOptions({
+            highlightCallback: prevCallbacks[i].highlightCallback,
+            unhighlightCallback: prevCallbacks[i].unhighlightCallback
+          });
+        }
+      }
+      // release references & make subsequent calls throw.
+      dygraphs = null;
+      opts = null;
+      prevCallbacks = null;
+    }
+  };
+};
+
+function arraysAreEqual(a, b) {
+  if (!Array.isArray(a) || !Array.isArray(b)) return false;
+  var i = a.length;
+  if (i !== b.length) return false;
+  while (i--) {
+    if (a[i] !== b[i]) return false;
+  }
+  return true;
+}
+
+function attachZoomHandlers(gs, syncOpts, prevCallbacks) {
+  var block = false;
+  for (var i = 0; i < gs.length; i++) {
+    var g = gs[i];
+    g.updateOptions({
+      drawCallback: function(me, initial) {
+        if (block || initial) return;
+        block = true;
+        var opts = {
+          dateWindow: me.xAxisRange()
+        };
+        if (syncOpts.range) opts.valueRange = me.yAxisRange();
+
+        for (var j = 0; j < gs.length; j++) {
+          if (gs[j] == me) {
+            if (prevCallbacks[j] && prevCallbacks[j].drawCallback) {
+              prevCallbacks[j].drawCallback.apply(this, arguments);
+            }
+            continue;
+          }
+
+          // Only redraw if there are new options
+          if (arraysAreEqual(opts.dateWindow, gs[j].getOption('dateWindow')) && 
+              arraysAreEqual(opts.valueRange, gs[j].getOption('valueRange'))) {
+            continue;
+          }
+
+          gs[j].updateOptions(opts);
+        }
+        block = false;
+      }
+    }, true /* no need to redraw */);
+  }
+}
+
+function attachSelectionHandlers(gs, prevCallbacks) {
+  var block = false;
+  for (var i = 0; i < gs.length; i++) {
+    var g = gs[i];
+
+    g.updateOptions({
+      highlightCallback: function(event, x, points, row, seriesName) {
+        if (block) return;
+        block = true;
+        var me = this;
+        for (var i = 0; i < gs.length; i++) {
+          if (me == gs[i]) {
+            if (prevCallbacks[i] && prevCallbacks[i].highlightCallback) {
+              prevCallbacks[i].highlightCallback.apply(this, arguments);
+            }
+            continue;
+          }
+          var idx = gs[i].getRowForX(x);
+          if (idx !== null) {
+            gs[i].setSelection(idx, seriesName);
+          }
+        }
+        block = false;
+      },
+      unhighlightCallback: function(event) {
+        if (block) return;
+        block = true;
+        var me = this;
+        for (var i = 0; i < gs.length; i++) {
+          if (me == gs[i]) {
+            if (prevCallbacks[i] && prevCallbacks[i].unhighlightCallback) {
+              prevCallbacks[i].unhighlightCallback.apply(this, arguments);
+            }
+            continue;
+          }
+          gs[i].clearSelection();
+        }
+        block = false;
+      }
+    }, true /* no need to redraw */);
+  }
+}
+
+Dygraph.synchronize = synchronize;
+
+})();
index 790b08da87f1b7894fc4e47511c63d44eda9604e..9d18883ce2506d71abe03e08abde2fee28006343 100644 (file)
@@ -10,7 +10,7 @@ import pkg_resources
 class WebChart(object):
     """Base class for a web chart.
 
-    Subclasses must assign JSLIB and JSASSET, and override the
+    Subclasses must assign JSLIB and JSASSETS, and override the
     chartdata() method.
     """
     JSLIB = None
@@ -33,7 +33,7 @@ class WebChart(object):
     def js(self):
         return 'var chartdata = {};\n{}'.format(
             json.dumps(self.sections()),
-            pkg_resources.resource_string('crunchstat_summary', self.JSASSET))
+            '\n'.join([pkg_resources.resource_string('crunchstat_summary', jsa) for jsa in self.JSASSETS]))
 
     def sections(self):
         return [
index 7dca3293d25fce94111eb1708fde156fc5c52387..2de7a96c9a2a81f85f79c6945935f36ae2f201c0 100644 (file)
@@ -20,6 +20,8 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
 )
 
+var version = "dev"
+
 func main() {
        err := doMain(os.Args[1:])
        if err != nil {
@@ -62,9 +64,20 @@ func doMain(args []string) error {
                false,
                "Log progress of each block verification")
 
+       getVersion := flags.Bool(
+               "version",
+               false,
+               "Print version information and exit.")
+
        // Parse args; omit the first arg which is the command name
        flags.Parse(args)
 
+       // Print version information if requested
+       if *getVersion {
+               fmt.Printf("keep-block-check %s\n", version)
+               os.Exit(0)
+       }
+
        config, blobSigningKey, err := loadConfig(*configFile)
        if err != nil {
                return fmt.Errorf("Error loading configuration from file: %s", err.Error())
index 6c8a866291b10ca0efcff8e7d9f50ca6d727703b..6bf1abbba3865e602e7e81092b4152cedf0118be 100644 (file)
@@ -22,16 +22,20 @@ import (
        "crypto/rand"
        "encoding/binary"
        "flag"
+       "fmt"
        "io"
        "io/ioutil"
        "log"
        "net/http"
+       "os"
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
 )
 
+var version = "dev"
+
 // Command line config knobs
 var (
        BlockSize     = flag.Int("block-size", keepclient.BLOCKSIZE, "bytes per read/write op")
@@ -43,11 +47,20 @@ var (
        StatsInterval = flag.Duration("stats-interval", time.Second, "time interval between IO stats reports, or 0 to disable")
        ServiceURL    = flag.String("url", "", "specify scheme://host of a single keep service to exercise (instead of using all advertised services like normal clients)")
        ServiceUUID   = flag.String("uuid", "", "specify UUID of a single advertised keep service to exercise")
+       getVersion    = flag.Bool("version", false, "Print version information and exit.")
 )
 
 func main() {
        flag.Parse()
 
+       // Print version information if requested
+       if *getVersion {
+               fmt.Printf("keep-exercise %s\n", version)
+               os.Exit(0)
+       }
+
+       log.Printf("keep-exercise %s started", version)
+
        arv, err := arvadosclient.MakeArvadosClient()
        if err != nil {
                log.Fatal(err)
index a299d17febb1a46fc276ab6ed0fac5ee49ac09be..303f71f8fe77e2cb69d6b313e3a7e822fb96d5f4 100644 (file)
@@ -21,6 +21,8 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
 )
 
+var version = "dev"
+
 func main() {
        err := doMain()
        if err != nil {
@@ -69,9 +71,20 @@ func doMain() error {
                0,
                "Lifetime of blob permission signatures on source keepservers. If not provided, this will be retrieved from the API server's discovery document.")
 
+       getVersion := flags.Bool(
+               "version",
+               false,
+               "Print version information and exit.")
+
        // Parse args; omit the first arg which is the command name
        flags.Parse(os.Args[1:])
 
+       // Print version information if requested
+       if *getVersion {
+               fmt.Printf("keep-rsync %s\n", version)
+               os.Exit(0)
+       }
+
        srcConfig, srcBlobSigningKey, err := loadConfig(*srcConfigFile)
        if err != nil {
                return fmt.Errorf("Error loading src configuration from file: %s", err.Error())