From: Tom Clegg Date: Mon, 4 Dec 2017 21:03:38 +0000 (-0500) Subject: 11453: Merge branch 'master' into 11453-federated-tokens X-Git-Tag: 1.1.2~25^2~10 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/644f5de63e2b8b02e054fcbb3e9af39560cffae3?hp=7d51b030e06c4314ee596bfdd51f1d8ad4f5f992 11453: Merge branch 'master' into 11453-federated-tokens Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- diff --git a/apps/workbench/app/models/arvados_api_client.rb b/apps/workbench/app/models/arvados_api_client.rb index ac2fe3a976..5a8fd518d3 100644 --- a/apps/workbench/app/models/arvados_api_client.rb +++ b/apps/workbench/app/models/arvados_api_client.rb @@ -82,7 +82,7 @@ class ArvadosApiClient @client_mtx = Mutex.new end - def api(resources_kind, action, data=nil, tokens={}) + def api(resources_kind, action, data=nil, tokens={}, include_anon_token=true) profile_checkpoint @@ -117,7 +117,7 @@ class ArvadosApiClient 'reader_tokens' => ((tokens[:reader_tokens] || Thread.current[:reader_tokens] || []) + - [Rails.configuration.anonymous_user_token]).to_json, + (include_anon_token ? [Rails.configuration.anonymous_user_token] : [])).to_json, } if !data.nil? 
data.each do |k,v| diff --git a/apps/workbench/app/models/user.rb b/apps/workbench/app/models/user.rb index 10da22db69..1f102dbf17 100644 --- a/apps/workbench/app/models/user.rb +++ b/apps/workbench/app/models/user.rb @@ -10,7 +10,7 @@ class User < ArvadosBase end def self.current - res = arvados_api_client.api self, '/current' + res = arvados_api_client.api self, '/current', nil, {}, false arvados_api_client.unpack_api_response(res) end diff --git a/build/run-library.sh b/build/run-library.sh index 5fc494cdf5..029fefc9bb 100755 --- a/build/run-library.sh +++ b/build/run-library.sh @@ -132,7 +132,7 @@ package_go_binary() { return 1 fi - go get "git.curoverse.com/arvados.git/$src_path" + go get -ldflags "-X main.version=${version}" "git.curoverse.com/arvados.git/$src_path" declare -a switches=() systemd_unit="$WORKSPACE/${src_path}/${prog}.service" diff --git a/build/run-tests.sh b/build/run-tests.sh index c6110f28b7..6063779b30 100755 --- a/build/run-tests.sh +++ b/build/run-tests.sh @@ -487,6 +487,7 @@ export PERLLIB="$PERLINSTALLBASE/lib/perl5:${PERLLIB:+$PERLLIB}" export GOPATH mkdir -p "$GOPATH/src/git.curoverse.com" +rmdir --parents "$GOPATH/src/git.curoverse.com/arvados.git/tmp/GOPATH" ln -sfT "$WORKSPACE" "$GOPATH/src/git.curoverse.com/arvados.git" \ || fatal "symlink failed" go get -v github.com/kardianos/govendor \ diff --git a/doc/_config.yml b/doc/_config.yml index 3068647074..e8a899c004 100644 --- a/doc/_config.yml +++ b/doc/_config.yml @@ -74,6 +74,7 @@ navbar: - user/topics/run-command.html.textile.liquid - user/reference/job-pipeline-ref.html.textile.liquid - user/examples/crunch-examples.html.textile.liquid + - user/topics/arv-sync-groups.html.textile.liquid - Query the metadata database: - user/topics/tutorial-trait-search.html.textile.liquid - Arvados License: diff --git a/doc/install/install-keep-web.html.textile.liquid b/doc/install/install-keep-web.html.textile.liquid index ea2ebd161b..4def77e063 100644 --- 
a/doc/install/install-keep-web.html.textile.liquid +++ b/doc/install/install-keep-web.html.textile.liquid @@ -9,7 +9,7 @@ Copyright (C) The Arvados Authors. All rights reserved. SPDX-License-Identifier: CC-BY-SA-3.0 {% endcomment %} -The Keep-web server provides read-only HTTP access to files stored in Keep. It serves public data to unauthenticated clients, and serves private data to clients that supply Arvados API tokens. It can be installed anywhere with access to Keep services, typically behind a web proxy that provides SSL support. See the "godoc page":http://godoc.org/github.com/curoverse/arvados/services/keep-web for more detail. +The Keep-web server provides read/write HTTP (WebDAV) access to files stored in Keep. It serves public data to unauthenticated clients, and serves private data to clients that supply Arvados API tokens. It can be installed anywhere with access to Keep services, typically behind a web proxy that provides TLS support. See the "godoc page":http://godoc.org/github.com/curoverse/arvados/services/keep-web for more detail. By convention, we use the following hostnames for the Keep-web service: @@ -48,7 +48,7 @@ Usage of keep-web: -allow-anonymous Serve public data to anonymous clients. Try the token supplied in the ARVADOS_API_TOKEN environment variable when none of the tokens provided in an HTTP request succeed in reading the desired collection. (default false) -attachment-only-host string - Accept credentials, and add "Content-Disposition: attachment" response headers, for requests at this hostname:port. Prohibiting inline display makes it possible to serve untrusted and non-public content from a single origin, i.e., without wildcard DNS or SSL. + Accept credentials, and add "Content-Disposition: attachment" response headers, for requests at this hostname:port. Prohibiting inline display makes it possible to serve untrusted and non-public content from a single origin, i.e., without wildcard DNS or TLS. 
-listen string Address to listen on: "host:port", or ":port" to listen on all interfaces. (default ":80") -trust-all-content @@ -77,15 +77,15 @@ exec sudo -u nobody keep-web \ Omit the @-allow-anonymous@ argument if you do not want to serve public data. -Set @ARVADOS_API_HOST_INSECURE=1@ if your API server's SSL certificate is not signed by a recognized CA. +Set @ARVADOS_API_HOST_INSECURE=1@ if your API server's TLS certificate is not signed by a recognized CA. -h3. Set up a reverse proxy with SSL support +h3. Set up a reverse proxy with TLS support -The Keep-web service will be accessible from anywhere on the internet, so we recommend using SSL for transport encryption. +The Keep-web service will be accessible from anywhere on the internet, so we recommend using TLS for transport encryption. -This is best achieved by putting a reverse proxy with SSL support in front of Keep-web, running on port 443 and passing requests to Keep-web on port 9002 (or whatever port you chose in your run script). +This is best achieved by putting a reverse proxy with TLS support in front of Keep-web, running on port 443 and passing requests to Keep-web on port 9002 (or whatever port you chose in your run script). -Note: A wildcard SSL certificate is required in order to support a full-featured secure Keep-web service. Without it, Keep-web can offer file downloads for all Keep data; however, in order to avoid cross-site scripting vulnerabilities, Keep-web refuses to serve private data as web content except when it is accessed using a "secret link" share. With a wildcard SSL certificate and DNS configured appropriately, all data can be served as web content. +Note: A wildcard TLS certificate is required in order to support a full-featured secure Keep-web service. 
Without it, Keep-web can offer file downloads for all Keep data; however, in order to avoid cross-site scripting vulnerabilities, Keep-web refuses to serve private data as web content except when it is accessed using a "secret link" share. With a wildcard TLS certificate and DNS configured appropriately, all data can be served as web content. For example, using Nginx: @@ -125,8 +125,8 @@ h3. Configure DNS Configure your DNS servers so the following names resolve to your Nginx proxy's public IP address. * @download.uuid_prefix.your.domain@ * @collections.uuid_prefix.your.domain@ -* @*--collections.uuid_prefix.your.domain@, if you have a wildcard SSL certificate valid for @*.uuid_prefix.your.domain@ and your DNS server allows this without interfering with other DNS names. -* @*.collections.uuid_prefix.your.domain@, if you have a wildcard SSL certificate valid for these names. +* @*--collections.uuid_prefix.your.domain@, if you have a wildcard TLS certificate valid for @*.uuid_prefix.your.domain@ and your DNS server allows this without interfering with other DNS names. +* @*.collections.uuid_prefix.your.domain@, if you have a wildcard TLS certificate valid for these names. If neither of the above wildcard options is feasible, you have two choices: # Serve web content at @collections.uuid_prefix.your.domain@, but only for unauthenticated requests (public data and collection sharing links). Authenticated requests will always result in file downloads, using the @download@ name. For example, the Workbench "preview" button and the "view entire log file" link will invoke file downloads instead of displaying content in the browser window. diff --git a/doc/user/cwl/cwl-run-options.html.textile.liquid b/doc/user/cwl/cwl-run-options.html.textile.liquid index 7598ab822e..7f69c61feb 100644 --- a/doc/user/cwl/cwl-run-options.html.textile.liquid +++ b/doc/user/cwl/cwl-run-options.html.textile.liquid @@ -38,7 +38,7 @@ table(table table-bordered table-condensed). 
|==--submit-runner-ram== SUBMIT_RUNNER_RAM|RAM (in MiB) required for the workflow runner job (default 1024)| |==--submit-runner-image== SUBMIT_RUNNER_IMAGE|Docker image for workflow runner job, default arvados/jobs| |==--name== NAME| Name to use for workflow execution instance.| -|==--on-error {stop,continue}|Desired workflow behavior when a step fails. One of 'stop' or 'continue'. Default is 'continue'.| +|==--on-error {stop,continue}==|Desired workflow behavior when a step fails. One of 'stop' or 'continue'. Default is 'continue'.| |==--enable-dev==| Enable loading and running development versions of CWL spec.| |==--intermediate-output-ttl== N|If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).| |==--trash-intermediate==| Immediately trash intermediate outputs on workflow success.| diff --git a/doc/user/topics/arv-sync-groups.html.textile.liquid b/doc/user/topics/arv-sync-groups.html.textile.liquid new file mode 100644 index 0000000000..e2a42c8954 --- /dev/null +++ b/doc/user/topics/arv-sync-groups.html.textile.liquid @@ -0,0 +1,53 @@ +--- +layout: default +navsection: userguide +title: "Using arv-sync-groups" +... +{% comment %} +Copyright (C) The Arvados Authors. All rights reserved. + +SPDX-License-Identifier: CC-BY-SA-3.0 +{% endcomment %} + +The @arv-sync-groups@ tool synchronizes remote groups into Arvados from an external source. + +h1. Using arv-sync-groups + +This tool reads a CSV (comma-separated values) file containing information about external groups and their members. When run for the first time, it creates a special group named 'Externally synchronized groups', which serves as the parent of all the remote groups. + +Every line in the file should have two values: a group name and a local user identifier, meaning that the named user is a member of the group. The tool will create the group if it doesn't exist and add the user to it.
If a group member is not present in the input file, the account will be removed from the group. + +Users can be identified by their email address or username: the tool checks whether every user exists on the system and reports any that are not found. Groups, on the other hand, are identified by their name. + +This tool is designed to be run periodically, reading a file produced by a remote authentication system's dump script (e.g., from LDAP) and treating the file's contents as the source of truth. + + +bq. NOTE: @arv-sync-groups@ needs to perform several administrative tasks on Arvados, so it must be run using a superuser token. + +h2. Options + +The following command line options are supported: + +table(table table-bordered table-condensed). +|_. Option |_. Description | +|==--help==| This list of options| +|==--parent-group-uuid==| UUID of group to own all the externally synchronized groups| +|==--user-id== | Identifier to use in looking up user. One of 'email' or 'username' (Default: 'email')| +|==--verbose==| Log informational messages (Default: False)| +|==--version==| Print version and exit| + +h2. Examples + +To sync groups using the username to identify every account, reading from an @external_groups.csv@ file, run the command as follows: + +
~$ arv-sync-groups --user-id username /path/to/external_groups.csv 
+
+
+ +If you want to use a specific preexisting group as the parent of all the remote groups, you can do it this way: + + +
~$ arv-sync-groups --parent-group-uuid <preexisting group UUID> --user-id username /path/to/external_groups.csv 
+
+
diff --git a/sdk/go/arvados/client.go b/sdk/go/arvados/client.go index 47a953ac2c..a38d95c2e6 100644 --- a/sdk/go/arvados/client.go +++ b/sdk/go/arvados/client.go @@ -5,6 +5,7 @@ package arvados import ( + "bytes" "crypto/tls" "encoding/json" "fmt" @@ -180,6 +181,10 @@ func anythingToValues(params interface{}) (url.Values, error) { // // path must not contain a query string. func (c *Client) RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error { + if body, ok := body.(io.Closer); ok { + // Ensure body is closed even if we error out early + defer body.Close() + } urlString := c.apiURL(path) urlValues, err := anythingToValues(params) if err != nil { @@ -202,6 +207,24 @@ func (c *Client) RequestAndDecode(dst interface{}, method, path string, body io. return c.DoAndDecode(dst, req) } +type resource interface { + resourceName() string +} + +// UpdateBody returns an io.Reader suitable for use as an http.Request +// Body for a create or update API call. +func (c *Client) UpdateBody(rsc resource) io.Reader { + j, err := json.Marshal(rsc) + if err != nil { + // Return a reader that returns errors. + r, w := io.Pipe() + w.CloseWithError(err) + return r + } + v := url.Values{rsc.resourceName(): {string(j)}} + return bytes.NewBufferString(v.Encode()) +} + func (c *Client) httpClient() *http.Client { switch { case c.Client != nil: diff --git a/sdk/go/arvados/collection.go b/sdk/go/arvados/collection.go index 61bcd7fe8f..999b4e9d48 100644 --- a/sdk/go/arvados/collection.go +++ b/sdk/go/arvados/collection.go @@ -30,6 +30,10 @@ type Collection struct { IsTrashed bool `json:"is_trashed,omitempty"` } +func (c Collection) resourceName() string { + return "collection" +} + // SizedDigests returns the hash+size part of each data block // referenced by the collection. 
func (c *Collection) SizedDigests() ([]SizedDigest, error) { diff --git a/sdk/go/arvados/collection_fs.go b/sdk/go/arvados/collection_fs.go index 1acf27442c..28629e33b2 100644 --- a/sdk/go/arvados/collection_fs.go +++ b/sdk/go/arvados/collection_fs.go @@ -5,268 +5,1378 @@ package arvados import ( + "errors" + "fmt" "io" "net/http" "os" "path" + "regexp" + "sort" + "strconv" "strings" "sync" "time" +) + +var ( + ErrReadOnlyFile = errors.New("read-only file") + ErrNegativeOffset = errors.New("cannot seek to negative offset") + ErrFileExists = errors.New("file exists") + ErrInvalidOperation = errors.New("invalid operation") + ErrInvalidArgument = errors.New("invalid argument") + ErrDirectoryNotEmpty = errors.New("directory not empty") + ErrWriteOnlyMode = errors.New("file is O_WRONLY") + ErrSyncNotSupported = errors.New("O_SYNC flag is not supported") + ErrIsDirectory = errors.New("cannot rename file to overwrite existing directory") + ErrPermission = os.ErrPermission - "git.curoverse.com/arvados.git/sdk/go/manifest" + maxBlockSize = 1 << 26 ) +// A File is an *os.File-like interface for reading and writing files +// in a CollectionFileSystem. type File interface { io.Reader + io.Writer io.Closer io.Seeker Size() int64 + Readdir(int) ([]os.FileInfo, error) + Stat() (os.FileInfo, error) + Truncate(int64) error } type keepClient interface { - ManifestFileReader(manifest.Manifest, string) (File, error) + ReadAt(locator string, p []byte, off int) (int, error) + PutB(p []byte) (string, int, error) } -type collectionFile struct { - File - collection *Collection - name string - size int64 +type fileinfo struct { + name string + mode os.FileMode + size int64 + modTime time.Time } -func (cf *collectionFile) Size() int64 { - return cf.size +// Name implements os.FileInfo. +func (fi fileinfo) Name() string { + return fi.name } -func (cf *collectionFile) Readdir(count int) ([]os.FileInfo, error) { - return nil, io.EOF +// ModTime implements os.FileInfo. 
+func (fi fileinfo) ModTime() time.Time { + return fi.modTime } -func (cf *collectionFile) Stat() (os.FileInfo, error) { - return collectionDirent{ - collection: cf.collection, - name: cf.name, - size: cf.size, - isDir: false, - }, nil +// Mode implements os.FileInfo. +func (fi fileinfo) Mode() os.FileMode { + return fi.mode } -type collectionDir struct { - collection *Collection - stream string - dirents []os.FileInfo +// IsDir implements os.FileInfo. +func (fi fileinfo) IsDir() bool { + return fi.mode&os.ModeDir != 0 } -// Readdir implements os.File. -func (cd *collectionDir) Readdir(count int) ([]os.FileInfo, error) { - ret := cd.dirents - if count <= 0 { - cd.dirents = nil - return ret, nil - } else if len(ret) == 0 { - return nil, io.EOF +// Size implements os.FileInfo. +func (fi fileinfo) Size() int64 { + return fi.size +} + +// Sys implements os.FileInfo. +func (fi fileinfo) Sys() interface{} { + return nil +} + +// A CollectionFileSystem is an http.Filesystem plus Stat() and +// support for opening writable files. All methods are safe to call +// from multiple goroutines. +type CollectionFileSystem interface { + http.FileSystem + + // analogous to os.Stat() + Stat(name string) (os.FileInfo, error) + + // analogous to os.Create(): create/truncate a file and open it O_RDWR. + Create(name string) (File, error) + + // Like os.OpenFile(): create or open a file or directory. + // + // If flag&os.O_EXCL==0, it opens an existing file or + // directory if one exists. If flag&os.O_CREATE!=0, it creates + // a new empty file or directory if one does not already + // exist. + // + // When creating a new item, perm&os.ModeDir determines + // whether it is a file or a directory. + // + // A file can be opened multiple times and used concurrently + // from multiple goroutines. However, each File object should + // be used by only one goroutine at a time. 
+ OpenFile(name string, flag int, perm os.FileMode) (File, error) + + Mkdir(name string, perm os.FileMode) error + Remove(name string) error + RemoveAll(name string) error + Rename(oldname, newname string) error + + // Flush all file data to Keep and return a snapshot of the + // filesystem suitable for saving as (Collection)ManifestText. + // Prefix (normally ".") is a top level directory, effectively + // prepended to all paths in the returned manifest. + MarshalManifest(prefix string) (string, error) +} + +type fileSystem struct { + dirnode +} + +func (fs *fileSystem) OpenFile(name string, flag int, perm os.FileMode) (File, error) { + return fs.dirnode.OpenFile(name, flag, perm) +} + +func (fs *fileSystem) Open(name string) (http.File, error) { + return fs.dirnode.OpenFile(name, os.O_RDONLY, 0) +} + +func (fs *fileSystem) Create(name string) (File, error) { + return fs.dirnode.OpenFile(name, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0) +} + +func (fs *fileSystem) Stat(name string) (fi os.FileInfo, err error) { + node := fs.dirnode.lookupPath(name) + if node == nil { + err = os.ErrNotExist + } else { + fi = node.Stat() } - var err error - if count >= len(ret) { - count = len(ret) - err = io.EOF + return +} + +type inode interface { + Parent() inode + Read([]byte, filenodePtr) (int, filenodePtr, error) + Write([]byte, filenodePtr) (int, filenodePtr, error) + Truncate(int64) error + Readdir() []os.FileInfo + Size() int64 + Stat() os.FileInfo + sync.Locker + RLock() + RUnlock() +} + +// filenode implements inode. +type filenode struct { + fileinfo fileinfo + parent *dirnode + segments []segment + // number of times `segments` has changed in a + // way that might invalidate a filenodePtr + repacked int64 + memsize int64 // bytes in memSegments + sync.RWMutex +} + +// filenodePtr is an offset into a file that is (usually) efficient to +// seek to. 
Specifically, if filenode.repacked==filenodePtr.repacked +// then +// filenode.segments[filenodePtr.segmentIdx][filenodePtr.segmentOff] +// corresponds to file offset filenodePtr.off. Otherwise, it is +// necessary to reexamine len(filenode.segments[0]) etc. to find the +// correct segment and offset. +type filenodePtr struct { + off int64 + segmentIdx int + segmentOff int + repacked int64 +} + +// seek returns a ptr that is consistent with both startPtr.off and +// the current state of fn. The caller must already hold fn.RLock() or +// fn.Lock(). +// +// If startPtr is beyond EOF, ptr.segment* will indicate precisely +// EOF. +// +// After seeking: +// +// ptr.segmentIdx == len(filenode.segments) // i.e., at EOF +// || +// filenode.segments[ptr.segmentIdx].Len() > ptr.segmentOff +func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) { + ptr = startPtr + if ptr.off < 0 { + // meaningless anyway + return + } else if ptr.off >= fn.fileinfo.size { + ptr.segmentIdx = len(fn.segments) + ptr.segmentOff = 0 + ptr.repacked = fn.repacked + return + } else if ptr.repacked == fn.repacked { + // segmentIdx and segmentOff accurately reflect + // ptr.off, but might have fallen off the end of a + // segment + if ptr.segmentOff >= fn.segments[ptr.segmentIdx].Len() { + ptr.segmentIdx++ + ptr.segmentOff = 0 + } + return } - cd.dirents = cd.dirents[count:] - return ret[:count], err + defer func() { + ptr.repacked = fn.repacked + }() + if ptr.off >= fn.fileinfo.size { + ptr.segmentIdx, ptr.segmentOff = len(fn.segments), 0 + return + } + // Recompute segmentIdx and segmentOff. We have already + // established fn.fileinfo.size > ptr.off >= 0, so we don't + // have to deal with edge cases here. 
+ var off int64 + for ptr.segmentIdx, ptr.segmentOff = 0, 0; off < ptr.off; ptr.segmentIdx++ { + // This would panic (index out of range) if + // fn.fileinfo.size were larger than + // sum(fn.segments[i].Len()) -- but that can't happen + // because we have ensured fn.fileinfo.size is always + // accurate. + segLen := int64(fn.segments[ptr.segmentIdx].Len()) + if off+segLen > ptr.off { + ptr.segmentOff = int(ptr.off - off) + break + } + off += segLen + } + return } -// Stat implements os.File. -func (cd *collectionDir) Stat() (os.FileInfo, error) { - return collectionDirent{ - collection: cd.collection, - name: path.Base(cd.stream), - isDir: true, - size: int64(len(cd.dirents)), - }, nil +// caller must have lock +func (fn *filenode) appendSegment(e segment) { + fn.segments = append(fn.segments, e) + fn.fileinfo.size += int64(e.Len()) +} + +func (fn *filenode) Parent() inode { + fn.RLock() + defer fn.RUnlock() + return fn.parent } -// Close implements os.File. -func (cd *collectionDir) Close() error { +func (fn *filenode) Readdir() []os.FileInfo { return nil } -// Read implements os.File. -func (cd *collectionDir) Read([]byte) (int, error) { - return 0, nil +// Read reads file data from a single segment, starting at startPtr, +// into p. startPtr is assumed not to be up-to-date. Caller must have +// RLock or Lock. +func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) { + ptr = fn.seek(startPtr) + if ptr.off < 0 { + err = ErrNegativeOffset + return + } + if ptr.segmentIdx >= len(fn.segments) { + err = io.EOF + return + } + n, err = fn.segments[ptr.segmentIdx].ReadAt(p, int64(ptr.segmentOff)) + if n > 0 { + ptr.off += int64(n) + ptr.segmentOff += n + if ptr.segmentOff == fn.segments[ptr.segmentIdx].Len() { + ptr.segmentIdx++ + ptr.segmentOff = 0 + if ptr.segmentIdx < len(fn.segments) && err == io.EOF { + err = nil + } + } + } + return } -// Seek implements os.File. 
-func (cd *collectionDir) Seek(int64, int) (int64, error) { - return 0, nil +func (fn *filenode) Size() int64 { + fn.RLock() + defer fn.RUnlock() + return fn.fileinfo.Size() } -// collectionDirent implements os.FileInfo. -type collectionDirent struct { - collection *Collection - name string - isDir bool - mode os.FileMode - size int64 +func (fn *filenode) Stat() os.FileInfo { + fn.RLock() + defer fn.RUnlock() + return fn.fileinfo } -// Name implements os.FileInfo. -func (e collectionDirent) Name() string { - return e.name +func (fn *filenode) Truncate(size int64) error { + fn.Lock() + defer fn.Unlock() + return fn.truncate(size) } -// ModTime implements os.FileInfo. -func (e collectionDirent) ModTime() time.Time { - if e.collection.ModifiedAt == nil { - return time.Now() +func (fn *filenode) truncate(size int64) error { + if size == fn.fileinfo.size { + return nil + } + fn.repacked++ + if size < fn.fileinfo.size { + ptr := fn.seek(filenodePtr{off: size}) + for i := ptr.segmentIdx; i < len(fn.segments); i++ { + if seg, ok := fn.segments[i].(*memSegment); ok { + fn.memsize -= int64(seg.Len()) + } + } + if ptr.segmentOff == 0 { + fn.segments = fn.segments[:ptr.segmentIdx] + } else { + fn.segments = fn.segments[:ptr.segmentIdx+1] + switch seg := fn.segments[ptr.segmentIdx].(type) { + case *memSegment: + seg.Truncate(ptr.segmentOff) + fn.memsize += int64(seg.Len()) + default: + fn.segments[ptr.segmentIdx] = seg.Slice(0, ptr.segmentOff) + } + } + fn.fileinfo.size = size + return nil } - return *e.collection.ModifiedAt + for size > fn.fileinfo.size { + grow := size - fn.fileinfo.size + var seg *memSegment + var ok bool + if len(fn.segments) == 0 { + seg = &memSegment{} + fn.segments = append(fn.segments, seg) + } else if seg, ok = fn.segments[len(fn.segments)-1].(*memSegment); !ok || seg.Len() >= maxBlockSize { + seg = &memSegment{} + fn.segments = append(fn.segments, seg) + } + if maxgrow := int64(maxBlockSize - seg.Len()); maxgrow < grow { + grow = maxgrow + } + 
seg.Truncate(seg.Len() + int(grow)) + fn.fileinfo.size += grow + fn.memsize += grow + } + return nil } -// Mode implements os.FileInfo. -func (e collectionDirent) Mode() os.FileMode { - if e.isDir { - return 0555 +// Write writes data from p to the file, starting at startPtr, +// extending the file size if necessary. Caller must have Lock. +func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) { + if startPtr.off > fn.fileinfo.size { + if err = fn.truncate(startPtr.off); err != nil { + return 0, startPtr, err + } + } + ptr = fn.seek(startPtr) + if ptr.off < 0 { + err = ErrNegativeOffset + return + } + for len(p) > 0 && err == nil { + cando := p + if len(cando) > maxBlockSize { + cando = cando[:maxBlockSize] + } + // Rearrange/grow fn.segments (and shrink cando if + // needed) such that cando can be copied to + // fn.segments[ptr.segmentIdx] at offset + // ptr.segmentOff. + cur := ptr.segmentIdx + prev := ptr.segmentIdx - 1 + var curWritable bool + if cur < len(fn.segments) { + _, curWritable = fn.segments[cur].(*memSegment) + } + var prevAppendable bool + if prev >= 0 && fn.segments[prev].Len() < maxBlockSize { + _, prevAppendable = fn.segments[prev].(*memSegment) + } + if ptr.segmentOff > 0 && !curWritable { + // Split a non-writable block. + if max := fn.segments[cur].Len() - ptr.segmentOff; max <= len(cando) { + // Truncate cur, and insert a new + // segment after it. + cando = cando[:max] + fn.segments = append(fn.segments, nil) + copy(fn.segments[cur+1:], fn.segments[cur:]) + } else { + // Split cur into two copies, truncate + // the one on the left, shift the one + // on the right, and insert a new + // segment between them. 
+ fn.segments = append(fn.segments, nil, nil) + copy(fn.segments[cur+2:], fn.segments[cur:]) + fn.segments[cur+2] = fn.segments[cur+2].Slice(ptr.segmentOff+len(cando), -1) + } + cur++ + prev++ + seg := &memSegment{} + seg.Truncate(len(cando)) + fn.memsize += int64(len(cando)) + fn.segments[cur] = seg + fn.segments[prev] = fn.segments[prev].Slice(0, ptr.segmentOff) + ptr.segmentIdx++ + ptr.segmentOff = 0 + fn.repacked++ + ptr.repacked++ + } else if curWritable { + if fit := int(fn.segments[cur].Len()) - ptr.segmentOff; fit < len(cando) { + cando = cando[:fit] + } + } else { + if prevAppendable { + // Shrink cando if needed to fit in + // prev segment. + if cangrow := maxBlockSize - fn.segments[prev].Len(); cangrow < len(cando) { + cando = cando[:cangrow] + } + } + + if cur == len(fn.segments) { + // ptr is at EOF, filesize is changing. + fn.fileinfo.size += int64(len(cando)) + } else if el := fn.segments[cur].Len(); el <= len(cando) { + // cando is long enough that we won't + // need cur any more. shrink cando to + // be exactly as long as cur + // (otherwise we'd accidentally shift + // the effective position of all + // segments after cur). + cando = cando[:el] + copy(fn.segments[cur:], fn.segments[cur+1:]) + fn.segments = fn.segments[:len(fn.segments)-1] + } else { + // shrink cur by the same #bytes we're growing prev + fn.segments[cur] = fn.segments[cur].Slice(len(cando), -1) + } + + if prevAppendable { + // Grow prev. + ptr.segmentIdx-- + ptr.segmentOff = fn.segments[prev].Len() + fn.segments[prev].(*memSegment).Truncate(ptr.segmentOff + len(cando)) + fn.memsize += int64(len(cando)) + ptr.repacked++ + fn.repacked++ + } else { + // Insert a segment between prev and + // cur, and advance prev/cur. 
+ fn.segments = append(fn.segments, nil) + if cur < len(fn.segments) { + copy(fn.segments[cur+1:], fn.segments[cur:]) + ptr.repacked++ + fn.repacked++ + } else { + // appending a new segment does + // not invalidate any ptrs + } + seg := &memSegment{} + seg.Truncate(len(cando)) + fn.memsize += int64(len(cando)) + fn.segments[cur] = seg + cur++ + prev++ + } + } + + // Finally we can copy bytes from cando to the current segment. + fn.segments[ptr.segmentIdx].(*memSegment).WriteAt(cando, ptr.segmentOff) + n += len(cando) + p = p[len(cando):] + + ptr.off += int64(len(cando)) + ptr.segmentOff += len(cando) + if ptr.segmentOff >= maxBlockSize { + fn.pruneMemSegments() + } + if fn.segments[ptr.segmentIdx].Len() == ptr.segmentOff { + ptr.segmentOff = 0 + ptr.segmentIdx++ + } + + fn.fileinfo.modTime = time.Now() + } + return +} + +// Write some data out to disk to reduce memory use. Caller must have +// write lock. +func (fn *filenode) pruneMemSegments() { + // TODO: async (don't hold Lock() while waiting for Keep) + // TODO: share code with (*dirnode)sync() + // TODO: pack/flush small blocks too, when fragmented + for idx, seg := range fn.segments { + seg, ok := seg.(*memSegment) + if !ok || seg.Len() < maxBlockSize { + continue + } + locator, _, err := fn.parent.kc.PutB(seg.buf) + if err != nil { + // TODO: stall (or return errors from) + // subsequent writes until flushing + // starts to succeed + continue + } + fn.memsize -= int64(seg.Len()) + fn.segments[idx] = storedSegment{ + kc: fn.parent.kc, + locator: locator, + size: seg.Len(), + offset: 0, + length: seg.Len(), + } + } +} + +// FileSystem returns a CollectionFileSystem for the collection. 
+func (c *Collection) FileSystem(client *Client, kc keepClient) (CollectionFileSystem, error) { + var modTime time.Time + if c.ModifiedAt == nil { + modTime = time.Now() } else { - return 0444 + modTime = *c.ModifiedAt + } + fs := &fileSystem{dirnode: dirnode{ + client: client, + kc: kc, + fileinfo: fileinfo{ + name: ".", + mode: os.ModeDir | 0755, + modTime: modTime, + }, + parent: nil, + inodes: make(map[string]inode), + }} + fs.dirnode.parent = &fs.dirnode + if err := fs.dirnode.loadManifest(c.ManifestText); err != nil { + return nil, err } + return fs, nil } -// IsDir implements os.FileInfo. -func (e collectionDirent) IsDir() bool { - return e.isDir +type filehandle struct { + inode + ptr filenodePtr + append bool + readable bool + writable bool + unreaddirs []os.FileInfo } -// Size implements os.FileInfo. -func (e collectionDirent) Size() int64 { - return e.size +func (f *filehandle) Read(p []byte) (n int, err error) { + if !f.readable { + return 0, ErrWriteOnlyMode + } + f.inode.RLock() + defer f.inode.RUnlock() + n, f.ptr, err = f.inode.Read(p, f.ptr) + return } -// Sys implements os.FileInfo. -func (e collectionDirent) Sys() interface{} { - return nil +func (f *filehandle) Seek(off int64, whence int) (pos int64, err error) { + size := f.inode.Size() + ptr := f.ptr + switch whence { + case io.SeekStart: + ptr.off = off + case io.SeekCurrent: + ptr.off += off + case io.SeekEnd: + ptr.off = size + off + } + if ptr.off < 0 { + return f.ptr.off, ErrNegativeOffset + } + if ptr.off != f.ptr.off { + f.ptr = ptr + // force filenode to recompute f.ptr fields on next + // use + f.ptr.repacked = -1 + } + return f.ptr.off, nil } -// A CollectionFileSystem is an http.Filesystem with an added Stat() method. -type CollectionFileSystem interface { - http.FileSystem - Stat(name string) (os.FileInfo, error) +func (f *filehandle) Truncate(size int64) error { + return f.inode.Truncate(size) } -// collectionFS implements CollectionFileSystem. 
-type collectionFS struct { - collection *Collection - client *Client - kc keepClient - sizes map[string]int64 - sizesOnce sync.Once +func (f *filehandle) Write(p []byte) (n int, err error) { + if !f.writable { + return 0, ErrReadOnlyFile + } + f.inode.Lock() + defer f.inode.Unlock() + if fn, ok := f.inode.(*filenode); ok && f.append { + f.ptr = filenodePtr{ + off: fn.fileinfo.size, + segmentIdx: len(fn.segments), + segmentOff: 0, + repacked: fn.repacked, + } + } + n, f.ptr, err = f.inode.Write(p, f.ptr) + return } -// FileSystem returns a CollectionFileSystem for the collection. -func (c *Collection) FileSystem(client *Client, kc keepClient) CollectionFileSystem { - return &collectionFS{ - collection: c, - client: client, - kc: kc, - } -} - -func (c *collectionFS) Stat(name string) (os.FileInfo, error) { - name = canonicalName(name) - if name == "." { - return collectionDirent{ - collection: c.collection, - name: "/", - isDir: true, - }, nil - } - if size, ok := c.fileSizes()[name]; ok { - return collectionDirent{ - collection: c.collection, - name: path.Base(name), - size: size, - isDir: false, - }, nil - } - for fnm := range c.fileSizes() { - if !strings.HasPrefix(fnm, name+"/") { +func (f *filehandle) Readdir(count int) ([]os.FileInfo, error) { + if !f.inode.Stat().IsDir() { + return nil, ErrInvalidOperation + } + if count <= 0 { + return f.inode.Readdir(), nil + } + if f.unreaddirs == nil { + f.unreaddirs = f.inode.Readdir() + } + if len(f.unreaddirs) == 0 { + return nil, io.EOF + } + if count > len(f.unreaddirs) { + count = len(f.unreaddirs) + } + ret := f.unreaddirs[:count] + f.unreaddirs = f.unreaddirs[count:] + return ret, nil +} + +func (f *filehandle) Stat() (os.FileInfo, error) { + return f.inode.Stat(), nil +} + +func (f *filehandle) Close() error { + return nil +} + +type dirnode struct { + fileinfo fileinfo + parent *dirnode + client *Client + kc keepClient + inodes map[string]inode + sync.RWMutex +} + +// sync flushes in-memory data (for all files 
in the tree rooted at +// dn) to persistent storage. Caller must hold dn.Lock(). +func (dn *dirnode) sync() error { + type shortBlock struct { + fn *filenode + idx int + } + var pending []shortBlock + var pendingLen int + + flush := func(sbs []shortBlock) error { + if len(sbs) == 0 { + return nil + } + block := make([]byte, 0, maxBlockSize) + for _, sb := range sbs { + block = append(block, sb.fn.segments[sb.idx].(*memSegment).buf...) + } + locator, _, err := dn.kc.PutB(block) + if err != nil { + return err + } + off := 0 + for _, sb := range sbs { + data := sb.fn.segments[sb.idx].(*memSegment).buf + sb.fn.segments[sb.idx] = storedSegment{ + kc: dn.kc, + locator: locator, + size: len(block), + offset: off, + length: len(data), + } + off += len(data) + sb.fn.memsize -= int64(len(data)) + } + return nil + } + + names := make([]string, 0, len(dn.inodes)) + for name := range dn.inodes { + names = append(names, name) + } + sort.Strings(names) + + for _, name := range names { + fn, ok := dn.inodes[name].(*filenode) + if !ok { continue } - return collectionDirent{ - collection: c.collection, - name: path.Base(name), - isDir: true, - }, nil + fn.Lock() + defer fn.Unlock() + for idx, seg := range fn.segments { + seg, ok := seg.(*memSegment) + if !ok { + continue + } + if seg.Len() > maxBlockSize/2 { + if err := flush([]shortBlock{{fn, idx}}); err != nil { + return err + } + continue + } + if pendingLen+seg.Len() > maxBlockSize { + if err := flush(pending); err != nil { + return err + } + pending = nil + pendingLen = 0 + } + pending = append(pending, shortBlock{fn, idx}) + pendingLen += seg.Len() + } } - return nil, os.ErrNotExist + return flush(pending) } -func (c *collectionFS) Open(name string) (http.File, error) { - // Ensure name looks the way it does in a manifest. 
- name = canonicalName(name) +func (dn *dirnode) MarshalManifest(prefix string) (string, error) { + dn.Lock() + defer dn.Unlock() + return dn.marshalManifest(prefix) +} - m := manifest.Manifest{Text: c.collection.ManifestText} +// caller must have read lock. +func (dn *dirnode) marshalManifest(prefix string) (string, error) { + var streamLen int64 + type filepart struct { + name string + offset int64 + length int64 + } + var fileparts []filepart + var subdirs string + var blocks []string - // Return a file if it exists. - if size, ok := c.fileSizes()[name]; ok { - reader, err := c.kc.ManifestFileReader(m, name) - if err != nil { - return nil, err + if err := dn.sync(); err != nil { + return "", err + } + + names := make([]string, 0, len(dn.inodes)) + for name, node := range dn.inodes { + names = append(names, name) + node.Lock() + defer node.Unlock() + } + sort.Strings(names) + + for _, name := range names { + switch node := dn.inodes[name].(type) { + case *dirnode: + subdir, err := node.marshalManifest(prefix + "/" + name) + if err != nil { + return "", err + } + subdirs = subdirs + subdir + case *filenode: + if len(node.segments) == 0 { + fileparts = append(fileparts, filepart{name: name}) + break + } + for _, seg := range node.segments { + switch seg := seg.(type) { + case storedSegment: + if len(blocks) > 0 && blocks[len(blocks)-1] == seg.locator { + streamLen -= int64(seg.size) + } else { + blocks = append(blocks, seg.locator) + } + next := filepart{ + name: name, + offset: streamLen + int64(seg.offset), + length: int64(seg.length), + } + if prev := len(fileparts) - 1; prev >= 0 && + fileparts[prev].name == name && + fileparts[prev].offset+fileparts[prev].length == next.offset { + fileparts[prev].length += next.length + } else { + fileparts = append(fileparts, next) + } + streamLen += int64(seg.size) + default: + // This can't happen: we + // haven't unlocked since + // calling sync(). 
+ panic(fmt.Sprintf("can't marshal segment type %T", seg)) + } + } + default: + panic(fmt.Sprintf("can't marshal inode type %T", node)) + } + } + var filetokens []string + for _, s := range fileparts { + filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, manifestEscape(s.name))) + } + if len(filetokens) == 0 { + return subdirs, nil + } else if len(blocks) == 0 { + blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"} + } + return manifestEscape(prefix) + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n" + subdirs, nil +} + +func (dn *dirnode) loadManifest(txt string) error { + var dirname string + streams := strings.Split(txt, "\n") + if streams[len(streams)-1] != "" { + return fmt.Errorf("line %d: no trailing newline", len(streams)) + } + streams = streams[:len(streams)-1] + segments := []storedSegment{} + for i, stream := range streams { + lineno := i + 1 + var anyFileTokens bool + var pos int64 + var segIdx int + segments = segments[:0] + for i, token := range strings.Split(stream, " ") { + if i == 0 { + dirname = manifestUnescape(token) + continue + } + if !strings.Contains(token, ":") { + if anyFileTokens { + return fmt.Errorf("line %d: bad file segment %q", lineno, token) + } + toks := strings.SplitN(token, "+", 3) + if len(toks) < 2 { + return fmt.Errorf("line %d: bad locator %q", lineno, token) + } + length, err := strconv.ParseInt(toks[1], 10, 32) + if err != nil || length < 0 { + return fmt.Errorf("line %d: bad locator %q", lineno, token) + } + segments = append(segments, storedSegment{ + locator: token, + size: int(length), + offset: 0, + length: int(length), + }) + continue + } else if len(segments) == 0 { + return fmt.Errorf("line %d: bad locator %q", lineno, token) + } + + toks := strings.Split(token, ":") + if len(toks) != 3 { + return fmt.Errorf("line %d: bad file segment %q", lineno, token) + } + anyFileTokens = true + + offset, err := strconv.ParseInt(toks[0], 10, 64) + if err != nil || offset 
< 0 { + return fmt.Errorf("line %d: bad file segment %q", lineno, token) + } + length, err := strconv.ParseInt(toks[1], 10, 64) + if err != nil || length < 0 { + return fmt.Errorf("line %d: bad file segment %q", lineno, token) + } + name := dirname + "/" + manifestUnescape(toks[2]) + fnode, err := dn.createFileAndParents(name) + if err != nil { + return fmt.Errorf("line %d: cannot use path %q: %s", lineno, name, err) + } + // Map the stream offset/range coordinates to + // block/offset/range coordinates and add + // corresponding storedSegments to the filenode + if pos > offset { + // Can't continue where we left off. + // TODO: binary search instead of + // rewinding all the way (but this + // situation might be rare anyway) + segIdx, pos = 0, 0 + } + for next := int64(0); segIdx < len(segments); segIdx++ { + seg := segments[segIdx] + next = pos + int64(seg.Len()) + if next <= offset || seg.Len() == 0 { + pos = next + continue + } + if pos >= offset+length { + break + } + var blkOff int + if pos < offset { + blkOff = int(offset - pos) + } + blkLen := seg.Len() - blkOff + if pos+int64(blkOff+blkLen) > offset+length { + blkLen = int(offset + length - pos - int64(blkOff)) + } + fnode.appendSegment(storedSegment{ + kc: dn.kc, + locator: seg.locator, + size: seg.size, + offset: blkOff, + length: blkLen, + }) + if next > offset+length { + break + } else { + pos = next + } + } + if segIdx == len(segments) && pos < offset+length { + return fmt.Errorf("line %d: invalid segment in %d-byte stream: %q", lineno, pos, token) + } + } + if !anyFileTokens { + return fmt.Errorf("line %d: no file segments", lineno) + } else if len(segments) == 0 { + return fmt.Errorf("line %d: no locators", lineno) + } else if dirname == "" { + return fmt.Errorf("line %d: no stream name", lineno) + } + } + return nil +} + +// only safe to call from loadManifest -- no locking +func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) { + names := strings.Split(path, "/") + 
basename := names[len(names)-1] + if basename == "" || basename == "." || basename == ".." { + err = fmt.Errorf("invalid filename") + return + } + for _, name := range names[:len(names)-1] { + switch name { + case "", ".": + case "..": + dn = dn.parent + default: + switch node := dn.inodes[name].(type) { + case nil: + dn = dn.newDirnode(name, 0755, dn.fileinfo.modTime) + case *dirnode: + dn = node + case *filenode: + err = ErrFileExists + return + } + } + } + switch node := dn.inodes[basename].(type) { + case nil: + fn = dn.newFilenode(basename, 0755, dn.fileinfo.modTime) + case *filenode: + fn = node + case *dirnode: + err = ErrIsDirectory + } + return +} + +func (dn *dirnode) mkdir(name string) (*filehandle, error) { + return dn.OpenFile(name, os.O_CREATE|os.O_EXCL, os.ModeDir|0755) +} + +func (dn *dirnode) Mkdir(name string, perm os.FileMode) error { + f, err := dn.mkdir(name) + if err == nil { + err = f.Close() + } + return err +} + +func (dn *dirnode) Remove(name string) error { + return dn.remove(strings.TrimRight(name, "/"), false) +} + +func (dn *dirnode) RemoveAll(name string) error { + err := dn.remove(strings.TrimRight(name, "/"), true) + if os.IsNotExist(err) { + // "If the path does not exist, RemoveAll returns + // nil." (see "os" pkg) + err = nil + } + return err +} + +func (dn *dirnode) remove(name string, recursive bool) error { + dirname, name := path.Split(name) + if name == "" || name == "." || name == ".." 
{ + return ErrInvalidArgument + } + dn, ok := dn.lookupPath(dirname).(*dirnode) + if !ok { + return os.ErrNotExist + } + dn.Lock() + defer dn.Unlock() + switch node := dn.inodes[name].(type) { + case nil: + return os.ErrNotExist + case *dirnode: + node.RLock() + defer node.RUnlock() + if !recursive && len(node.inodes) > 0 { + return ErrDirectoryNotEmpty } - return &collectionFile{ - File: reader, - collection: c.collection, - name: path.Base(name), - size: size, - }, nil } + delete(dn.inodes, name) + return nil +} + +func (dn *dirnode) Rename(oldname, newname string) error { + olddir, oldname := path.Split(oldname) + if oldname == "" || oldname == "." || oldname == ".." { + return ErrInvalidArgument + } + olddirf, err := dn.OpenFile(olddir+".", os.O_RDONLY, 0) + if err != nil { + return fmt.Errorf("%q: %s", olddir, err) + } + defer olddirf.Close() + newdir, newname := path.Split(newname) + if newname == "." || newname == ".." { + return ErrInvalidArgument + } else if newname == "" { + // Rename("a/b", "c/") means Rename("a/b", "c/b") + newname = oldname + } + newdirf, err := dn.OpenFile(newdir+".", os.O_RDONLY, 0) + if err != nil { + return fmt.Errorf("%q: %s", newdir, err) + } + defer newdirf.Close() + + // When acquiring locks on multiple nodes, all common + // ancestors must be locked first in order to avoid + // deadlock. This is assured by locking the path from root to + // newdir, then locking the path from root to olddir, skipping + // any already-locked nodes. 
+ needLock := []sync.Locker{} + for _, f := range []*filehandle{olddirf, newdirf} { + node := f.inode + needLock = append(needLock, node) + for node.Parent() != node { + node = node.Parent() + needLock = append(needLock, node) + } + } + locked := map[sync.Locker]bool{} + for i := len(needLock) - 1; i >= 0; i-- { + if n := needLock[i]; !locked[n] { + n.Lock() + defer n.Unlock() + locked[n] = true + } + } + + olddn := olddirf.inode.(*dirnode) + newdn := newdirf.inode.(*dirnode) + oldinode, ok := olddn.inodes[oldname] + if !ok { + return os.ErrNotExist + } + if existing, ok := newdn.inodes[newname]; ok { + // overwriting an existing file or dir + if dn, ok := existing.(*dirnode); ok { + if !oldinode.Stat().IsDir() { + return ErrIsDirectory + } + dn.RLock() + defer dn.RUnlock() + if len(dn.inodes) > 0 { + return ErrDirectoryNotEmpty + } + } + } else { + if newdn.inodes == nil { + newdn.inodes = make(map[string]inode) + } + newdn.fileinfo.size++ + } + newdn.inodes[newname] = oldinode + switch n := oldinode.(type) { + case *dirnode: + n.parent = newdn + case *filenode: + n.parent = newdn + default: + panic(fmt.Sprintf("bad inode type %T", n)) + } + delete(olddn.inodes, oldname) + olddn.fileinfo.size-- + return nil +} + +func (dn *dirnode) Parent() inode { + dn.RLock() + defer dn.RUnlock() + return dn.parent +} + +func (dn *dirnode) Readdir() (fi []os.FileInfo) { + dn.RLock() + defer dn.RUnlock() + fi = make([]os.FileInfo, 0, len(dn.inodes)) + for _, inode := range dn.inodes { + fi = append(fi, inode.Stat()) + } + return +} - // Return a directory if it's the root dir or there are file - // entries below it. 
- children := map[string]collectionDirent{} - for fnm, size := range c.fileSizes() { - if !strings.HasPrefix(fnm, name+"/") { +func (dn *dirnode) Read(p []byte, ptr filenodePtr) (int, filenodePtr, error) { + return 0, ptr, ErrInvalidOperation +} + +func (dn *dirnode) Write(p []byte, ptr filenodePtr) (int, filenodePtr, error) { + return 0, ptr, ErrInvalidOperation +} + +func (dn *dirnode) Size() int64 { + dn.RLock() + defer dn.RUnlock() + return dn.fileinfo.Size() +} + +func (dn *dirnode) Stat() os.FileInfo { + dn.RLock() + defer dn.RUnlock() + return dn.fileinfo +} + +func (dn *dirnode) Truncate(int64) error { + return ErrInvalidOperation +} + +// lookupPath returns the inode for the file/directory with the given +// name (which may contain "/" separators), along with its parent +// node. If no such file/directory exists, the returned node is nil. +func (dn *dirnode) lookupPath(path string) (node inode) { + node = dn + for _, name := range strings.Split(path, "/") { + dn, ok := node.(*dirnode) + if !ok { + return nil + } + if name == "." || name == "" { continue } - isDir := false - ent := fnm[len(name)+1:] - if i := strings.Index(ent, "/"); i >= 0 { - ent = ent[:i] - isDir = true + if name == ".." 
{ + node = node.Parent() + continue } - e := children[ent] - e.collection = c.collection - e.isDir = isDir - e.name = ent - e.size = size - children[ent] = e + dn.RLock() + node = dn.inodes[name] + dn.RUnlock() + } + return +} + +func (dn *dirnode) newDirnode(name string, perm os.FileMode, modTime time.Time) *dirnode { + child := &dirnode{ + parent: dn, + client: dn.client, + kc: dn.kc, + fileinfo: fileinfo{ + name: name, + mode: os.ModeDir | perm, + modTime: modTime, + }, + } + if dn.inodes == nil { + dn.inodes = make(map[string]inode) + } + dn.inodes[name] = child + dn.fileinfo.size++ + return child +} + +func (dn *dirnode) newFilenode(name string, perm os.FileMode, modTime time.Time) *filenode { + child := &filenode{ + parent: dn, + fileinfo: fileinfo{ + name: name, + mode: perm, + modTime: modTime, + }, + } + if dn.inodes == nil { + dn.inodes = make(map[string]inode) } - if len(children) == 0 && name != "." { + dn.inodes[name] = child + dn.fileinfo.size++ + return child +} + +// OpenFile is analogous to os.OpenFile(). +func (dn *dirnode) OpenFile(name string, flag int, perm os.FileMode) (*filehandle, error) { + if flag&os.O_SYNC != 0 { + return nil, ErrSyncNotSupported + } + dirname, name := path.Split(name) + dn, ok := dn.lookupPath(dirname).(*dirnode) + if !ok { return nil, os.ErrNotExist } - dirents := make([]os.FileInfo, 0, len(children)) - for _, ent := range children { - dirents = append(dirents, ent) + var readable, writable bool + switch flag & (os.O_RDWR | os.O_RDONLY | os.O_WRONLY) { + case os.O_RDWR: + readable = true + writable = true + case os.O_RDONLY: + readable = true + case os.O_WRONLY: + writable = true + default: + return nil, fmt.Errorf("invalid flags 0x%x", flag) + } + if !writable { + // A directory can be opened via "foo/", "foo/.", or + // "foo/..". 
+ switch name { + case ".", "": + return &filehandle{inode: dn}, nil + case "..": + return &filehandle{inode: dn.Parent()}, nil + } + } + createMode := flag&os.O_CREATE != 0 + if createMode { + dn.Lock() + defer dn.Unlock() + } else { + dn.RLock() + defer dn.RUnlock() } - return &collectionDir{ - collection: c.collection, - stream: name, - dirents: dirents, + n, ok := dn.inodes[name] + if !ok { + if !createMode { + return nil, os.ErrNotExist + } + if perm.IsDir() { + n = dn.newDirnode(name, 0755, time.Now()) + } else { + n = dn.newFilenode(name, 0755, time.Now()) + } + } else if flag&os.O_EXCL != 0 { + return nil, ErrFileExists + } else if flag&os.O_TRUNC != 0 { + if !writable { + return nil, fmt.Errorf("invalid flag O_TRUNC in read-only mode") + } else if fn, ok := n.(*filenode); !ok { + return nil, fmt.Errorf("invalid flag O_TRUNC when opening directory") + } else { + fn.Truncate(0) + } + } + return &filehandle{ + inode: n, + append: flag&os.O_APPEND != 0, + readable: readable, + writable: writable, }, nil } -// fileSizes returns a map of files that can be opened. Each key -// starts with "./". -func (c *collectionFS) fileSizes() map[string]int64 { - c.sizesOnce.Do(func() { - c.sizes = map[string]int64{} - m := manifest.Manifest{Text: c.collection.ManifestText} - for ms := range m.StreamIter() { - for _, fss := range ms.FileStreamSegments { - c.sizes[ms.StreamName+"/"+fss.Name] += int64(fss.SegLen) - } +type segment interface { + io.ReaderAt + Len() int + // Return a new segment with a subsection of the data from this + // one. length<0 means length=Len()-off. 
+ Slice(off int, length int) segment +} + +type memSegment struct { + buf []byte +} + +func (me *memSegment) Len() int { + return len(me.buf) +} + +func (me *memSegment) Slice(off, length int) segment { + if length < 0 { + length = len(me.buf) - off + } + buf := make([]byte, length) + copy(buf, me.buf[off:]) + return &memSegment{buf: buf} +} + +func (me *memSegment) Truncate(n int) { + if n > cap(me.buf) { + newsize := 1024 + for newsize < n { + newsize = newsize << 2 + } + newbuf := make([]byte, n, newsize) + copy(newbuf, me.buf) + me.buf = newbuf + } else { + // Zero unused part when shrinking, in case we grow + // and start using it again later. + for i := n; i < len(me.buf); i++ { + me.buf[i] = 0 + } + } + me.buf = me.buf[:n] +} + +func (me *memSegment) WriteAt(p []byte, off int) { + if off+len(p) > len(me.buf) { + panic("overflowed segment") + } + copy(me.buf[off:], p) +} + +func (me *memSegment) ReadAt(p []byte, off int64) (n int, err error) { + if off > int64(me.Len()) { + err = io.EOF + return + } + n = copy(p, me.buf[int(off):]) + if n < len(p) { + err = io.EOF + } + return +} + +type storedSegment struct { + kc keepClient + locator string + size int // size of stored block (also encoded in locator) + offset int // position of segment within the stored block + length int // bytes in this segment (offset + length <= size) +} + +func (se storedSegment) Len() int { + return se.length +} + +func (se storedSegment) Slice(n, size int) segment { + se.offset += n + se.length -= n + if size >= 0 && se.length > size { + se.length = size + } + return se +} + +func (se storedSegment) ReadAt(p []byte, off int64) (n int, err error) { + if off > int64(se.length) { + return 0, io.EOF + } + maxlen := se.length - int(off) + if len(p) > maxlen { + p = p[:maxlen] + n, err = se.kc.ReadAt(se.locator, p, int(off)+se.offset) + if err == nil { + err = io.EOF } - }) - return c.sizes + return + } + return se.kc.ReadAt(se.locator, p, int(off)+se.offset) } func canonicalName(name 
string) string { @@ -278,3 +1388,31 @@ func canonicalName(name string) string { } return name } + +var manifestEscapeSeq = regexp.MustCompile(`\\([0-7]{3}|\\)`) + +func manifestUnescapeFunc(seq string) string { + if seq == `\\` { + return `\` + } + i, err := strconv.ParseUint(seq[1:], 8, 8) + if err != nil { + // Invalid escape sequence: can't unescape. + return seq + } + return string([]byte{byte(i)}) +} + +func manifestUnescape(s string) string { + return manifestEscapeSeq.ReplaceAllStringFunc(s, manifestUnescapeFunc) +} + +var manifestEscapedChar = regexp.MustCompile(`[\000-\040:\s\\]`) + +func manifestEscapeFunc(seq string) string { + return fmt.Sprintf("\\%03o", byte(seq[0])) +} + +func manifestEscape(s string) string { + return manifestEscapedChar.ReplaceAllStringFunc(s, manifestEscapeFunc) +} diff --git a/sdk/go/arvados/collection_fs_test.go b/sdk/go/arvados/collection_fs_test.go index f51d1eb3dd..f1a34754f7 100644 --- a/sdk/go/arvados/collection_fs_test.go +++ b/sdk/go/arvados/collection_fs_test.go @@ -5,10 +5,20 @@ package arvados import ( + "bytes" + "crypto/md5" + "errors" + "fmt" "io" + "io/ioutil" + "math/rand" "net/http" "os" + "regexp" + "runtime" + "sync" "testing" + "time" "git.curoverse.com/arvados.git/sdk/go/arvadostest" check "gopkg.in/check.v1" @@ -16,17 +26,55 @@ import ( var _ = check.Suite(&CollectionFSSuite{}) +type keepClientStub struct { + blocks map[string][]byte + sync.RWMutex +} + +var errStub404 = errors.New("404 block not found") + +func (kcs *keepClientStub) ReadAt(locator string, p []byte, off int) (int, error) { + kcs.RLock() + defer kcs.RUnlock() + buf := kcs.blocks[locator[:32]] + if buf == nil { + return 0, errStub404 + } + return copy(p, buf[off:]), nil +} + +func (kcs *keepClientStub) PutB(p []byte) (string, int, error) { + locator := fmt.Sprintf("%x+%d+A12345@abcde", md5.Sum(p), len(p)) + buf := make([]byte, len(p)) + copy(buf, p) + kcs.Lock() + defer kcs.Unlock() + kcs.blocks[locator[:32]] = buf + return locator, 1, nil +} 
+ type CollectionFSSuite struct { client *Client coll Collection - fs http.FileSystem + fs CollectionFileSystem + kc keepClient } func (s *CollectionFSSuite) SetUpTest(c *check.C) { s.client = NewClientFromEnv() err := s.client.RequestAndDecode(&s.coll, "GET", "arvados/v1/collections/"+arvadostest.FooAndBarFilesInDirUUID, nil, nil) c.Assert(err, check.IsNil) - s.fs = s.coll.FileSystem(s.client, nil) + s.kc = &keepClientStub{ + blocks: map[string][]byte{ + "3858f62230ac3c915f300c664312c63f": []byte("foobar"), + }} + s.fs, err = s.coll.FileSystem(s.client, s.kc) + c.Assert(err, check.IsNil) +} + +func (s *CollectionFSSuite) TestHttpFileSystemInterface(c *check.C) { + _, ok := s.fs.(http.FileSystem) + c.Check(ok, check.Equals, true) } func (s *CollectionFSSuite) TestReaddirFull(c *check.C) { @@ -58,7 +106,7 @@ func (s *CollectionFSSuite) TestReaddirLimited(c *check.C) { } fis, err = f.Readdir(1) - c.Check(err, check.Equals, io.EOF) + c.Check(err, check.IsNil) c.Check(len(fis), check.Equals, 1) if len(fis) > 0 { c.Check(fis[0].Size(), check.Equals, int64(3)) @@ -76,7 +124,7 @@ func (s *CollectionFSSuite) TestReaddirLimited(c *check.C) { c.Assert(err, check.IsNil) fis, err = f.Readdir(2) c.Check(len(fis), check.Equals, 1) - c.Assert(err, check.Equals, io.EOF) + c.Assert(err, check.IsNil) fis, err = f.Readdir(2) c.Check(len(fis), check.Equals, 0) c.Assert(err, check.Equals, io.EOF) @@ -113,14 +161,856 @@ func (s *CollectionFSSuite) TestNotExist(c *check.C) { } } -func (s *CollectionFSSuite) TestOpenFile(c *check.C) { - c.Skip("cannot test files with nil keepclient") +func (s *CollectionFSSuite) TestReadOnlyFile(c *check.C) { + f, err := s.fs.OpenFile("/dir1/foo", os.O_RDONLY, 0) + c.Assert(err, check.IsNil) + st, err := f.Stat() + c.Assert(err, check.IsNil) + c.Check(st.Size(), check.Equals, int64(3)) + n, err := f.Write([]byte("bar")) + c.Check(n, check.Equals, 0) + c.Check(err, check.Equals, ErrReadOnlyFile) +} + +func (s *CollectionFSSuite) TestCreateFile(c *check.C) 
{ + f, err := s.fs.OpenFile("/new-file 1", os.O_RDWR|os.O_CREATE, 0) + c.Assert(err, check.IsNil) + st, err := f.Stat() + c.Assert(err, check.IsNil) + c.Check(st.Size(), check.Equals, int64(0)) + + n, err := f.Write([]byte("bar")) + c.Check(n, check.Equals, 3) + c.Check(err, check.IsNil) + + c.Check(f.Close(), check.IsNil) + + f, err = s.fs.OpenFile("/new-file 1", os.O_RDWR|os.O_CREATE|os.O_EXCL, 0) + c.Check(f, check.IsNil) + c.Assert(err, check.NotNil) + + f, err = s.fs.OpenFile("/new-file 1", os.O_RDWR, 0) + c.Assert(err, check.IsNil) + st, err = f.Stat() + c.Assert(err, check.IsNil) + c.Check(st.Size(), check.Equals, int64(3)) + + c.Check(f.Close(), check.IsNil) + + m, err := s.fs.MarshalManifest(".") + c.Check(m, check.Matches, `. 37b51d194a7513e45b56f6524f2d51f2\+3\+\S+ 0:3:new-file\\0401\n./dir1 .* 3:3:bar 0:3:foo\n`) +} + +func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) { + maxBlockSize = 8 + defer func() { maxBlockSize = 2 << 26 }() - f, err := s.fs.Open("/foo.txt") + f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0) c.Assert(err, check.IsNil) + defer f.Close() st, err := f.Stat() c.Assert(err, check.IsNil) c.Check(st.Size(), check.Equals, int64(3)) + + f2, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0) + c.Assert(err, check.IsNil) + defer f2.Close() + + buf := make([]byte, 64) + n, err := f.Read(buf) + c.Check(n, check.Equals, 3) + c.Check(err, check.Equals, io.EOF) + c.Check(string(buf[:3]), check.DeepEquals, "foo") + + pos, err := f.Seek(-2, io.SeekCurrent) + c.Check(pos, check.Equals, int64(1)) + c.Check(err, check.IsNil) + + // Split a storedExtent in two, and insert a memExtent + n, err = f.Write([]byte("*")) + c.Check(n, check.Equals, 1) + c.Check(err, check.IsNil) + + pos, err = f.Seek(0, io.SeekCurrent) + c.Check(pos, check.Equals, int64(2)) + c.Check(err, check.IsNil) + + pos, err = f.Seek(0, io.SeekStart) + c.Check(pos, check.Equals, int64(0)) + c.Check(err, check.IsNil) + + rbuf, err := ioutil.ReadAll(f) + c.Check(len(rbuf), 
check.Equals, 3) + c.Check(err, check.IsNil) + c.Check(string(rbuf), check.Equals, "f*o") + + // Write multiple blocks in one call + f.Seek(1, io.SeekStart) + n, err = f.Write([]byte("0123456789abcdefg")) + c.Check(n, check.Equals, 17) + c.Check(err, check.IsNil) + pos, err = f.Seek(0, io.SeekCurrent) + c.Check(pos, check.Equals, int64(18)) + pos, err = f.Seek(-18, io.SeekCurrent) + c.Check(err, check.IsNil) + n, err = io.ReadFull(f, buf) + c.Check(n, check.Equals, 18) + c.Check(err, check.Equals, io.ErrUnexpectedEOF) + c.Check(string(buf[:n]), check.Equals, "f0123456789abcdefg") + + buf2, err := ioutil.ReadAll(f2) + c.Check(err, check.IsNil) + c.Check(string(buf2), check.Equals, "f0123456789abcdefg") + + // truncate to current size + err = f.Truncate(18) + f2.Seek(0, io.SeekStart) + buf2, err = ioutil.ReadAll(f2) + c.Check(err, check.IsNil) + c.Check(string(buf2), check.Equals, "f0123456789abcdefg") + + // shrink to zero some data + f.Truncate(15) + f2.Seek(0, io.SeekStart) + buf2, err = ioutil.ReadAll(f2) + c.Check(err, check.IsNil) + c.Check(string(buf2), check.Equals, "f0123456789abcd") + + // grow to partial block/extent + f.Truncate(20) + f2.Seek(0, io.SeekStart) + buf2, err = ioutil.ReadAll(f2) + c.Check(err, check.IsNil) + c.Check(string(buf2), check.Equals, "f0123456789abcd\x00\x00\x00\x00\x00") + + f.Truncate(0) + f2.Seek(0, io.SeekStart) + f2.Write([]byte("12345678abcdefghijkl")) + + // grow to block/extent boundary + f.Truncate(64) + f2.Seek(0, io.SeekStart) + buf2, err = ioutil.ReadAll(f2) + c.Check(err, check.IsNil) + c.Check(len(buf2), check.Equals, 64) + c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 8) + + // shrink to block/extent boundary + err = f.Truncate(32) + f2.Seek(0, io.SeekStart) + buf2, err = ioutil.ReadAll(f2) + c.Check(err, check.IsNil) + c.Check(len(buf2), check.Equals, 32) + c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 4) + + // shrink to partial block/extent + err = f.Truncate(15) + 
f2.Seek(0, io.SeekStart) + buf2, err = ioutil.ReadAll(f2) + c.Check(err, check.IsNil) + c.Check(string(buf2), check.Equals, "12345678abcdefg") + c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 2) + + // Force flush to ensure the block "12345678" gets stored, so + // we know what to expect in the final manifest below. + _, err = s.fs.MarshalManifest(".") + c.Check(err, check.IsNil) + + // Truncate to size=3 while f2's ptr is at 15 + err = f.Truncate(3) + c.Check(err, check.IsNil) + buf2, err = ioutil.ReadAll(f2) + c.Check(err, check.IsNil) + c.Check(string(buf2), check.Equals, "") + f2.Seek(0, io.SeekStart) + buf2, err = ioutil.ReadAll(f2) + c.Check(err, check.IsNil) + c.Check(string(buf2), check.Equals, "123") + c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 1) + + m, err := s.fs.MarshalManifest(".") + c.Check(err, check.IsNil) + m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "") + c.Check(m, check.Equals, "./dir1 3858f62230ac3c915f300c664312c63f+6 25d55ad283aa400af464c76d713c07ad+8 3:3:bar 6:3:foo\n") +} + +func (s *CollectionFSSuite) TestSeekSparse(c *check.C) { + fs, err := (&Collection{}).FileSystem(s.client, s.kc) + c.Assert(err, check.IsNil) + f, err := fs.OpenFile("test", os.O_CREATE|os.O_RDWR, 0755) + c.Assert(err, check.IsNil) + defer f.Close() + + checkSize := func(size int64) { + fi, err := f.Stat() + c.Check(fi.Size(), check.Equals, size) + + f, err := fs.OpenFile("test", os.O_CREATE|os.O_RDWR, 0755) + c.Assert(err, check.IsNil) + defer f.Close() + fi, err = f.Stat() + c.Check(fi.Size(), check.Equals, size) + pos, err := f.Seek(0, io.SeekEnd) + c.Check(pos, check.Equals, size) + } + + f.Seek(2, io.SeekEnd) + checkSize(0) + f.Write([]byte{1}) + checkSize(3) + + f.Seek(2, io.SeekCurrent) + checkSize(3) + f.Write([]byte{}) + checkSize(5) + + f.Seek(8, io.SeekStart) + checkSize(5) + n, err := f.Read(make([]byte, 1)) + c.Check(n, check.Equals, 0) + c.Check(err, check.Equals, io.EOF) + 
checkSize(5) + f.Write([]byte{1, 2, 3}) + checkSize(11) +} + +func (s *CollectionFSSuite) TestMarshalSmallBlocks(c *check.C) { + maxBlockSize = 8 + defer func() { maxBlockSize = 2 << 26 }() + + var err error + s.fs, err = (&Collection{}).FileSystem(s.client, s.kc) + c.Assert(err, check.IsNil) + for _, name := range []string{"foo", "bar", "baz"} { + f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0) + c.Assert(err, check.IsNil) + f.Write([]byte(name)) + f.Close() + } + + m, err := s.fs.MarshalManifest(".") + c.Check(err, check.IsNil) + m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "") + c.Check(m, check.Equals, ". c3c23db5285662ef7172373df0003206+6 acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:bar 3:3:baz 6:3:foo\n") +} + +func (s *CollectionFSSuite) TestMkdir(c *check.C) { + err := s.fs.Mkdir("foo/bar", 0755) + c.Check(err, check.Equals, os.ErrNotExist) + + f, err := s.fs.OpenFile("foo/bar", os.O_CREATE, 0) + c.Check(err, check.Equals, os.ErrNotExist) + + err = s.fs.Mkdir("foo", 0755) + c.Check(err, check.IsNil) + + f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_WRONLY, 0) + c.Check(err, check.IsNil) + if err == nil { + defer f.Close() + f.Write([]byte("foo")) + } + + // mkdir fails if a file already exists with that name + err = s.fs.Mkdir("foo/bar", 0755) + c.Check(err, check.NotNil) + + err = s.fs.Remove("foo/bar") + c.Check(err, check.IsNil) + + // mkdir succeds after the file is deleted + err = s.fs.Mkdir("foo/bar", 0755) + c.Check(err, check.IsNil) + + // creating a file in a nonexistent subdir should still fail + f, err = s.fs.OpenFile("foo/bar/baz/foo.txt", os.O_CREATE|os.O_WRONLY, 0) + c.Check(err, check.Equals, os.ErrNotExist) + + f, err = s.fs.OpenFile("foo/bar/foo.txt", os.O_CREATE|os.O_WRONLY, 0) + c.Check(err, check.IsNil) + if err == nil { + defer f.Close() + f.Write([]byte("foo")) + } + + // creating foo/bar as a regular file should fail + f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_EXCL, 0) + c.Check(err, check.NotNil) 
+ + // creating foo/bar as a directory should fail + f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_EXCL, os.ModeDir) + c.Check(err, check.NotNil) + err = s.fs.Mkdir("foo/bar", 0755) + c.Check(err, check.NotNil) + + m, err := s.fs.MarshalManifest(".") + c.Check(err, check.IsNil) + m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "") + c.Check(m, check.Equals, "./dir1 3858f62230ac3c915f300c664312c63f+6 3:3:bar 0:3:foo\n./foo/bar acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n") +} + +func (s *CollectionFSSuite) TestConcurrentWriters(c *check.C) { + maxBlockSize = 8 + defer func() { maxBlockSize = 2 << 26 }() + + var wg sync.WaitGroup + for n := 0; n < 128; n++ { + wg.Add(1) + go func() { + defer wg.Done() + f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0) + c.Assert(err, check.IsNil) + defer f.Close() + for i := 0; i < 6502; i++ { + switch rand.Int() & 3 { + case 0: + f.Truncate(int64(rand.Intn(64))) + case 1: + f.Seek(int64(rand.Intn(64)), io.SeekStart) + case 2: + _, err := f.Write([]byte("beep boop")) + c.Check(err, check.IsNil) + case 3: + _, err := ioutil.ReadAll(f) + c.Check(err, check.IsNil) + } + } + }() + } + wg.Wait() + + f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0) + c.Assert(err, check.IsNil) + defer f.Close() + buf, err := ioutil.ReadAll(f) + c.Check(err, check.IsNil) + c.Logf("after lots of random r/w/seek/trunc, buf is %q", buf) +} + +func (s *CollectionFSSuite) TestRandomWrites(c *check.C) { + maxBlockSize = 40 + defer func() { maxBlockSize = 2 << 26 }() + + var err error + s.fs, err = (&Collection{}).FileSystem(s.client, s.kc) + c.Assert(err, check.IsNil) + + const nfiles = 256 + const ngoroutines = 256 + + var wg sync.WaitGroup + for n := 0; n < nfiles; n++ { + wg.Add(1) + go func(n int) { + defer wg.Done() + expect := make([]byte, 0, 64) + wbytes := []byte("there's no simple explanation for anything important that any of us do") + f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDWR|os.O_CREATE|os.O_EXCL, 0) + 
c.Assert(err, check.IsNil) + defer f.Close() + for i := 0; i < ngoroutines; i++ { + trunc := rand.Intn(65) + woff := rand.Intn(trunc + 1) + wbytes = wbytes[:rand.Intn(64-woff+1)] + for buf, i := expect[:cap(expect)], len(expect); i < trunc; i++ { + buf[i] = 0 + } + expect = expect[:trunc] + if trunc < woff+len(wbytes) { + expect = expect[:woff+len(wbytes)] + } + copy(expect[woff:], wbytes) + f.Truncate(int64(trunc)) + pos, err := f.Seek(int64(woff), io.SeekStart) + c.Check(pos, check.Equals, int64(woff)) + c.Check(err, check.IsNil) + n, err := f.Write(wbytes) + c.Check(n, check.Equals, len(wbytes)) + c.Check(err, check.IsNil) + pos, err = f.Seek(0, io.SeekStart) + c.Check(pos, check.Equals, int64(0)) + c.Check(err, check.IsNil) + buf, err := ioutil.ReadAll(f) + c.Check(string(buf), check.Equals, string(expect)) + c.Check(err, check.IsNil) + } + s.checkMemSize(c, f) + }(n) + } + wg.Wait() + + root, err := s.fs.Open("/") + c.Assert(err, check.IsNil) + defer root.Close() + fi, err := root.Readdir(-1) + c.Check(err, check.IsNil) + c.Check(len(fi), check.Equals, nfiles) + + _, err = s.fs.MarshalManifest(".") + c.Check(err, check.IsNil) + // TODO: check manifest content +} + +func (s *CollectionFSSuite) TestRemove(c *check.C) { + fs, err := (&Collection{}).FileSystem(s.client, s.kc) + c.Assert(err, check.IsNil) + err = fs.Mkdir("dir0", 0755) + c.Assert(err, check.IsNil) + err = fs.Mkdir("dir1", 0755) + c.Assert(err, check.IsNil) + err = fs.Mkdir("dir1/dir2", 0755) + c.Assert(err, check.IsNil) + err = fs.Mkdir("dir1/dir3", 0755) + c.Assert(err, check.IsNil) + + err = fs.Remove("dir0") + c.Check(err, check.IsNil) + err = fs.Remove("dir0") + c.Check(err, check.Equals, os.ErrNotExist) + + err = fs.Remove("dir1/dir2/.") + c.Check(err, check.Equals, ErrInvalidArgument) + err = fs.Remove("dir1/dir2/..") + c.Check(err, check.Equals, ErrInvalidArgument) + err = fs.Remove("dir1") + c.Check(err, check.Equals, ErrDirectoryNotEmpty) + err = fs.Remove("dir1/dir2/../../../dir1") + 
c.Check(err, check.Equals, ErrDirectoryNotEmpty) + err = fs.Remove("dir1/dir3/") + c.Check(err, check.IsNil) + err = fs.RemoveAll("dir1") + c.Check(err, check.IsNil) + err = fs.RemoveAll("dir1") + c.Check(err, check.IsNil) +} + +func (s *CollectionFSSuite) TestRename(c *check.C) { + fs, err := (&Collection{}).FileSystem(s.client, s.kc) + c.Assert(err, check.IsNil) + const ( + outer = 16 + inner = 16 + ) + for i := 0; i < outer; i++ { + err = fs.Mkdir(fmt.Sprintf("dir%d", i), 0755) + c.Assert(err, check.IsNil) + for j := 0; j < inner; j++ { + err = fs.Mkdir(fmt.Sprintf("dir%d/dir%d", i, j), 0755) + c.Assert(err, check.IsNil) + for _, fnm := range []string{ + fmt.Sprintf("dir%d/file%d", i, j), + fmt.Sprintf("dir%d/dir%d/file%d", i, j, j), + } { + f, err := fs.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0755) + c.Assert(err, check.IsNil) + _, err = f.Write([]byte("beep")) + c.Assert(err, check.IsNil) + f.Close() + } + } + } + var wg sync.WaitGroup + for i := 0; i < outer; i++ { + for j := 0; j < inner; j++ { + wg.Add(1) + go func(i, j int) { + defer wg.Done() + oldname := fmt.Sprintf("dir%d/dir%d/file%d", i, j, j) + newname := fmt.Sprintf("dir%d/newfile%d", i, inner-j-1) + _, err := fs.Open(newname) + c.Check(err, check.Equals, os.ErrNotExist) + err = fs.Rename(oldname, newname) + c.Check(err, check.IsNil) + f, err := fs.Open(newname) + c.Check(err, check.IsNil) + f.Close() + }(i, j) + + wg.Add(1) + go func(i, j int) { + defer wg.Done() + // oldname does not exist + err := fs.Rename( + fmt.Sprintf("dir%d/dir%d/missing", i, j), + fmt.Sprintf("dir%d/dir%d/file%d", outer-i-1, j, j)) + c.Check(err, check.ErrorMatches, `.*does not exist`) + + // newname parent dir does not exist + err = fs.Rename( + fmt.Sprintf("dir%d/dir%d", i, j), + fmt.Sprintf("dir%d/missing/irrelevant", outer-i-1)) + c.Check(err, check.ErrorMatches, `.*does not exist`) + + // oldname parent dir is a file + err = fs.Rename( + fmt.Sprintf("dir%d/file%d/patherror", i, j), + fmt.Sprintf("dir%d/irrelevant", i)) 
+ c.Check(err, check.ErrorMatches, `.*does not exist`) + + // newname parent dir is a file + err = fs.Rename( + fmt.Sprintf("dir%d/dir%d/file%d", i, j, j), + fmt.Sprintf("dir%d/file%d/patherror", i, inner-j-1)) + c.Check(err, check.ErrorMatches, `.*does not exist`) + }(i, j) + } + } + wg.Wait() + + f, err := fs.OpenFile("dir1/newfile3", 0, 0) + c.Assert(err, check.IsNil) + c.Check(f.Size(), check.Equals, int64(4)) + buf, err := ioutil.ReadAll(f) + c.Check(buf, check.DeepEquals, []byte("beep")) + c.Check(err, check.IsNil) + _, err = fs.Open("dir1/dir1/file1") + c.Check(err, check.Equals, os.ErrNotExist) +} + +func (s *CollectionFSSuite) TestPersist(c *check.C) { + maxBlockSize = 1024 + defer func() { maxBlockSize = 2 << 26 }() + + var err error + s.fs, err = (&Collection{}).FileSystem(s.client, s.kc) + c.Assert(err, check.IsNil) + err = s.fs.Mkdir("d:r", 0755) + c.Assert(err, check.IsNil) + + expect := map[string][]byte{} + + var wg sync.WaitGroup + for _, name := range []string{"random 1", "random:2", "random\\3", "d:r/random4"} { + buf := make([]byte, 500) + rand.Read(buf) + expect[name] = buf + + f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0) + c.Assert(err, check.IsNil) + // Note: we don't close the file until after the test + // is done. Writes to unclosed files should persist. 
+ defer f.Close() + + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < len(buf); i += 5 { + _, err := f.Write(buf[i : i+5]) + c.Assert(err, check.IsNil) + } + }() + } + wg.Wait() + + m, err := s.fs.MarshalManifest(".") + c.Check(err, check.IsNil) + c.Logf("%q", m) + + root, err := s.fs.Open("/") + c.Assert(err, check.IsNil) + defer root.Close() + fi, err := root.Readdir(-1) + c.Check(err, check.IsNil) + c.Check(len(fi), check.Equals, 4) + + persisted, err := (&Collection{ManifestText: m}).FileSystem(s.client, s.kc) + c.Assert(err, check.IsNil) + + root, err = persisted.Open("/") + c.Assert(err, check.IsNil) + defer root.Close() + fi, err = root.Readdir(-1) + c.Check(err, check.IsNil) + c.Check(len(fi), check.Equals, 4) + + for name, content := range expect { + c.Logf("read %q", name) + f, err := persisted.Open(name) + c.Assert(err, check.IsNil) + defer f.Close() + buf, err := ioutil.ReadAll(f) + c.Check(err, check.IsNil) + c.Check(buf, check.DeepEquals, content) + } +} + +func (s *CollectionFSSuite) TestPersistEmptyFiles(c *check.C) { + var err error + s.fs, err = (&Collection{}).FileSystem(s.client, s.kc) + c.Assert(err, check.IsNil) + for _, name := range []string{"dir", "dir/zerodir", "zero", "zero/zero"} { + err = s.fs.Mkdir(name, 0755) + c.Assert(err, check.IsNil) + } + + expect := map[string][]byte{ + "0": nil, + "00": []byte{}, + "one": []byte{1}, + "dir/0": nil, + "dir/two": []byte{1, 2}, + "dir/zero": nil, + "dir/zerodir/zero": nil, + "zero/zero/zero": nil, + } + for name, data := range expect { + f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0) + c.Assert(err, check.IsNil) + if data != nil { + _, err := f.Write(data) + c.Assert(err, check.IsNil) + } + f.Close() + } + + m, err := s.fs.MarshalManifest(".") + c.Check(err, check.IsNil) + c.Logf("%q", m) + + persisted, err := (&Collection{ManifestText: m}).FileSystem(s.client, s.kc) + c.Assert(err, check.IsNil) + + for name, data := range expect { + f, err := persisted.Open("bogus-" + name) 
+ c.Check(err, check.NotNil) + + f, err = persisted.Open(name) + c.Assert(err, check.IsNil) + + if data == nil { + data = []byte{} + } + buf, err := ioutil.ReadAll(f) + c.Check(err, check.IsNil) + c.Check(buf, check.DeepEquals, data) + } +} + +func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) { + fs, err := (&Collection{}).FileSystem(s.client, s.kc) + c.Assert(err, check.IsNil) + + f, err := fs.OpenFile("missing", os.O_WRONLY, 0) + c.Check(f, check.IsNil) + c.Check(err, check.ErrorMatches, `file does not exist`) + + f, err = fs.OpenFile("new", os.O_CREATE|os.O_RDONLY, 0) + c.Assert(err, check.IsNil) + defer f.Close() + n, err := f.Write([]byte{1, 2, 3}) + c.Check(n, check.Equals, 0) + c.Check(err, check.ErrorMatches, `read-only file`) + n, err = f.Read(make([]byte, 1)) + c.Check(n, check.Equals, 0) + c.Check(err, check.Equals, io.EOF) + f, err = fs.OpenFile("new", os.O_RDWR, 0) + c.Assert(err, check.IsNil) + defer f.Close() + _, err = f.Write([]byte{4, 5, 6}) + c.Check(err, check.IsNil) + fi, err := f.Stat() + c.Assert(err, check.IsNil) + c.Check(fi.Size(), check.Equals, int64(3)) + + f, err = fs.OpenFile("new", os.O_TRUNC|os.O_RDWR, 0) + c.Assert(err, check.IsNil) + defer f.Close() + pos, err := f.Seek(0, io.SeekEnd) + c.Check(pos, check.Equals, int64(0)) + c.Check(err, check.IsNil) + fi, err = f.Stat() + c.Assert(err, check.IsNil) + c.Check(fi.Size(), check.Equals, int64(0)) + fs.Remove("new") + + buf := make([]byte, 64) + f, err = fs.OpenFile("append", os.O_EXCL|os.O_CREATE|os.O_RDWR|os.O_APPEND, 0) + c.Assert(err, check.IsNil) + f.Write([]byte{1, 2, 3}) + f.Seek(0, io.SeekStart) + n, _ = f.Read(buf[:1]) + c.Check(n, check.Equals, 1) + c.Check(buf[:1], check.DeepEquals, []byte{1}) + pos, err = f.Seek(0, io.SeekCurrent) + c.Check(pos, check.Equals, int64(1)) + f.Write([]byte{4, 5, 6}) + pos, err = f.Seek(0, io.SeekCurrent) + c.Check(pos, check.Equals, int64(6)) + f.Seek(0, io.SeekStart) + n, err = f.Read(buf) + c.Check(buf[:n], check.DeepEquals, []byte{1, 
2, 3, 4, 5, 6}) + c.Check(err, check.Equals, io.EOF) + f.Close() + + f, err = fs.OpenFile("append", os.O_RDWR|os.O_APPEND, 0) + c.Assert(err, check.IsNil) + pos, err = f.Seek(0, io.SeekCurrent) + c.Check(pos, check.Equals, int64(0)) + c.Check(err, check.IsNil) + f.Read(buf[:3]) + pos, _ = f.Seek(0, io.SeekCurrent) + c.Check(pos, check.Equals, int64(3)) + f.Write([]byte{7, 8, 9}) + pos, err = f.Seek(0, io.SeekCurrent) + c.Check(pos, check.Equals, int64(9)) + f.Close() + + f, err = fs.OpenFile("wronly", os.O_CREATE|os.O_WRONLY, 0) + c.Assert(err, check.IsNil) + n, err = f.Write([]byte{3, 2, 1}) + c.Check(n, check.Equals, 3) + c.Check(err, check.IsNil) + pos, _ = f.Seek(0, io.SeekCurrent) + c.Check(pos, check.Equals, int64(3)) + pos, _ = f.Seek(0, io.SeekStart) + c.Check(pos, check.Equals, int64(0)) + n, err = f.Read(buf) + c.Check(n, check.Equals, 0) + c.Check(err, check.ErrorMatches, `.*O_WRONLY.*`) + f, err = fs.OpenFile("wronly", os.O_RDONLY, 0) + c.Assert(err, check.IsNil) + n, _ = f.Read(buf) + c.Check(buf[:n], check.DeepEquals, []byte{3, 2, 1}) + + f, err = fs.OpenFile("unsupported", os.O_CREATE|os.O_SYNC, 0) + c.Check(f, check.IsNil) + c.Check(err, check.NotNil) + + f, err = fs.OpenFile("append", os.O_RDWR|os.O_WRONLY, 0) + c.Check(f, check.IsNil) + c.Check(err, check.ErrorMatches, `invalid flag.*`) +} + +func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) { + maxBlockSize = 1024 + defer func() { maxBlockSize = 2 << 26 }() + + fs, err := (&Collection{}).FileSystem(s.client, s.kc) + c.Assert(err, check.IsNil) + f, err := fs.OpenFile("50K", os.O_WRONLY|os.O_CREATE, 0) + c.Assert(err, check.IsNil) + defer f.Close() + + data := make([]byte, 500) + rand.Read(data) + + for i := 0; i < 100; i++ { + n, err := f.Write(data) + c.Assert(n, check.Equals, len(data)) + c.Assert(err, check.IsNil) + } + + currentMemExtents := func() (memExtents []int) { + for idx, e := range f.(*filehandle).inode.(*filenode).segments { + switch e.(type) { + case *memSegment: + 
memExtents = append(memExtents, idx) + } + } + return + } + c.Check(currentMemExtents(), check.HasLen, 1) + + m, err := fs.MarshalManifest(".") + c.Check(m, check.Matches, `[^:]* 0:50000:50K\n`) + c.Check(err, check.IsNil) + c.Check(currentMemExtents(), check.HasLen, 0) +} + +func (s *CollectionFSSuite) TestBrokenManifests(c *check.C) { + for _, txt := range []string{ + "\n", + ".\n", + ". \n", + ". d41d8cd98f00b204e9800998ecf8427e+0\n", + ". d41d8cd98f00b204e9800998ecf8427e+0 \n", + ". 0:0:foo\n", + ". 0:0:foo\n", + ". 0:0:foo 0:0:bar\n", + ". d41d8cd98f00b204e9800998ecf8427e 0:0:foo\n", + ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo:bar\n", + ". d41d8cd98f00b204e9800998ecf8427e+0 foo:0:foo\n", + ". d41d8cd98f00b204e9800998ecf8427e+0 0:foo:foo\n", + ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:foo 1:1:bar\n", + ". d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n", + "./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n. d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n", + } { + c.Logf("<-%q", txt) + fs, err := (&Collection{ManifestText: txt}).FileSystem(s.client, s.kc) + c.Check(fs, check.IsNil) + c.Logf("-> %s", err) + c.Check(err, check.NotNil) + } +} + +func (s *CollectionFSSuite) TestEdgeCaseManifests(c *check.C) { + for _, txt := range []string{ + "", + ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo\n", + ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo 0:0:foo 0:0:bar\n", + ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo 0:0:foo 0:0:bar\n", + ". 
d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/bar\n./foo d41d8cd98f00b204e9800998ecf8427e+0 0:0:bar\n", + } { + c.Logf("<-%q", txt) + fs, err := (&Collection{ManifestText: txt}).FileSystem(s.client, s.kc) + c.Check(err, check.IsNil) + c.Check(fs, check.NotNil) + } +} + +func (s *CollectionFSSuite) checkMemSize(c *check.C, f File) { + fn := f.(*filehandle).inode.(*filenode) + var memsize int64 + for _, seg := range fn.segments { + if e, ok := seg.(*memSegment); ok { + memsize += int64(len(e.buf)) + } + } + c.Check(fn.memsize, check.Equals, memsize) +} + +type CollectionFSUnitSuite struct{} + +var _ = check.Suite(&CollectionFSUnitSuite{}) + +// expect ~2 seconds to load a manifest with 256K files +func (s *CollectionFSUnitSuite) TestLargeManifest(c *check.C) { + const ( + dirCount = 512 + fileCount = 512 + ) + + mb := bytes.NewBuffer(make([]byte, 0, 40000000)) + for i := 0; i < dirCount; i++ { + fmt.Fprintf(mb, "./dir%d", i) + for j := 0; j <= fileCount; j++ { + fmt.Fprintf(mb, " %032x+42+A%040x@%08x", j, j, j) + } + for j := 0; j < fileCount; j++ { + fmt.Fprintf(mb, " %d:%d:dir%d/file%d", j*42+21, 42, j, j) + } + mb.Write([]byte{'\n'}) + } + coll := Collection{ManifestText: mb.String()} + c.Logf("%s built", time.Now()) + + var memstats runtime.MemStats + runtime.ReadMemStats(&memstats) + c.Logf("%s Alloc=%d Sys=%d", time.Now(), memstats.Alloc, memstats.Sys) + + f, err := coll.FileSystem(nil, nil) + c.Check(err, check.IsNil) + c.Logf("%s loaded", time.Now()) + + for i := 0; i < dirCount; i++ { + for j := 0; j < fileCount; j++ { + f.Stat(fmt.Sprintf("./dir%d/dir%d/file%d", i, j, j)) + } + } + c.Logf("%s Stat() x %d", time.Now(), dirCount*fileCount) + + runtime.ReadMemStats(&memstats) + c.Logf("%s Alloc=%d Sys=%d", time.Now(), memstats.Alloc, memstats.Sys) } // Gocheck boilerplate diff --git a/sdk/go/arvados/error.go b/sdk/go/arvados/error.go index 773a2e6f9c..9a04855784 100644 --- a/sdk/go/arvados/error.go +++ b/sdk/go/arvados/error.go @@ -17,7 +17,7 @@ type 
TransactionError struct { URL url.URL StatusCode int Status string - errors []string + Errors []string } func (e TransactionError) Error() (s string) { @@ -25,8 +25,8 @@ func (e TransactionError) Error() (s string) { if e.Status != "" { s = s + ": " + e.Status } - if len(e.errors) > 0 { - s = s + ": " + strings.Join(e.errors, "; ") + if len(e.Errors) > 0 { + s = s + ": " + strings.Join(e.Errors, "; ") } return } @@ -35,7 +35,7 @@ func newTransactionError(req *http.Request, resp *http.Response, buf []byte) *Tr var e TransactionError if json.Unmarshal(buf, &e) != nil { // No JSON-formatted error response - e.errors = nil + e.Errors = nil } e.Method = req.Method e.URL = *req.URL diff --git a/sdk/go/httpserver/id_generator.go b/sdk/go/httpserver/id_generator.go index 18fd91f560..d2c3a41f21 100644 --- a/sdk/go/httpserver/id_generator.go +++ b/sdk/go/httpserver/id_generator.go @@ -5,6 +5,8 @@ package httpserver import ( + "math/rand" + "net/http" "strconv" "sync" "time" @@ -17,19 +19,34 @@ type IDGenerator struct { // Prefix is prepended to each returned ID. Prefix string - lastID int64 - mtx sync.Mutex + mtx sync.Mutex + src rand.Source } // Next returns a new ID string. It is safe to call Next from multiple // goroutines. func (g *IDGenerator) Next() string { - id := time.Now().UnixNano() g.mtx.Lock() - if id <= g.lastID { - id = g.lastID + 1 + defer g.mtx.Unlock() + if g.src == nil { + g.src = rand.NewSource(time.Now().UnixNano()) } - g.lastID = id - g.mtx.Unlock() - return g.Prefix + strconv.FormatInt(id, 36) + a, b := g.src.Int63(), g.src.Int63() + id := strconv.FormatInt(a, 36) + strconv.FormatInt(b, 36) + for len(id) > 20 { + id = id[:20] + } + return g.Prefix + id +} + +// AddRequestIDs wraps an http.Handler, adding an X-Request-Id header +// to each request that doesn't already have one. 
+func AddRequestIDs(h http.Handler) http.Handler { + gen := &IDGenerator{Prefix: "req-"} + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + if req.Header.Get("X-Request-Id") == "" { + req.Header.Set("X-Request-Id", gen.Next()) + } + h.ServeHTTP(w, req) + }) } diff --git a/sdk/go/httpserver/logger.go b/sdk/go/httpserver/logger.go new file mode 100644 index 0000000000..decb2ff28b --- /dev/null +++ b/sdk/go/httpserver/logger.go @@ -0,0 +1,82 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package httpserver + +import ( + "context" + "net/http" + "time" + + "git.curoverse.com/arvados.git/sdk/go/stats" + log "github.com/Sirupsen/logrus" +) + +type contextKey struct { + name string +} + +var requestTimeContextKey = contextKey{"requestTime"} + +// LogRequests wraps an http.Handler, logging each request and +// response via logrus. +func LogRequests(h http.Handler) http.Handler { + return http.HandlerFunc(func(wrapped http.ResponseWriter, req *http.Request) { + w := &responseTimer{ResponseWriter: WrapResponseWriter(wrapped)} + req = req.WithContext(context.WithValue(req.Context(), &requestTimeContextKey, time.Now())) + lgr := log.WithFields(log.Fields{ + "RequestID": req.Header.Get("X-Request-Id"), + "remoteAddr": req.RemoteAddr, + "reqForwardedFor": req.Header.Get("X-Forwarded-For"), + "reqMethod": req.Method, + "reqPath": req.URL.Path[1:], + "reqBytes": req.ContentLength, + }) + logRequest(w, req, lgr) + defer logResponse(w, req, lgr) + h.ServeHTTP(w, req) + }) +} + +func logRequest(w *responseTimer, req *http.Request, lgr *log.Entry) { + lgr.Info("request") +} + +func logResponse(w *responseTimer, req *http.Request, lgr *log.Entry) { + if tStart, ok := req.Context().Value(&requestTimeContextKey).(time.Time); ok { + tDone := time.Now() + lgr = lgr.WithFields(log.Fields{ + "timeTotal": stats.Duration(tDone.Sub(tStart)), + "timeToStatus": stats.Duration(w.writeTime.Sub(tStart)), + 
"timeWriteBody": stats.Duration(tDone.Sub(w.writeTime)), + }) + } + lgr.WithFields(log.Fields{ + "respStatusCode": w.WroteStatus(), + "respStatus": http.StatusText(w.WroteStatus()), + "respBytes": w.WroteBodyBytes(), + }).Info("response") +} + +type responseTimer struct { + ResponseWriter + wrote bool + writeTime time.Time +} + +func (rt *responseTimer) WriteHeader(code int) { + if !rt.wrote { + rt.wrote = true + rt.writeTime = time.Now() + } + rt.ResponseWriter.WriteHeader(code) +} + +func (rt *responseTimer) Write(p []byte) (int, error) { + if !rt.wrote { + rt.wrote = true + rt.writeTime = time.Now() + } + return rt.ResponseWriter.Write(p) +} diff --git a/sdk/go/httpserver/logger_test.go b/sdk/go/httpserver/logger_test.go new file mode 100644 index 0000000000..bbcafa1439 --- /dev/null +++ b/sdk/go/httpserver/logger_test.go @@ -0,0 +1,68 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package httpserver + +import ( + "bytes" + "encoding/json" + "net/http" + "net/http/httptest" + "os" + "testing" + "time" + + log "github.com/Sirupsen/logrus" + check "gopkg.in/check.v1" +) + +func Test(t *testing.T) { + check.TestingT(t) +} + +var _ = check.Suite(&Suite{}) + +type Suite struct{} + +func (s *Suite) TestLogRequests(c *check.C) { + defer log.SetOutput(os.Stdout) + captured := &bytes.Buffer{} + log.SetOutput(captured) + log.SetFormatter(&log.JSONFormatter{ + TimestampFormat: time.RFC3339Nano, + }) + h := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + w.Write([]byte("hello world")) + }) + req, err := http.NewRequest("GET", "https://foo.example/bar", nil) + req.Header.Set("X-Forwarded-For", "1.2.3.4:12345") + c.Assert(err, check.IsNil) + resp := httptest.NewRecorder() + AddRequestIDs(LogRequests(h)).ServeHTTP(resp, req) + + dec := json.NewDecoder(captured) + + gotReq := make(map[string]interface{}) + err = dec.Decode(&gotReq) + c.Logf("%#v", gotReq) + c.Check(gotReq["RequestID"], 
check.Matches, "req-[a-z0-9]{20}") + c.Check(gotReq["reqForwardedFor"], check.Equals, "1.2.3.4:12345") + c.Check(gotReq["msg"], check.Equals, "request") + + gotResp := make(map[string]interface{}) + err = dec.Decode(&gotResp) + c.Logf("%#v", gotResp) + c.Check(gotResp["RequestID"], check.Equals, gotReq["RequestID"]) + c.Check(gotResp["reqForwardedFor"], check.Equals, "1.2.3.4:12345") + c.Check(gotResp["msg"], check.Equals, "response") + + c.Assert(gotResp["time"], check.FitsTypeOf, "") + _, err = time.Parse(time.RFC3339Nano, gotResp["time"].(string)) + c.Check(err, check.IsNil) + + for _, key := range []string{"timeToStatus", "timeWriteBody", "timeTotal"} { + c.Assert(gotResp[key], check.FitsTypeOf, float64(0)) + c.Check(gotResp[key].(float64), check.Not(check.Equals), float64(0)) + } +} diff --git a/sdk/go/httpserver/responsewriter.go b/sdk/go/httpserver/responsewriter.go index e6ac4ca7a8..f17bc820a9 100644 --- a/sdk/go/httpserver/responsewriter.go +++ b/sdk/go/httpserver/responsewriter.go @@ -8,40 +8,46 @@ import ( "net/http" ) -// ResponseWriter wraps http.ResponseWriter and exposes the status +type ResponseWriter interface { + http.ResponseWriter + WroteStatus() int + WroteBodyBytes() int +} + +// responseWriter wraps http.ResponseWriter and exposes the status // sent, the number of bytes sent to the client, and the last write // error. 
-type ResponseWriter struct { +type responseWriter struct { http.ResponseWriter - wroteStatus *int // Last status given to WriteHeader() - wroteBodyBytes *int // Bytes successfully written - err *error // Last error returned from Write() + wroteStatus int // Last status given to WriteHeader() + wroteBodyBytes int // Bytes successfully written + err error // Last error returned from Write() } func WrapResponseWriter(orig http.ResponseWriter) ResponseWriter { - return ResponseWriter{orig, new(int), new(int), new(error)} + return &responseWriter{ResponseWriter: orig} } -func (w ResponseWriter) WriteHeader(s int) { - *w.wroteStatus = s +func (w *responseWriter) WriteHeader(s int) { + w.wroteStatus = s w.ResponseWriter.WriteHeader(s) } -func (w ResponseWriter) Write(data []byte) (n int, err error) { +func (w *responseWriter) Write(data []byte) (n int, err error) { n, err = w.ResponseWriter.Write(data) - *w.wroteBodyBytes += n - *w.err = err + w.wroteBodyBytes += n + w.err = err return } -func (w ResponseWriter) WroteStatus() int { - return *w.wroteStatus +func (w *responseWriter) WroteStatus() int { + return w.wroteStatus } -func (w ResponseWriter) WroteBodyBytes() int { - return *w.wroteBodyBytes +func (w *responseWriter) WroteBodyBytes() int { + return w.wroteBodyBytes } -func (w ResponseWriter) Err() error { - return *w.err +func (w *responseWriter) Err() error { + return w.err } diff --git a/sdk/go/keepclient/block_cache.go b/sdk/go/keepclient/block_cache.go index e841a00fa1..bac4a24fd5 100644 --- a/sdk/go/keepclient/block_cache.go +++ b/sdk/go/keepclient/block_cache.go @@ -7,6 +7,8 @@ package keepclient import ( "io" "sort" + "strconv" + "strings" "sync" "time" ) @@ -49,10 +51,30 @@ func (c *BlockCache) Sweep() { } } +// ReadAt returns data from the cache, first retrieving it from Keep if +// necessary. 
+func (c *BlockCache) ReadAt(kc *KeepClient, locator string, p []byte, off int) (int, error) { + buf, err := c.Get(kc, locator) + if err != nil { + return 0, err + } + if off > len(buf) { + return 0, io.ErrUnexpectedEOF + } + return copy(p, buf[off:]), nil +} + // Get returns data from the cache, first retrieving it from Keep if // necessary. func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) { cacheKey := locator[:32] + bufsize := BLOCKSIZE + if parts := strings.SplitN(locator, "+", 3); len(parts) >= 2 { + datasize, err := strconv.ParseInt(parts[1], 10, 32) + if err == nil && datasize >= 0 { + bufsize = int(datasize) + } + } c.mtx.Lock() if c.cache == nil { c.cache = make(map[string]*cacheBlock) @@ -68,7 +90,7 @@ func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) { rdr, size, _, err := kc.Get(locator) var data []byte if err == nil { - data = make([]byte, size, BLOCKSIZE) + data = make([]byte, size, bufsize) _, err = io.ReadFull(rdr, data) err2 := rdr.Close() if err == nil { diff --git a/sdk/go/keepclient/collectionreader.go b/sdk/go/keepclient/collectionreader.go index 57829aadeb..fa309f6553 100644 --- a/sdk/go/keepclient/collectionreader.go +++ b/sdk/go/keepclient/collectionreader.go @@ -6,25 +6,12 @@ package keepclient import ( "errors" - "fmt" - "io" "os" "git.curoverse.com/arvados.git/sdk/go/arvados" "git.curoverse.com/arvados.git/sdk/go/manifest" ) -const ( - // After reading a data block from Keep, cfReader slices it up - // and sends the slices to a buffered channel to be consumed - // by the caller via Read(). - // - // dataSliceSize is the maximum size of the slices, and - // therefore the maximum number of bytes that will be returned - // by a single call to Read(). - dataSliceSize = 1 << 20 -) - // ErrNoManifest indicates the given collection has no manifest // information (e.g., manifest_text was excluded by a "select" // parameter when retrieving the collection record). 
@@ -38,141 +25,17 @@ func (kc *KeepClient) CollectionFileReader(collection map[string]interface{}, fi if !ok { return nil, ErrNoManifest } - m := manifest.Manifest{Text: mText} - return kc.ManifestFileReader(m, filename) -} - -func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error) { - f := &file{ - kc: kc, - } - err := f.load(m, filename) + fs, err := (&arvados.Collection{ManifestText: mText}).FileSystem(nil, kc) if err != nil { return nil, err } - return f, nil -} - -type file struct { - kc *KeepClient - segments []*manifest.FileSegment - size int64 // total file size - offset int64 // current read offset - - // current/latest segment accessed -- might or might not match pos - seg *manifest.FileSegment - segStart int64 // position of segment relative to file - segData []byte - segNext []*manifest.FileSegment - readaheadDone bool -} - -// Close implements io.Closer. -func (f *file) Close() error { - f.kc = nil - f.segments = nil - f.segData = nil - return nil + return fs.OpenFile(filename, os.O_RDONLY, 0) } -// Read implements io.Reader. -func (f *file) Read(buf []byte) (int, error) { - if f.seg == nil || f.offset < f.segStart || f.offset >= f.segStart+int64(f.seg.Len) { - // f.seg does not cover the current read offset - // (f.pos). Iterate over f.segments to find the one - // that does. 
- f.seg = nil - f.segStart = 0 - f.segData = nil - f.segNext = f.segments - for len(f.segNext) > 0 { - seg := f.segNext[0] - f.segNext = f.segNext[1:] - segEnd := f.segStart + int64(seg.Len) - if segEnd > f.offset { - f.seg = seg - break - } - f.segStart = segEnd - } - f.readaheadDone = false - } - if f.seg == nil { - return 0, io.EOF - } - if f.segData == nil { - data, err := f.kc.cache().Get(f.kc, f.seg.Locator) - if err != nil { - return 0, err - } - if len(data) < f.seg.Offset+f.seg.Len { - return 0, fmt.Errorf("invalid segment (offset %d len %d) in %d-byte block %s", f.seg.Offset, f.seg.Len, len(data), f.seg.Locator) - } - f.segData = data[f.seg.Offset : f.seg.Offset+f.seg.Len] - } - // dataOff and dataLen denote a portion of f.segData - // corresponding to a portion of the file at f.offset. - dataOff := int(f.offset - f.segStart) - dataLen := f.seg.Len - dataOff - - if !f.readaheadDone && len(f.segNext) > 0 && f.offset >= 1048576 && dataOff+dataLen > len(f.segData)/16 { - // If we have already read more than just the first - // few bytes of this file, and we have already - // consumed a noticeable portion of this segment, and - // there's more data for this file in the next segment - // ... then there's a good chance we are going to need - // the data for that next segment soon. Start getting - // it into the cache now. - go f.kc.cache().Get(f.kc, f.segNext[0].Locator) - f.readaheadDone = true - } - - n := len(buf) - if n > dataLen { - n = dataLen - } - copy(buf[:n], f.segData[dataOff:dataOff+n]) - f.offset += int64(n) - return n, nil -} - -// Seek implements io.Seeker. 
-func (f *file) Seek(offset int64, whence int) (int64, error) { - var want int64 - switch whence { - case io.SeekStart: - want = offset - case io.SeekCurrent: - want = f.offset + offset - case io.SeekEnd: - want = f.size + offset - default: - return f.offset, fmt.Errorf("invalid whence %d", whence) - } - if want < 0 { - return f.offset, fmt.Errorf("attempted seek to %d", want) - } - if want > f.size { - want = f.size - } - f.offset = want - return f.offset, nil -} - -// Size returns the file size in bytes. -func (f *file) Size() int64 { - return f.size -} - -func (f *file) load(m manifest.Manifest, path string) error { - f.segments = nil - f.size = 0 - for seg := range m.FileSegmentIterByName(path) { - f.segments = append(f.segments, seg) - f.size += int64(seg.Len) - } - if f.segments == nil { - return os.ErrNotExist +func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error) { + fs, err := (&arvados.Collection{ManifestText: m.Text}).FileSystem(nil, kc) + if err != nil { + return nil, err } - return nil + return fs.OpenFile(filename, os.O_RDONLY, 0) } diff --git a/sdk/go/keepclient/collectionreader_test.go b/sdk/go/keepclient/collectionreader_test.go index df8bcb39dc..5d1e2a1533 100644 --- a/sdk/go/keepclient/collectionreader_test.go +++ b/sdk/go/keepclient/collectionreader_test.go @@ -166,11 +166,11 @@ func (s *CollectionReaderUnit) TestCollectionReaderContent(c *check.C) { func (s *CollectionReaderUnit) TestCollectionReaderManyBlocks(c *check.C) { h := md5.New() - var testdata []byte buf := make([]byte, 4096) locs := make([]string, len(buf)) + testdata := make([]byte, 0, len(buf)*len(buf)) filesize := 0 - for i := 0; i < len(locs); i++ { + for i := range locs { _, err := rand.Read(buf[:i]) h.Write(buf[:i]) locs[i], _, err = s.kc.PutB(buf[:i]) @@ -219,11 +219,12 @@ func (s *CollectionReaderUnit) TestCollectionReaderManyBlocks(c *check.C) { c.Check(md5.Sum(buf), check.DeepEquals, md5.Sum(testdata)) c.Check(buf[:1000], 
check.DeepEquals, testdata[:1000]) + expectPos := curPos + size + 12345 curPos, err = rdr.Seek(size+12345, io.SeekCurrent) c.Check(err, check.IsNil) - c.Check(curPos, check.Equals, size) + c.Check(curPos, check.Equals, expectPos) - curPos, err = rdr.Seek(8-size, io.SeekCurrent) + curPos, err = rdr.Seek(8-curPos, io.SeekCurrent) c.Check(err, check.IsNil) c.Check(curPos, check.Equals, int64(8)) diff --git a/sdk/go/keepclient/keepclient.go b/sdk/go/keepclient/keepclient.go index 37d651e31f..54a4a374b9 100644 --- a/sdk/go/keepclient/keepclient.go +++ b/sdk/go/keepclient/keepclient.go @@ -293,6 +293,12 @@ func (kc *KeepClient) Get(locator string) (io.ReadCloser, int64, string, error) return kc.getOrHead("GET", locator) } +// ReadAt() retrieves a portion of block from the cache if it's +// present, otherwise from the network. +func (kc *KeepClient) ReadAt(locator string, p []byte, off int) (int, error) { + return kc.cache().ReadAt(kc, locator, p, off) +} + // Ask() verifies that a block with the given hash is available and // readable, according to at least one Keep service. Unlike Get, it // does not retrieve the data or verify that the data content matches diff --git a/services/api/app/models/container.rb b/services/api/app/models/container.rb index 83765fb1dc..466adc0eef 100644 --- a/services/api/app/models/container.rb +++ b/services/api/app/models/container.rb @@ -31,6 +31,7 @@ class Container < ArvadosModel after_validation :assign_auth before_save :sort_serialized_attrs after_save :handle_completed + after_save :propagate_priority has_many :container_requests, :foreign_key => :container_uuid, :class_name => 'ContainerRequest', :primary_key => :uuid belongs_to :auth, :class_name => 'ApiClientAuthorization', :foreign_key => :auth_uuid, :primary_key => :uuid @@ -93,6 +94,20 @@ class Container < ArvadosModel end end + def propagate_priority + if self.priority_changed? 
+ act_as_system_user do + # Update the priority of child container requests to match new priority + # of the parent container. + ContainerRequest.where(requesting_container_uuid: self.uuid, + state: ContainerRequest::Committed).each do |cr| + cr.priority = self.priority + cr.save + end + end + end + end + # Create a new container (or find an existing one) to satisfy the # given container request. def self.resolve(req) diff --git a/services/api/test/unit/container_request_test.rb b/services/api/test/unit/container_request_test.rb index e751d6158c..cecf7b818e 100644 --- a/services/api/test/unit/container_request_test.rb +++ b/services/api/test/unit/container_request_test.rb @@ -293,6 +293,35 @@ class ContainerRequestTest < ActiveSupport::TestCase assert_equal 0, c2.priority end + + test "Container makes container request, then changes priority" do + set_user_from_auth :active + cr = create_minimal_req!(priority: 5, state: "Committed", container_count_max: 1) + + c = Container.find_by_uuid cr.container_uuid + assert_equal 5, c.priority + + cr2 = create_minimal_req! + cr2.update_attributes!(priority: 5, state: "Committed", requesting_container_uuid: c.uuid, command: ["echo", "foo2"], container_count_max: 1) + cr2.reload + + c2 = Container.find_by_uuid cr2.container_uuid + assert_equal 5, c2.priority + + act_as_system_user do + c.priority = 10 + c.save! 
+ end + + cr.reload + + cr2.reload + assert_equal 10, cr2.priority + + c2.reload + assert_equal 10, c2.priority + end + [ ['running_container_auth', 'zzzzz-dz642-runningcontainr'], ['active_no_prefs', nil], diff --git a/services/arv-git-httpd/auth_handler.go b/services/arv-git-httpd/auth_handler.go index b7373b5c1e..617c73282f 100644 --- a/services/arv-git-httpd/auth_handler.go +++ b/services/arv-git-httpd/auth_handler.go @@ -188,5 +188,5 @@ func (h *authHandler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) { } r.URL.Path = rewrittenPath - h.handler.ServeHTTP(&w, r) + h.handler.ServeHTTP(w, r) } diff --git a/services/arv-git-httpd/main.go b/services/arv-git-httpd/main.go index 79a3eb3f7b..74ac7ae55e 100644 --- a/services/arv-git-httpd/main.go +++ b/services/arv-git-httpd/main.go @@ -7,6 +7,7 @@ package main import ( "encoding/json" "flag" + "fmt" "log" "os" "regexp" @@ -16,6 +17,8 @@ import ( "github.com/coreos/go-systemd/daemon" ) +var version = "dev" + // Server configuration type Config struct { Client arvados.Client @@ -50,6 +53,7 @@ func main() { cfgPath := flag.String("config", defaultCfgPath, "Configuration file `path`.") dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit (useful for migrating from command line flags to config file)") + getVersion := flag.Bool("version", false, "print version information and exit.") flag.StringVar(&theConfig.ManagementToken, "management-token", theConfig.ManagementToken, "Authorization token to be included in all health check requests.") @@ -57,6 +61,12 @@ func main() { flag.Usage = usage flag.Parse() + // Print version information if requested + if *getVersion { + fmt.Printf("arv-git-httpd %s\n", version) + return + } + err := config.LoadFile(theConfig, *cfgPath) if err != nil { h := os.Getenv("ARVADOS_API_HOST") @@ -84,6 +94,7 @@ func main() { if _, err := daemon.SdNotify(false, "READY=1"); err != nil { log.Printf("Error notifying init daemon: %v", err) } + 
log.Printf("arv-git-httpd %s started", version) log.Println("Listening at", srv.Addr) log.Println("Repository root", theConfig.RepoRoot) if err := srv.Wait(); err != nil { diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go index 888a2148c1..279327ba18 100644 --- a/services/crunch-dispatch-local/crunch-dispatch-local.go +++ b/services/crunch-dispatch-local/crunch-dispatch-local.go @@ -9,6 +9,7 @@ package main import ( "context" "flag" + "fmt" "log" "os" "os/exec" @@ -22,6 +23,8 @@ import ( "git.curoverse.com/arvados.git/sdk/go/dispatch" ) +var version = "dev" + func main() { err := doMain() if err != nil { @@ -49,9 +52,22 @@ func doMain() error { "/usr/bin/crunch-run", "Crunch command to run container") + getVersion := flags.Bool( + "version", + false, + "Print version information and exit.") + // Parse args; omit the first arg which is the command name flags.Parse(os.Args[1:]) + // Print version information if requested + if *getVersion { + fmt.Printf("crunch-dispatch-local %s\n", version) + return nil + } + + log.Printf("crunch-dispatch-local %s started", version) + runningCmds = make(map[string]*exec.Cmd) arv, err := arvadosclient.MakeArvadosClient() diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go index 30cbb79dc1..d322b0f3f6 100644 --- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go +++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go @@ -26,6 +26,8 @@ import ( "github.com/coreos/go-systemd/daemon" ) +var version = "dev" + // Config used by crunch-dispatch-slurm type Config struct { Client arvados.Client @@ -69,10 +71,21 @@ func doMain() error { "dump-config", false, "write current configuration to stdout and exit") - + getVersion := flags.Bool( + "version", + false, + "Print version information and exit.") // Parse args; omit the first arg which is the command name flags.Parse(os.Args[1:]) 
+ // Print version information if requested + if *getVersion { + fmt.Printf("crunch-dispatch-slurm %s\n", version) + return nil + } + + log.Printf("crunch-dispatch-slurm %s started", version) + err := readConfig(&theConfig, *configPath) if err != nil { return err diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go index 27a548aa61..f3f754b59d 100644 --- a/services/crunch-run/crunchrun.go +++ b/services/crunch-run/crunchrun.go @@ -19,6 +19,7 @@ import ( "os/signal" "path" "path/filepath" + "regexp" "runtime" "runtime/pprof" "sort" @@ -39,6 +40,8 @@ import ( dockerclient "github.com/docker/docker/client" ) +var version = "dev" + // IArvadosClient is the minimal Arvados API methods used by crunch-run. type IArvadosClient interface { Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error @@ -228,6 +231,35 @@ func (runner *ContainerRunner) stopSignals() { } } +var errorBlacklist = []string{ + "(?ms).*[Cc]annot connect to the Docker daemon.*", + "(?ms).*oci runtime error.*starting container process.*container init.*mounting.*to rootfs.*no such file or directory.*", +} +var brokenNodeHook *string = flag.String("broken-node-hook", "", "Script to run if node is detected to be broken (for example, Docker daemon is not running)") + +func (runner *ContainerRunner) checkBrokenNode(goterr error) bool { + for _, d := range errorBlacklist { + if m, e := regexp.MatchString(d, goterr.Error()); m && e == nil { + runner.CrunchLog.Printf("Error suggests node is unable to run containers: %v", goterr) + if *brokenNodeHook == "" { + runner.CrunchLog.Printf("No broken node hook provided, cannot mark node as broken.") + } else { + runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook) + // run killme script + c := exec.Command(*brokenNodeHook) + c.Stdout = runner.CrunchLog + c.Stderr = runner.CrunchLog + err := c.Run() + if err != nil { + runner.CrunchLog.Printf("Error running broken node hook: %v", err) + } + } + 
return true + } + } + return false +} + // LoadImage determines the docker image id from the container record and // checks if it is available in the local Docker image store. If not, it loads // the image from Keep. @@ -616,7 +648,7 @@ type infoCommand struct { cmd []string } -// Gather node information and store it on the log for debugging +// LogNodeInfo gathers node information and store it on the log for debugging // purposes. func (runner *ContainerRunner) LogNodeInfo() (err error) { w := runner.NewLogWriter("node-info") @@ -666,7 +698,7 @@ func (runner *ContainerRunner) LogNodeInfo() (err error) { return nil } -// Get and save the raw JSON container record from the API server +// LogContainerRecord gets and saves the raw JSON container record from the API server func (runner *ContainerRunner) LogContainerRecord() (err error) { w := &ArvLogWriter{ ArvClient: runner.ArvClient, @@ -889,7 +921,7 @@ func (runner *ContainerRunner) StartContainer() error { dockertypes.ContainerStartOptions{}) if err != nil { var advice string - if strings.Contains(err.Error(), "no such file or directory") { + if m, e := regexp.MatchString("(?ms).*(exec|System error).*(no such file or directory|file not found).*", err.Error()); m && e == nil { advice = fmt.Sprintf("\nPossible causes: command %q is missing, the interpreter given in #! is missing, or script has Windows line endings.", runner.Container.Command[0]) } return fmt.Errorf("could not start container: %v%s", err, advice) @@ -1418,6 +1450,7 @@ func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser { // Run the full container lifecycle. 
func (runner *ContainerRunner) Run() (err error) { + runner.CrunchLog.Printf("crunch-run %s started", version) runner.CrunchLog.Printf("Executing container '%s'", runner.Container.UUID) hostname, hosterr := os.Hostname() @@ -1487,7 +1520,11 @@ func (runner *ContainerRunner) Run() (err error) { // check for and/or load image err = runner.LoadImage() if err != nil { - runner.finalState = "Cancelled" + if !runner.checkBrokenNode(err) { + // Failed to load image but not due to a "broken node" + // condition, probably user error. + runner.finalState = "Cancelled" + } err = fmt.Errorf("While loading container image: %v", err) return } @@ -1516,8 +1553,6 @@ func (runner *ContainerRunner) Run() (err error) { return } - runner.StartCrunchstat() - if runner.IsCancelled() { return } @@ -1528,8 +1563,11 @@ func (runner *ContainerRunner) Run() (err error) { } runner.finalState = "Cancelled" + runner.StartCrunchstat() + err = runner.StartContainer() if err != nil { + runner.checkBrokenNode(err) return } @@ -1593,8 +1631,17 @@ func main() { `Set networking mode for container. Corresponds to Docker network mode (--net). 
`) memprofile := flag.String("memprofile", "", "write memory profile to `file` after running container") + getVersion := flag.Bool("version", false, "Print version information and exit.") flag.Parse() + // Print version information if requested + if *getVersion { + fmt.Printf("crunch-run %s\n", version) + return + } + + log.Printf("crunch-run %s started", version) + containerId := flag.Arg(0) if *caCertsPath != "" { @@ -1607,25 +1654,27 @@ func main() { } api.Retries = 8 - var kc *keepclient.KeepClient - kc, err = keepclient.MakeKeepClient(api) - if err != nil { - log.Fatalf("%s: %v", containerId, err) + kc, kcerr := keepclient.MakeKeepClient(api) + if kcerr != nil { + log.Fatalf("%s: %v", containerId, kcerr) } kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2} kc.Retries = 4 - var docker *dockerclient.Client // API version 1.21 corresponds to Docker 1.9, which is currently the // minimum version we want to support. - docker, err = dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil) - if err != nil { - log.Fatalf("%s: %v", containerId, err) - } - + docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil) dockerClientProxy := ThinDockerClientProxy{Docker: docker} cr := NewContainerRunner(api, kc, dockerClientProxy, containerId) + + if dockererr != nil { + cr.CrunchLog.Printf("%s: %v", containerId, dockererr) + cr.checkBrokenNode(dockererr) + cr.CrunchLog.Close() + os.Exit(1) + } + cr.statInterval = *statInterval cr.cgroupRoot = *cgroupRoot cr.expectCgroupParent = *cgroupParent diff --git a/services/crunch-run/crunchrun_test.go b/services/crunch-run/crunchrun_test.go index bc0b3125c9..e1d9fed730 100644 --- a/services/crunch-run/crunchrun_test.go +++ b/services/crunch-run/crunchrun_test.go @@ -130,6 +130,19 @@ func (t *TestDockerClient) ContainerCreate(ctx context.Context, config *dockerco } func (t *TestDockerClient) ContainerStart(ctx context.Context, container string, options 
dockertypes.ContainerStartOptions) error { + if t.finish == 3 { + return errors.New(`Error response from daemon: oci runtime error: container_linux.go:247: starting container process caused "process_linux.go:359: container init caused \"rootfs_linux.go:54: mounting \\\"/tmp/keep453790790/by_id/99999999999999999999999999999999+99999/myGenome\\\" to rootfs \\\"/tmp/docker/overlay2/9999999999999999999999999999999999999999999999999999999999999999/merged\\\" at \\\"/tmp/docker/overlay2/9999999999999999999999999999999999999999999999999999999999999999/merged/keep/99999999999999999999999999999999+99999/myGenome\\\" caused \\\"no such file or directory\\\"\""`) + } + if t.finish == 4 { + return errors.New(`panic: standard_init_linux.go:175: exec user process caused "no such file or directory"`) + } + if t.finish == 5 { + return errors.New(`Error response from daemon: Cannot start container 41f26cbc43bcc1280f4323efb1830a394ba8660c9d1c2b564ba42bf7f7694845: [8] System error: no such file or directory`) + } + if t.finish == 6 { + return errors.New(`Error response from daemon: Cannot start container 58099cd76c834f3dc2a4fb76c8028f049ae6d4fdf0ec373e1f2cfea030670c2d: [8] System error: exec: "foobar": executable file not found in $PATH`) + } + if container == "abcde" { // t.fn gets executed in ContainerWait return nil @@ -156,6 +169,10 @@ func (t *TestDockerClient) ContainerWait(ctx context.Context, container string, } func (t *TestDockerClient) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) { + if t.finish == 2 { + return dockertypes.ImageInspect{}, nil, fmt.Errorf("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. 
Is the docker daemon running?") + } + if t.imageLoaded == image { return dockertypes.ImageInspect{}, nil, nil } else { @@ -164,6 +181,9 @@ func (t *TestDockerClient) ImageInspectWithRaw(ctx context.Context, image string } func (t *TestDockerClient) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) { + if t.finish == 2 { + return dockertypes.ImageLoadResponse{}, fmt.Errorf("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?") + } _, err := io.Copy(ioutil.Discard, input) if err != nil { return dockertypes.ImageLoadResponse{}, err @@ -321,11 +341,27 @@ type FileWrapper struct { len int64 } +func (fw FileWrapper) Readdir(n int) ([]os.FileInfo, error) { + return nil, errors.New("not implemented") +} + +func (fw FileWrapper) Seek(int64, int) (int64, error) { + return 0, errors.New("not implemented") +} + func (fw FileWrapper) Size() int64 { return fw.len } -func (fw FileWrapper) Seek(int64, int) (int64, error) { +func (fw FileWrapper) Stat() (os.FileInfo, error) { + return nil, errors.New("not implemented") +} + +func (fw FileWrapper) Truncate(int64) error { + return errors.New("not implemented") +} + +func (fw FileWrapper) Write([]byte) (int, error) { return 0, errors.New("not implemented") } @@ -433,20 +469,14 @@ func (KeepReadErrorTestClient) PutHB(hash string, buf []byte) (string, int, erro func (KeepReadErrorTestClient) ClearBlockCache() { } -type ErrorReader struct{} +type ErrorReader struct { + FileWrapper +} func (ErrorReader) Read(p []byte) (n int, err error) { return 0, errors.New("ErrorReader") } -func (ErrorReader) Close() error { - return nil -} - -func (ErrorReader) Size() int64 { - return 0 -} - func (ErrorReader) Seek(int64, int) (int64, error) { return 0, errors.New("ErrorReader") } @@ -668,9 +698,10 @@ func FullRunHelper(c *C, record string, extraMounts []string, exitCode int, fn f if api.CalledWith("container.state", "Complete") != nil { c.Check(err, IsNil) 
} - c.Check(api.WasSetRunning, Equals, true) - - c.Check(api.Content[api.Calls-2]["container"].(arvadosclient.Dict)["log"], NotNil) + if exitCode != 2 { + c.Check(api.WasSetRunning, Equals, true) + c.Check(api.Content[api.Calls-2]["container"].(arvadosclient.Dict)["log"], NotNil) + } if err != nil { for k, v := range api.Logs { @@ -1759,3 +1790,149 @@ func (s *TestSuite) TestEvalSymlinkDir(c *C) { _, err = cr.UploadOutputFile(realTemp+"/"+v, info, err, []string{}, nil, "", "", 0) c.Assert(err, NotNil) } + +func (s *TestSuite) TestFullBrokenDocker1(c *C) { + tf, err := ioutil.TempFile("", "brokenNodeHook-") + c.Assert(err, IsNil) + defer os.Remove(tf.Name()) + + tf.Write([]byte(`#!/bin/sh +exec echo killme +`)) + tf.Close() + os.Chmod(tf.Name(), 0700) + + ech := tf.Name() + brokenNodeHook = &ech + + api, _, _ := FullRunHelper(c, `{ + "command": ["echo", "hello world"], + "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", + "cwd": ".", + "environment": {}, + "mounts": {"/tmp": {"kind": "tmp"} }, + "output_path": "/tmp", + "priority": 1, + "runtime_constraints": {} +}`, nil, 2, func(t *TestDockerClient) { + t.logWriter.Write(dockerLog(1, "hello world\n")) + t.logWriter.Close() + }) + + c.Check(api.CalledWith("container.state", "Queued"), NotNil) + c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*") + c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*Running broken node hook.*") + c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*killme.*") + +} + +func (s *TestSuite) TestFullBrokenDocker2(c *C) { + ech := "" + brokenNodeHook = &ech + + api, _, _ := FullRunHelper(c, `{ + "command": ["echo", "hello world"], + "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", + "cwd": ".", + "environment": {}, + "mounts": {"/tmp": {"kind": "tmp"} }, + "output_path": "/tmp", + "priority": 1, + "runtime_constraints": {} +}`, nil, 2, func(t *TestDockerClient) { + t.logWriter.Write(dockerLog(1, "hello world\n")) + 
t.logWriter.Close() + }) + + c.Check(api.CalledWith("container.state", "Queued"), NotNil) + c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*") + c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*No broken node hook.*") +} + +func (s *TestSuite) TestFullBrokenDocker3(c *C) { + ech := "" + brokenNodeHook = &ech + + api, _, _ := FullRunHelper(c, `{ + "command": ["echo", "hello world"], + "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", + "cwd": ".", + "environment": {}, + "mounts": {"/tmp": {"kind": "tmp"} }, + "output_path": "/tmp", + "priority": 1, + "runtime_constraints": {} +}`, nil, 3, func(t *TestDockerClient) { + t.logWriter.Write(dockerLog(1, "hello world\n")) + t.logWriter.Close() + }) + + c.Check(api.CalledWith("container.state", "Cancelled"), NotNil) + c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*") +} + +func (s *TestSuite) TestBadCommand1(c *C) { + ech := "" + brokenNodeHook = &ech + + api, _, _ := FullRunHelper(c, `{ + "command": ["echo", "hello world"], + "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", + "cwd": ".", + "environment": {}, + "mounts": {"/tmp": {"kind": "tmp"} }, + "output_path": "/tmp", + "priority": 1, + "runtime_constraints": {} +}`, nil, 4, func(t *TestDockerClient) { + t.logWriter.Write(dockerLog(1, "hello world\n")) + t.logWriter.Close() + }) + + c.Check(api.CalledWith("container.state", "Cancelled"), NotNil) + c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*Possible causes:.*is missing.*") +} + +func (s *TestSuite) TestBadCommand2(c *C) { + ech := "" + brokenNodeHook = &ech + + api, _, _ := FullRunHelper(c, `{ + "command": ["echo", "hello world"], + "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", + "cwd": ".", + "environment": {}, + "mounts": {"/tmp": {"kind": "tmp"} }, + "output_path": "/tmp", + "priority": 1, + "runtime_constraints": {} +}`, nil, 5, func(t *TestDockerClient) { + t.logWriter.Write(dockerLog(1, 
"hello world\n")) + t.logWriter.Close() + }) + + c.Check(api.CalledWith("container.state", "Cancelled"), NotNil) + c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*Possible causes:.*is missing.*") +} + +func (s *TestSuite) TestBadCommand3(c *C) { + ech := "" + brokenNodeHook = &ech + + api, _, _ := FullRunHelper(c, `{ + "command": ["echo", "hello world"], + "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", + "cwd": ".", + "environment": {}, + "mounts": {"/tmp": {"kind": "tmp"} }, + "output_path": "/tmp", + "priority": 1, + "runtime_constraints": {} +}`, nil, 6, func(t *TestDockerClient) { + t.logWriter.Write(dockerLog(1, "hello world\n")) + t.logWriter.Close() + }) + + c.Check(api.CalledWith("container.state", "Cancelled"), NotNil) + c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*Possible causes:.*is missing.*") +} diff --git a/services/crunchstat/crunchstat.go b/services/crunchstat/crunchstat.go index cd84770e54..ad433bb3b5 100644 --- a/services/crunchstat/crunchstat.go +++ b/services/crunchstat/crunchstat.go @@ -7,6 +7,7 @@ package main import ( "bufio" "flag" + "fmt" "io" "log" "os" @@ -23,6 +24,7 @@ const MaxLogLine = 1 << 14 // Child stderr lines >16KiB will be split var ( signalOnDeadPPID int = 15 ppidCheckInterval = time.Second + version = "dev" ) func main() { @@ -36,9 +38,18 @@ func main() { flag.IntVar(&signalOnDeadPPID, "signal-on-dead-ppid", signalOnDeadPPID, "Signal to send child if crunchstat's parent process disappears (0 to disable)") flag.DurationVar(&ppidCheckInterval, "ppid-check-interval", ppidCheckInterval, "Time between checks for parent process disappearance") pollMsec := flag.Int64("poll", 1000, "Reporting interval, in milliseconds") + getVersion := flag.Bool("version", false, "Print version information and exit.") flag.Parse() + // Print version information if requested + if *getVersion { + fmt.Printf("crunchstat %s\n", version) + return + } + + reporter.Logger.Printf("crunchstat %s started", version) + if 
reporter.CgroupRoot == "" { reporter.Logger.Fatal("error: must provide -cgroup-root") } else if signalOnDeadPPID < 0 { diff --git a/services/fuse/arvados_fuse/command.py b/services/fuse/arvados_fuse/command.py index 4dad90c867..f174d1bb02 100644 --- a/services/fuse/arvados_fuse/command.py +++ b/services/fuse/arvados_fuse/command.py @@ -202,6 +202,7 @@ class Mount(object): logging.getLogger('arvados.collection').setLevel(logging.DEBUG) self.logger.debug("arv-mount debugging enabled") + self.logger.info("%s %s started", sys.argv[0], __version__) self.logger.info("enable write is %s", self.args.enable_write) def _setup_api(self): diff --git a/services/health/main.go b/services/health/main.go index b6358deefc..496fb884d4 100644 --- a/services/health/main.go +++ b/services/health/main.go @@ -2,6 +2,7 @@ package main import ( "flag" + "fmt" "net/http" "git.curoverse.com/arvados.git/sdk/go/arvados" @@ -10,13 +11,24 @@ import ( log "github.com/Sirupsen/logrus" ) +var version = "dev" + func main() { configFile := flag.String("config", arvados.DefaultConfigFile, "`path` to arvados configuration file") + getVersion := flag.Bool("version", false, "Print version information and exit.") flag.Parse() + // Print version information if requested + if *getVersion { + fmt.Printf("arvados-health %s\n", version) + return + } + log.SetFormatter(&log.JSONFormatter{ TimestampFormat: "2006-01-02T15:04:05.000000000Z07:00", }) + log.Printf("arvados-health %s started", version) + cfg, err := arvados.GetConfig(*configFile) if err != nil { log.Fatal(err) diff --git a/services/keep-balance/main.go b/services/keep-balance/main.go index 8a938ccf53..947033564d 100644 --- a/services/keep-balance/main.go +++ b/services/keep-balance/main.go @@ -7,6 +7,7 @@ package main import ( "encoding/json" "flag" + "fmt" "log" "os" "os/signal" @@ -17,6 +18,8 @@ import ( "git.curoverse.com/arvados.git/sdk/go/config" ) +var version = "dev" + const defaultConfigPath = "/etc/arvados/keep-balance/keep-balance.yml" // 
Config specifies site configuration, like API credentials and the @@ -85,9 +88,16 @@ func main() { dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit") dumpFlag := flag.Bool("dump", false, "dump details for each block to stdout") debugFlag := flag.Bool("debug", false, "enable debug messages") + getVersion := flag.Bool("version", false, "Print version information and exit.") flag.Usage = usage flag.Parse() + // Print version information if requested + if *getVersion { + fmt.Printf("keep-balance %s\n", version) + return + } + mustReadConfig(&cfg, *configPath) if *serviceListPath != "" { mustReadConfig(&cfg.KeepServiceList, *serviceListPath) @@ -97,6 +107,8 @@ func main() { log.Fatal(config.DumpAndExit(cfg)) } + log.Printf("keep-balance %s started", version) + if *debugFlag { debugf = log.Printf if j, err := json.Marshal(cfg); err != nil { diff --git a/services/keep-web/cache.go b/services/keep-web/cache.go index ce1168acd2..9ee99903c8 100644 --- a/services/keep-web/cache.go +++ b/services/keep-web/cache.go @@ -86,6 +86,29 @@ func (c *cache) Stats() cacheStats { } } +// Update saves a modified version (fs) to an existing collection +// (coll) and, if successful, updates the relevant cache entries so +// subsequent calls to Get() reflect the modifications. 
+func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error { + c.setupOnce.Do(c.setup) + + if m, err := fs.MarshalManifest("."); err != nil || m == coll.ManifestText { + return err + } else { + coll.ManifestText = m + } + var updated arvados.Collection + defer c.pdhs.Remove(coll.UUID) + err := client.RequestAndDecode(&updated, "PATCH", "/arvados/v1/collections/"+coll.UUID, client.UpdateBody(coll), nil) + if err == nil { + c.collections.Add(client.AuthToken+"\000"+coll.PortableDataHash, &cachedCollection{ + expire: time.Now().Add(time.Duration(c.TTL)), + collection: &updated, + }) + } + return err +} + func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) { c.setupOnce.Do(c.setup) diff --git a/services/keep-web/cadaver_test.go b/services/keep-web/cadaver_test.go index 87a712f04c..d4a89c844b 100644 --- a/services/keep-web/cadaver_test.go +++ b/services/keep-web/cadaver_test.go @@ -7,42 +7,202 @@ package main import ( "bytes" "io" + "io/ioutil" + "net/url" + "os" "os/exec" + "strings" + "time" + "git.curoverse.com/arvados.git/sdk/go/arvados" "git.curoverse.com/arvados.git/sdk/go/arvadostest" check "gopkg.in/check.v1" ) func (s *IntegrationSuite) TestWebdavWithCadaver(c *check.C) { - basePath := "/c=" + arvadostest.FooAndBarFilesInDirUUID + "/t=" + arvadostest.ActiveToken + "/" + testdata := []byte("the human tragedy consists in the necessity of living with the consequences of actions performed under the pressure of compulsions we do not understand") + + localfile, err := ioutil.TempFile("", "localfile") + c.Assert(err, check.IsNil) + defer os.Remove(localfile.Name()) + localfile.Write(testdata) + + emptyfile, err := ioutil.TempFile("", "emptyfile") + c.Assert(err, check.IsNil) + defer os.Remove(emptyfile.Name()) + + checkfile, err := ioutil.TempFile("", "checkfile") + c.Assert(err, check.IsNil) + defer os.Remove(checkfile.Name()) + + var newCollection 
arvados.Collection + arv := arvados.NewClientFromEnv() + arv.AuthToken = arvadostest.ActiveToken + err = arv.RequestAndDecode(&newCollection, "POST", "/arvados/v1/collections", bytes.NewBufferString(url.Values{"collection": {"{}"}}.Encode()), nil) + c.Assert(err, check.IsNil) + writePath := "/c=" + newCollection.UUID + "/t=" + arv.AuthToken + "/" + + pdhPath := "/c=" + strings.Replace(arvadostest.FooAndBarFilesInDirPDH, "+", "-", -1) + "/t=" + arv.AuthToken + "/" + + matchToday := time.Now().Format("Jan +2") + + readPath := "/c=" + arvadostest.FooAndBarFilesInDirUUID + "/t=" + arvadostest.ActiveToken + "/" type testcase struct { path string cmd string match string + data []byte } for _, trial := range []testcase{ { - path: basePath, + path: readPath, cmd: "ls\n", match: `(?ms).*dir1 *0 .*`, }, { - path: basePath, + path: readPath, cmd: "ls dir1\n", match: `(?ms).*bar *3.*foo *3 .*`, }, { - path: basePath + "_/dir1", + path: readPath + "_/dir1", cmd: "ls\n", match: `(?ms).*bar *3.*foo *3 .*`, }, { - path: basePath + "dir1/", + path: readPath + "dir1/", cmd: "ls\n", - match: `(?ms).*bar *3.*foo *3 .*`, + match: `(?ms).*bar *3.*foo +3 +Feb +\d+ +2014.*`, + }, + { + path: writePath, + cmd: "get emptyfile '" + checkfile.Name() + "'\n", + match: `(?ms).*Not Found.*`, + }, + { + path: writePath, + cmd: "put '" + emptyfile.Name() + "' emptyfile\n", + match: `(?ms).*Uploading .* succeeded.*`, + }, + { + path: writePath, + cmd: "get emptyfile '" + checkfile.Name() + "'\n", + match: `(?ms).*Downloading .* succeeded.*`, + data: []byte{}, + }, + { + path: writePath, + cmd: "put '" + localfile.Name() + "' testfile\n", + match: `(?ms).*Uploading .* succeeded.*`, + }, + { + path: writePath, + cmd: "get testfile '" + checkfile.Name() + "'\n", + match: `(?ms).*succeeded.*`, + data: testdata, + }, + { + path: writePath, + cmd: "move testfile newdir0/\n", + match: `(?ms).*Moving .* succeeded.*`, + }, + { + path: writePath, + cmd: "move testfile newdir0/\n", + match: `(?ms).*Moving .* 
failed.*`, + }, + { + path: writePath, + cmd: "ls\n", + match: `(?ms).*newdir0.* 0 +` + matchToday + ` \d+:\d+\n.*`, + }, + { + path: writePath, + cmd: "move newdir0/testfile emptyfile/bogus/\n", + match: `(?ms).*Moving .* failed.*`, + }, + { + path: writePath, + cmd: "mkcol newdir1\n", + match: `(?ms).*Creating .* succeeded.*`, + }, + { + path: writePath, + cmd: "move newdir0/testfile newdir1/\n", + match: `(?ms).*Moving .* succeeded.*`, + }, + { + path: writePath, + cmd: "put '" + localfile.Name() + "' newdir1/testfile1\n", + match: `(?ms).*Uploading .* succeeded.*`, + }, + { + path: writePath, + cmd: "mkcol newdir2\n", + match: `(?ms).*Creating .* succeeded.*`, + }, + { + path: writePath, + cmd: "put '" + localfile.Name() + "' newdir2/testfile2\n", + match: `(?ms).*Uploading .* succeeded.*`, + }, + { + path: writePath, + cmd: "copy newdir2/testfile2 testfile3\n", + match: `(?ms).*succeeded.*`, + }, + { + path: writePath, + cmd: "get testfile3 '" + checkfile.Name() + "'\n", + match: `(?ms).*succeeded.*`, + data: testdata, + }, + { + path: writePath, + cmd: "get newdir2/testfile2 '" + checkfile.Name() + "'\n", + match: `(?ms).*succeeded.*`, + data: testdata, + }, + { + path: writePath, + cmd: "rmcol newdir2\n", + match: `(?ms).*Deleting collection .* succeeded.*`, + }, + { + path: writePath, + cmd: "get newdir2/testfile2 '" + checkfile.Name() + "'\n", + match: `(?ms).*Downloading .* failed.*`, + }, + { + path: "/c=" + arvadostest.UserAgreementCollection + "/t=" + arv.AuthToken + "/", + cmd: "put '" + localfile.Name() + "' foo\n", + match: `(?ms).*Uploading .* failed:.*403 Forbidden.*`, + }, + { + path: pdhPath, + cmd: "put '" + localfile.Name() + "' foo\n", + match: `(?ms).*Uploading .* failed:.*405 Method Not Allowed.*`, + }, + { + path: pdhPath, + cmd: "move foo bar\n", + match: `(?ms).*Moving .* failed:.*405 Method Not Allowed.*`, + }, + { + path: pdhPath, + cmd: "copy foo bar\n", + match: `(?ms).*Copying .* failed:.*405 Method Not Allowed.*`, + }, + { + path: 
pdhPath, + cmd: "delete foo\n", + match: `(?ms).*Deleting .* failed:.*405 Method Not Allowed.*`, }, } { - c.Logf("%s %#v", "http://"+s.testServer.Addr, trial) + c.Logf("%s %+v", "http://"+s.testServer.Addr, trial) + + os.Remove(checkfile.Name()) + cmd := exec.Command("cadaver", "http://"+s.testServer.Addr+trial.path) cmd.Stdin = bytes.NewBufferString(trial.cmd) stdout, err := cmd.StdoutPipe() @@ -56,5 +216,15 @@ func (s *IntegrationSuite) TestWebdavWithCadaver(c *check.C) { err = cmd.Wait() c.Check(err, check.Equals, nil) c.Check(buf.String(), check.Matches, trial.match) + + if trial.data == nil { + continue + } + checkfile, err = os.Open(checkfile.Name()) + c.Assert(err, check.IsNil) + checkfile.Seek(0, os.SEEK_SET) + got, err := ioutil.ReadAll(checkfile) + c.Check(got, check.DeepEquals, trial.data) + c.Check(err, check.IsNil) } } diff --git a/services/keep-web/doc.go b/services/keep-web/doc.go index 598fabcd37..b7da3b0e5a 100644 --- a/services/keep-web/doc.go +++ b/services/keep-web/doc.go @@ -2,11 +2,11 @@ // // SPDX-License-Identifier: AGPL-3.0 -// Keep-web provides read-only HTTP access to files stored in Keep. It -// serves public data to anonymous and unauthenticated clients, and -// serves private data to clients that supply Arvados API tokens. It -// can be installed anywhere with access to Keep services, typically -// behind a web proxy that supports TLS. +// Keep-web provides read/write HTTP (WebDAV) access to files stored +// in Keep. It serves public data to anonymous and unauthenticated +// clients, and serves private data to clients that supply Arvados API +// tokens. It can be installed anywhere with access to Keep services, +// typically behind a web proxy that supports TLS. // // See http://doc.arvados.org/install/install-keep-web.html. // @@ -40,7 +40,7 @@ // // Proxy configuration // -// Keep-web does not support SSL natively. Typically, it is installed +// Keep-web does not support TLS natively. 
Typically, it is installed // behind a proxy like nginx. // // Here is an example nginx configuration. diff --git a/services/keep-web/handler.go b/services/keep-web/handler.go index fd36218bc1..a1476d3a8e 100644 --- a/services/keep-web/handler.go +++ b/services/keep-web/handler.go @@ -10,6 +10,7 @@ import ( "html" "html/template" "io" + "log" "net/http" "net/url" "os" @@ -90,16 +91,72 @@ func (h *handler) setup() { func (h *handler) serveStatus(w http.ResponseWriter, r *http.Request) { status := struct { cacheStats + Version string }{ cacheStats: h.Config.Cache.Stats(), + Version: version, } json.NewEncoder(w).Encode(status) } +// updateOnSuccess wraps httpserver.ResponseWriter. If the handler +// sends an HTTP header indicating success, updateOnSuccess first +// calls the provided update func. If the update func fails, a 500 +// response is sent, and the status code and body sent by the handler +// are ignored (all response writes return the update error). +type updateOnSuccess struct { + httpserver.ResponseWriter + update func() error + sentHeader bool + err error +} + +func (uos *updateOnSuccess) Write(p []byte) (int, error) { + if uos.err != nil { + return 0, uos.err + } + if !uos.sentHeader { + uos.WriteHeader(http.StatusOK) + } + return uos.ResponseWriter.Write(p) +} + +func (uos *updateOnSuccess) WriteHeader(code int) { + if !uos.sentHeader { + uos.sentHeader = true + if code >= 200 && code < 400 { + if uos.err = uos.update(); uos.err != nil { + code := http.StatusInternalServerError + if err, ok := uos.err.(*arvados.TransactionError); ok { + code = err.StatusCode + } + log.Printf("update() changes response to HTTP %d: %T %q", code, uos.err, uos.err) + http.Error(uos.ResponseWriter, uos.err.Error(), code) + return + } + } + } + uos.ResponseWriter.WriteHeader(code) +} + var ( + writeMethod = map[string]bool{ + "COPY": true, + "DELETE": true, + "MKCOL": true, + "MOVE": true, + "PUT": true, + "RMCOL": true, + } webdavMethod = map[string]bool{ + "COPY": true, + 
"DELETE": true, + "MKCOL": true, + "MOVE": true, "OPTIONS": true, "PROPFIND": true, + "PUT": true, + "RMCOL": true, } browserMethod = map[string]bool{ "GET": true, @@ -147,7 +204,7 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) { return } w.Header().Set("Access-Control-Allow-Headers", "Authorization, Content-Type, Range") - w.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS, PROPFIND") + w.Header().Set("Access-Control-Allow-Methods", "COPY, DELETE, GET, MKCOL, MOVE, OPTIONS, POST, PROPFIND, PUT, RMCOL") w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Max-Age", "86400") statusCode = http.StatusOK @@ -352,21 +409,45 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) { } applyContentDispositionHdr(w, r, basename, attachment) - fs := collection.FileSystem(&arvados.Client{ + client := &arvados.Client{ APIHost: arv.ApiServer, AuthToken: arv.ApiToken, Insecure: arv.ApiInsecure, - }, kc) + } + fs, err := collection.FileSystem(client, kc) + if err != nil { + statusCode, statusText = http.StatusInternalServerError, err.Error() + return + } + + targetIsPDH := arvadosclient.PDHMatch(targetID) + if targetIsPDH && writeMethod[r.Method] { + statusCode, statusText = http.StatusMethodNotAllowed, errReadOnly.Error() + return + } + if webdavMethod[r.Method] { + if writeMethod[r.Method] { + // Save the collection only if/when all + // webdav->filesystem operations succeed -- + // and send a 500 error if the modified + // collection can't be saved. 
+ w = &updateOnSuccess{ + ResponseWriter: w, + update: func() error { + return h.Config.Cache.Update(client, *collection, fs) + }} + } h := webdav.Handler{ - Prefix: "/" + strings.Join(pathParts[:stripParts], "/"), - FileSystem: &webdavFS{collfs: fs}, + Prefix: "/" + strings.Join(pathParts[:stripParts], "/"), + FileSystem: &webdavFS{ + collfs: fs, + writing: writeMethod[r.Method], + }, LockSystem: h.webdavLS, Logger: func(_ *http.Request, err error) { - if os.IsNotExist(err) { - statusCode, statusText = http.StatusNotFound, err.Error() - } else if err != nil { - statusCode, statusText = http.StatusInternalServerError, err.Error() + if err != nil { + log.Printf("error from webdav handler: %q", err) } }, } diff --git a/services/keep-web/handler_test.go b/services/keep-web/handler_test.go index 6bd34d7113..21e47c8dc7 100644 --- a/services/keep-web/handler_test.go +++ b/services/keep-web/handler_test.go @@ -45,12 +45,12 @@ func (s *UnitSuite) TestCORSPreflight(c *check.C) { c.Check(resp.Code, check.Equals, http.StatusOK) c.Check(resp.Body.String(), check.Equals, "") c.Check(resp.Header().Get("Access-Control-Allow-Origin"), check.Equals, "*") - c.Check(resp.Header().Get("Access-Control-Allow-Methods"), check.Equals, "GET, POST, OPTIONS, PROPFIND") + c.Check(resp.Header().Get("Access-Control-Allow-Methods"), check.Equals, "COPY, DELETE, GET, MKCOL, MOVE, OPTIONS, POST, PROPFIND, PUT, RMCOL") c.Check(resp.Header().Get("Access-Control-Allow-Headers"), check.Equals, "Authorization, Content-Type, Range") // Check preflight for a disallowed request resp = httptest.NewRecorder() - req.Header.Set("Access-Control-Request-Method", "DELETE") + req.Header.Set("Access-Control-Request-Method", "MAKE-COFFEE") h.ServeHTTP(resp, req) c.Check(resp.Body.String(), check.Equals, "") c.Check(resp.Code, check.Equals, http.StatusMethodNotAllowed) diff --git a/services/keep-web/main.go b/services/keep-web/main.go index 27ceb48c78..724af27c7e 100644 --- a/services/keep-web/main.go +++ 
b/services/keep-web/main.go @@ -6,6 +6,7 @@ package main import ( "flag" + "fmt" "log" "os" "time" @@ -17,6 +18,7 @@ import ( var ( defaultConfigPath = "/etc/arvados/keep-web/keep-web.yml" + version = "dev" ) // Config specifies server configuration. @@ -85,9 +87,17 @@ func main() { dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit") + getVersion := flag.Bool("version", false, + "print version information and exit.") flag.Usage = usage flag.Parse() + // Print version information if requested + if *getVersion { + fmt.Printf("keep-web %s\n", version) + return + } + if err := config.LoadFile(cfg, configPath); err != nil { if h := os.Getenv("ARVADOS_API_HOST"); h != "" && configPath == defaultConfigPath { log.Printf("DEPRECATED: Using ARVADOS_API_HOST environment variable. Use config file instead.") @@ -105,6 +115,8 @@ func main() { log.Fatal(config.DumpAndExit(cfg)) } + log.Printf("keep-web %s started", version) + os.Setenv("ARVADOS_API_HOST", cfg.Client.APIHost) srv := &server{Config: cfg} if err := srv.Start(); err != nil { diff --git a/services/keep-web/status_test.go b/services/keep-web/status_test.go index 5f2d44cbe4..0a2b9eb988 100644 --- a/services/keep-web/status_test.go +++ b/services/keep-web/status_test.go @@ -31,6 +31,7 @@ func (s *UnitSuite) TestStatus(c *check.C) { err := json.NewDecoder(resp.Body).Decode(&status) c.Check(err, check.IsNil) c.Check(status["Cache.Requests"], check.Equals, float64(0)) + c.Check(status["Version"], check.Not(check.Equals), "") } func (s *IntegrationSuite) TestNoStatusFromVHost(c *check.C) { diff --git a/services/keep-web/webdav.go b/services/keep-web/webdav.go index 57f3f53a99..3ceb0ed5c9 100644 --- a/services/keep-web/webdav.go +++ b/services/keep-web/webdav.go @@ -9,9 +9,9 @@ import ( "errors" "fmt" prand "math/rand" - "net/http" "os" - "sync" + "path" + "strings" "sync/atomic" "time" @@ -27,103 +27,83 @@ var ( errReadOnly = errors.New("read-only filesystem") ) -// webdavFS 
implements a read-only webdav.FileSystem by wrapping an +// webdavFS implements a webdav.FileSystem by wrapping an // arvados.CollectionFilesystem. +// +// Collections don't preserve empty directories, so Mkdir is +// effectively a no-op, and we need to make parent dirs spring into +// existence automatically so sequences like "mkcol foo; put foo/bar" +// work as expected. type webdavFS struct { - collfs arvados.CollectionFileSystem + collfs arvados.CollectionFileSystem + writing bool } -var _ webdav.FileSystem = &webdavFS{} +func (fs *webdavFS) makeparents(name string) { + dir, name := path.Split(name) + if dir == "" || dir == "/" { + return + } + dir = dir[:len(dir)-1] + fs.makeparents(dir) + fs.collfs.Mkdir(dir, 0755) +} func (fs *webdavFS) Mkdir(ctx context.Context, name string, perm os.FileMode) error { - return errReadOnly + if !fs.writing { + return errReadOnly + } + name = strings.TrimRight(name, "/") + fs.makeparents(name) + return fs.collfs.Mkdir(name, 0755) } -func (fs *webdavFS) OpenFile(ctx context.Context, name string, flag int, perm os.FileMode) (webdav.File, error) { - fi, err := fs.collfs.Stat(name) - if err != nil { - return nil, err +func (fs *webdavFS) OpenFile(ctx context.Context, name string, flag int, perm os.FileMode) (f webdav.File, err error) { + writing := flag&(os.O_WRONLY|os.O_RDWR) != 0 + if writing { + fs.makeparents(name) + } + f, err = fs.collfs.OpenFile(name, flag, perm) + if !fs.writing { + // webdav module returns 404 on all OpenFile errors, + // but returns 405 Method Not Allowed if OpenFile() + // succeeds but Write() or Close() fails. We'd rather + // have 405. 
+ f = writeFailer{File: f, err: errReadOnly} } - return &webdavFile{collfs: fs.collfs, fileInfo: fi, name: name}, nil + return } func (fs *webdavFS) RemoveAll(ctx context.Context, name string) error { - return errReadOnly + return fs.collfs.RemoveAll(name) } func (fs *webdavFS) Rename(ctx context.Context, oldName, newName string) error { - return errReadOnly -} - -func (fs *webdavFS) Stat(ctx context.Context, name string) (os.FileInfo, error) { - return fs.collfs.Stat(name) -} - -// webdavFile implements a read-only webdav.File by wrapping -// http.File. -// -// The http.File is opened from an arvados.CollectionFileSystem, but -// not until Seek, Read, or Readdir is called. This deferred-open -// strategy makes webdav's OpenFile-Stat-Close cycle fast even though -// the collfs's Open method is slow. This is relevant because webdav -// does OpenFile-Stat-Close on each file when preparing directory -// listings. -// -// Writes to a webdavFile always fail. -type webdavFile struct { - // fields populated by (*webdavFS).OpenFile() - collfs http.FileSystem - fileInfo os.FileInfo - name string - - // internal fields - file http.File - loadOnce sync.Once - err error -} - -func (f *webdavFile) load() { - f.file, f.err = f.collfs.Open(f.name) -} - -func (f *webdavFile) Write([]byte) (int, error) { - return 0, errReadOnly -} - -func (f *webdavFile) Seek(offset int64, whence int) (int64, error) { - f.loadOnce.Do(f.load) - if f.err != nil { - return 0, f.err + if !fs.writing { + return errReadOnly } - return f.file.Seek(offset, whence) + fs.makeparents(newName) + return fs.collfs.Rename(oldName, newName) } -func (f *webdavFile) Read(buf []byte) (int, error) { - f.loadOnce.Do(f.load) - if f.err != nil { - return 0, f.err +func (fs *webdavFS) Stat(ctx context.Context, name string) (os.FileInfo, error) { + if fs.writing { + fs.makeparents(name) } - return f.file.Read(buf) + return fs.collfs.Stat(name) } -func (f *webdavFile) Close() error { - if f.file == nil { - // We never 
called load(), or load() failed - return f.err - } - return f.file.Close() +type writeFailer struct { + webdav.File + err error } -func (f *webdavFile) Readdir(n int) ([]os.FileInfo, error) { - f.loadOnce.Do(f.load) - if f.err != nil { - return nil, f.err - } - return f.file.Readdir(n) +func (wf writeFailer) Write([]byte) (int, error) { + return 0, wf.err } -func (f *webdavFile) Stat() (os.FileInfo, error) { - return f.fileInfo, nil +func (wf writeFailer) Close() error { + return wf.err } // noLockSystem implements webdav.LockSystem by returning success for diff --git a/services/keep-web/webdav_test.go b/services/keep-web/webdav_test.go new file mode 100644 index 0000000000..52db776a43 --- /dev/null +++ b/services/keep-web/webdav_test.go @@ -0,0 +1,5 @@ +package main + +import "golang.org/x/net/webdav" + +var _ webdav.FileSystem = &webdavFS{} diff --git a/services/keepproxy/keepproxy.go b/services/keepproxy/keepproxy.go index 3d1b447625..145b39d4c3 100644 --- a/services/keepproxy/keepproxy.go +++ b/services/keepproxy/keepproxy.go @@ -10,7 +10,6 @@ import ( "fmt" "io" "io/ioutil" - "log" "net" "net/http" "os" @@ -25,12 +24,16 @@ import ( "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "git.curoverse.com/arvados.git/sdk/go/config" "git.curoverse.com/arvados.git/sdk/go/health" + "git.curoverse.com/arvados.git/sdk/go/httpserver" "git.curoverse.com/arvados.git/sdk/go/keepclient" + log "github.com/Sirupsen/logrus" "github.com/coreos/go-systemd/daemon" "github.com/ghodss/yaml" "github.com/gorilla/mux" ) +var version = "dev" + type Config struct { Client arvados.Client Listen string @@ -55,7 +58,13 @@ var ( router http.Handler ) +const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00" + func main() { + log.SetFormatter(&log.JSONFormatter{ + TimestampFormat: rfc3339NanoFixed, + }) + cfg := DefaultConfig() flagset := flag.NewFlagSet("keepproxy", flag.ExitOnError) @@ -74,8 +83,15 @@ func main() { const defaultCfgPath = "/etc/arvados/keepproxy/keepproxy.yml" 
flagset.StringVar(&cfgPath, "config", defaultCfgPath, "Configuration file `path`") dumpConfig := flagset.Bool("dump-config", false, "write current configuration to stdout and exit") + getVersion := flagset.Bool("version", false, "Print version information and exit.") flagset.Parse(os.Args[1:]) + // Print version information if requested + if *getVersion { + fmt.Printf("keepproxy %s\n", version) + return + } + err := config.LoadFile(cfg, cfgPath) if err != nil { h := os.Getenv("ARVADOS_API_HOST") @@ -99,6 +115,8 @@ func main() { log.Fatal(config.DumpAndExit(cfg)) } + log.Printf("keepproxy %s started", version) + arv, err := arvadosclient.New(&cfg.Client) if err != nil { log.Fatalf("Error setting up arvados client %s", err.Error()) @@ -164,7 +182,7 @@ func main() { // Start serving requests. router = MakeRESTRouter(!cfg.DisableGet, !cfg.DisablePut, kc, time.Duration(cfg.Timeout), cfg.ManagementToken) - http.Serve(listener, router) + http.Serve(listener, httpserver.AddRequestIDs(httpserver.LogRequests(router))) log.Println("shutting down") } @@ -596,7 +614,8 @@ func (h *proxyHandler) makeKeepClient(req *http.Request) *keepclient.KeepClient Timeout: h.timeout, Transport: h.transport, }, - proto: req.Proto, + proto: req.Proto, + requestID: req.Header.Get("X-Request-Id"), } return &kc } diff --git a/services/keepproxy/keepproxy_test.go b/services/keepproxy/keepproxy_test.go index 23bb2bd216..a7b608b69c 100644 --- a/services/keepproxy/keepproxy_test.go +++ b/services/keepproxy/keepproxy_test.go @@ -10,7 +10,6 @@ import ( "errors" "fmt" "io/ioutil" - "log" "math/rand" "net/http" "net/http/httptest" @@ -57,7 +56,7 @@ func waitForListener() { time.Sleep(ms * time.Millisecond) } if listener == nil { - log.Fatalf("Timed out waiting for listener to start") + panic("Timed out waiting for listener to start") } } @@ -255,14 +254,14 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) { { _, _, err := kc.Ask(hash) c.Check(err, Equals, keepclient.BlockNotFound) - log.Print("Finished 
Ask (expected BlockNotFound)") + c.Log("Finished Ask (expected BlockNotFound)") } { reader, _, _, err := kc.Get(hash) c.Check(reader, Equals, nil) c.Check(err, Equals, keepclient.BlockNotFound) - log.Print("Finished Get (expected BlockNotFound)") + c.Log("Finished Get (expected BlockNotFound)") } // Note in bug #5309 among other errors keepproxy would set @@ -281,14 +280,14 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) { c.Check(hash2, Matches, fmt.Sprintf(`^%s\+3(\+.+)?$`, hash)) c.Check(rep, Equals, 2) c.Check(err, Equals, nil) - log.Print("Finished PutB (expected success)") + c.Log("Finished PutB (expected success)") } { blocklen, _, err := kc.Ask(hash2) c.Assert(err, Equals, nil) c.Check(blocklen, Equals, int64(3)) - log.Print("Finished Ask (expected success)") + c.Log("Finished Ask (expected success)") } { @@ -297,7 +296,7 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) { all, err := ioutil.ReadAll(reader) c.Check(all, DeepEquals, []byte("foo")) c.Check(blocklen, Equals, int64(3)) - log.Print("Finished Get (expected success)") + c.Log("Finished Get (expected success)") } { @@ -307,7 +306,7 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) { c.Check(hash2, Matches, `^d41d8cd98f00b204e9800998ecf8427e\+0(\+.+)?$`) c.Check(rep, Equals, 2) c.Check(err, Equals, nil) - log.Print("Finished PutB zero block") + c.Log("Finished PutB zero block") } { @@ -316,7 +315,7 @@ func (s *ServerRequiredSuite) TestPutAskGet(c *C) { all, err := ioutil.ReadAll(reader) c.Check(all, DeepEquals, []byte("")) c.Check(blocklen, Equals, int64(0)) - log.Print("Finished Get zero block") + c.Log("Finished Get zero block") } } @@ -331,7 +330,7 @@ func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) { errNotFound, _ := err.(keepclient.ErrNotFound) c.Check(errNotFound, NotNil) c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true) - log.Print("Ask 1") + c.Log("Ask 1") } { @@ -339,7 +338,7 @@ func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) { c.Check(hash2, 
Equals, "") c.Check(rep, Equals, 0) c.Check(err, FitsTypeOf, keepclient.InsufficientReplicasError(errors.New(""))) - log.Print("PutB") + c.Log("PutB") } { @@ -348,7 +347,7 @@ func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) { c.Check(errNotFound, NotNil) c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true) c.Check(blocklen, Equals, int64(0)) - log.Print("Ask 2") + c.Log("Ask 2") } { @@ -357,7 +356,7 @@ func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) { c.Check(errNotFound, NotNil) c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true) c.Check(blocklen, Equals, int64(0)) - log.Print("Get") + c.Log("Get") } } @@ -372,7 +371,7 @@ func (s *ServerRequiredSuite) TestGetDisabled(c *C) { errNotFound, _ := err.(keepclient.ErrNotFound) c.Check(errNotFound, NotNil) c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true) - log.Print("Ask 1") + c.Log("Ask 1") } { @@ -380,7 +379,7 @@ func (s *ServerRequiredSuite) TestGetDisabled(c *C) { c.Check(hash2, Matches, fmt.Sprintf(`^%s\+3(\+.+)?$`, hash)) c.Check(rep, Equals, 2) c.Check(err, Equals, nil) - log.Print("PutB") + c.Log("PutB") } { @@ -389,7 +388,7 @@ func (s *ServerRequiredSuite) TestGetDisabled(c *C) { c.Check(errNotFound, NotNil) c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true) c.Check(blocklen, Equals, int64(0)) - log.Print("Ask 2") + c.Log("Ask 2") } { @@ -398,7 +397,7 @@ func (s *ServerRequiredSuite) TestGetDisabled(c *C) { c.Check(errNotFound, NotNil) c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true) c.Check(blocklen, Equals, int64(0)) - log.Print("Get") + c.Log("Get") } } diff --git a/services/keepproxy/proxy_client.go b/services/keepproxy/proxy_client.go index 0faf4aea0e..3fa2671df5 100644 --- a/services/keepproxy/proxy_client.go +++ b/services/keepproxy/proxy_client.go @@ -13,11 +13,13 @@ import ( var viaAlias = "keepproxy" type proxyClient struct { - client keepclient.HTTPClient - proto string + client keepclient.HTTPClient + 
proto string + requestID string } func (pc *proxyClient) Do(req *http.Request) (*http.Response, error) { req.Header.Add("Via", pc.proto+" "+viaAlias) + req.Header.Add("X-Request-Id", pc.requestID) return pc.client.Do(req) } diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go index 424910dfa8..4d042a70dd 100644 --- a/services/keepstore/handler_test.go +++ b/services/keepstore/handler_test.go @@ -975,7 +975,7 @@ func TestGetHandlerClientDisconnect(t *testing.T) { ok := make(chan struct{}) go func() { req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil) - (&LoggingRESTRouter{router: MakeRESTRouter()}).ServeHTTP(resp, req) + MakeRESTRouter().ServeHTTP(resp, req) ok <- struct{}{} }() diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go index 2d90aba14e..daf4fc69dd 100644 --- a/services/keepstore/handlers.go +++ b/services/keepstore/handlers.go @@ -311,6 +311,7 @@ type NodeStatus struct { TrashQueue WorkQueueStatus RequestsCurrent int RequestsMax int + Version string } var st NodeStatus @@ -346,6 +347,7 @@ func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) { // populate the given NodeStatus struct with current values. func (rtr *router) readNodeStatus(st *NodeStatus) { + st.Version = version vols := KeepVM.AllReadable() if cap(st.Volumes) < len(vols) { st.Volumes = make([]*volumeStatusEnt, len(vols)) diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go index 921176dbbe..b8a0ffb1cb 100644 --- a/services/keepstore/keepstore.go +++ b/services/keepstore/keepstore.go @@ -22,6 +22,8 @@ import ( "github.com/coreos/go-systemd/daemon" ) +var version = "dev" + // A Keep "block" is 64MB. 
const BlockSize = 64 * 1024 * 1024 @@ -89,6 +91,7 @@ func main() { deprecated.beforeFlagParse(theConfig) dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit (useful for migrating from command line flags to config file)") + getVersion := flag.Bool("version", false, "Print version information and exit.") defaultConfigPath := "/etc/arvados/keepstore/keepstore.yml" var configPath string @@ -100,6 +103,12 @@ func main() { flag.Usage = usage flag.Parse() + // Print version information if requested + if *getVersion { + fmt.Printf("keepstore %s\n", version) + return + } + deprecated.afterFlagParse(theConfig) err := config.LoadFile(theConfig, configPath) @@ -111,6 +120,8 @@ func main() { log.Fatal(config.DumpAndExit(theConfig)) } + log.Printf("keepstore %s started", version) + err = theConfig.Start() if err != nil { log.Fatal(err) @@ -147,11 +158,11 @@ func main() { // Start a round-robin VolumeManager with the volumes we have found. KeepVM = MakeRRVolumeManager(theConfig.Volumes) - // Middleware stack: logger, MaxRequests limiter, method handlers + // Middleware/handler stack router := MakeRESTRouter() limiter := httpserver.NewRequestLimiter(theConfig.MaxRequests, router) router.limiter = limiter - http.Handle("/", &LoggingRESTRouter{router: limiter}) + http.Handle("/", httpserver.AddRequestIDs(httpserver.LogRequests(limiter))) // Set up a TCP listener. listener, err := net.Listen("tcp", theConfig.Listen) diff --git a/services/keepstore/logging_router.go b/services/keepstore/logging_router.go deleted file mode 100644 index 63c28a24b3..0000000000 --- a/services/keepstore/logging_router.go +++ /dev/null @@ -1,111 +0,0 @@ -// Copyright (C) The Arvados Authors. All rights reserved. 
-// -// SPDX-License-Identifier: AGPL-3.0 - -package main - -// LoggingRESTRouter -// LoggingResponseWriter - -import ( - "context" - "net/http" - "strings" - "time" - - "git.curoverse.com/arvados.git/sdk/go/httpserver" - "git.curoverse.com/arvados.git/sdk/go/stats" - log "github.com/Sirupsen/logrus" -) - -// LoggingResponseWriter has anonymous fields ResponseWriter and ResponseBody -type LoggingResponseWriter struct { - Status int - Length int - http.ResponseWriter - ResponseBody string - sentHdr time.Time -} - -// CloseNotify implements http.CloseNotifier. -func (resp *LoggingResponseWriter) CloseNotify() <-chan bool { - wrapped, ok := resp.ResponseWriter.(http.CloseNotifier) - if !ok { - // If upstream doesn't implement CloseNotifier, we can - // satisfy the interface by returning a channel that - // never sends anything (the interface doesn't - // guarantee that anything will ever be sent on the - // channel even if the client disconnects). - return nil - } - return wrapped.CloseNotify() -} - -// WriteHeader writes header to ResponseWriter -func (resp *LoggingResponseWriter) WriteHeader(code int) { - if resp.sentHdr == zeroTime { - resp.sentHdr = time.Now() - } - resp.Status = code - resp.ResponseWriter.WriteHeader(code) -} - -var zeroTime time.Time - -func (resp *LoggingResponseWriter) Write(data []byte) (int, error) { - if resp.Length == 0 && len(data) > 0 && resp.sentHdr == zeroTime { - resp.sentHdr = time.Now() - } - resp.Length += len(data) - if resp.Status >= 400 { - resp.ResponseBody += string(data) - } - return resp.ResponseWriter.Write(data) -} - -// LoggingRESTRouter is used to add logging capabilities to mux.Router -type LoggingRESTRouter struct { - router http.Handler - idGenerator httpserver.IDGenerator -} - -func (loggingRouter *LoggingRESTRouter) ServeHTTP(wrappedResp http.ResponseWriter, req *http.Request) { - tStart := time.Now() - - // Attach a requestID-aware logger to the request context. 
- lgr := log.WithField("RequestID", loggingRouter.idGenerator.Next()) - ctx := context.WithValue(req.Context(), "logger", lgr) - req = req.WithContext(ctx) - - lgr = lgr.WithFields(log.Fields{ - "remoteAddr": req.RemoteAddr, - "reqForwardedFor": req.Header.Get("X-Forwarded-For"), - "reqMethod": req.Method, - "reqPath": req.URL.Path[1:], - "reqBytes": req.ContentLength, - }) - lgr.Debug("request") - - resp := LoggingResponseWriter{http.StatusOK, 0, wrappedResp, "", zeroTime} - loggingRouter.router.ServeHTTP(&resp, req) - tDone := time.Now() - - statusText := http.StatusText(resp.Status) - if resp.Status >= 400 { - statusText = strings.Replace(resp.ResponseBody, "\n", "", -1) - } - if resp.sentHdr == zeroTime { - // Nobody changed status or wrote any data, i.e., we - // returned a 200 response with no body. - resp.sentHdr = tDone - } - - lgr.WithFields(log.Fields{ - "timeTotal": stats.Duration(tDone.Sub(tStart)), - "timeToStatus": stats.Duration(resp.sentHdr.Sub(tStart)), - "timeWriteBody": stats.Duration(tDone.Sub(resp.sentHdr)), - "respStatusCode": resp.Status, - "respStatus": statusText, - "respBytes": resp.Length, - }).Info("response") -} diff --git a/services/keepstore/logging_router_test.go b/services/keepstore/logging_router_test.go deleted file mode 100644 index 6ca48dc89c..0000000000 --- a/services/keepstore/logging_router_test.go +++ /dev/null @@ -1,14 +0,0 @@ -// Copyright (C) The Arvados Authors. All rights reserved. 
-// -// SPDX-License-Identifier: AGPL-3.0 - -package main - -import ( - "net/http" - "testing" -) - -func TestLoggingResponseWriterImplementsCloseNotifier(t *testing.T) { - http.ResponseWriter(&LoggingResponseWriter{}).(http.CloseNotifier).CloseNotify() -} diff --git a/services/keepstore/pull_worker_test.go b/services/keepstore/pull_worker_test.go index 9e547f30d0..7a8297039b 100644 --- a/services/keepstore/pull_worker_test.go +++ b/services/keepstore/pull_worker_test.go @@ -271,6 +271,7 @@ func (s *PullWorkerTestSuite) performTest(testData PullWorkerTestData, c *C) { c.Check(getStatusItem("PullQueue", "InProgress"), Equals, float64(0)) c.Check(getStatusItem("PullQueue", "Queued"), Equals, float64(0)) + c.Check(getStatusItem("Version"), Not(Equals), "") response := IssueRequest(&testData.req) c.Assert(response.Code, Equals, testData.responseCode) diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go index e6a53d06c6..61e69f9096 100644 --- a/services/keepstore/s3_volume.go +++ b/services/keepstore/s3_volume.go @@ -45,6 +45,8 @@ var ( s3RaceWindow time.Duration s3ACL = s3.Private + + zeroTime time.Time ) const ( diff --git a/services/nodemanager/arvnodeman/launcher.py b/services/nodemanager/arvnodeman/launcher.py index d85ef552c0..888abf5a76 100644 --- a/services/nodemanager/arvnodeman/launcher.py +++ b/services/nodemanager/arvnodeman/launcher.py @@ -125,7 +125,7 @@ def main(args=None): try: root_logger = setup_logging(config.get('Logging', 'file'), **config.log_levels()) - root_logger.info("%s %s, libcloud %s", sys.argv[0], __version__, libcloud.__version__) + root_logger.info("%s %s started, libcloud %s", sys.argv[0], __version__, libcloud.__version__) node_setup, node_shutdown, node_update, node_monitor = \ config.dispatch_classes() server_calculator = build_server_calculator(config) diff --git a/services/nodemanager/arvnodeman/status.py b/services/nodemanager/arvnodeman/status.py index cfd611285c..069bf16895 100644 --- 
a/services/nodemanager/arvnodeman/status.py +++ b/services/nodemanager/arvnodeman/status.py @@ -11,6 +11,8 @@ import logging import socketserver import threading +from ._version import __version__ + _logger = logging.getLogger('status.Handler') @@ -74,10 +76,11 @@ class Tracker(object): def __init__(self): self._mtx = threading.Lock() self._latest = {} + self._version = {'Version' : __version__} def get_json(self): with self._mtx: - return json.dumps(self._latest) + return json.dumps(dict(self._latest, **self._version)) def keys(self): with self._mtx: diff --git a/services/nodemanager/tests/test_status.py b/services/nodemanager/tests/test_status.py index a236e4f0ee..23658667a6 100644 --- a/services/nodemanager/tests/test_status.py +++ b/services/nodemanager/tests/test_status.py @@ -56,6 +56,7 @@ class StatusServerUpdates(unittest.TestCase): resp = r.json() self.assertEqual(n, resp['nodes_'+str(n)]) self.assertEqual(1, resp['nodes_1']) + self.assertIn('Version', resp) class StatusServerDisabled(unittest.TestCase): diff --git a/services/ws/main.go b/services/ws/main.go index db33cbfd00..a0006a4f8a 100644 --- a/services/ws/main.go +++ b/services/ws/main.go @@ -13,15 +13,23 @@ import ( ) var logger = ctxlog.FromContext +var version = "dev" func main() { log := logger(nil) configPath := flag.String("config", "/etc/arvados/ws/ws.yml", "`path` to config file") dumpConfig := flag.Bool("dump-config", false, "show current configuration and exit") + getVersion := flag.Bool("version", false, "Print version information and exit.") cfg := defaultConfig() flag.Parse() + // Print version information if requested + if *getVersion { + fmt.Printf("arvados-ws %s\n", version) + return + } + err := config.LoadFile(&cfg, *configPath) if err != nil { log.Fatal(err) @@ -39,7 +47,7 @@ func main() { return } - log.Info("started") + log.Printf("arvados-ws %s started", version) srv := &server{wsConfig: &cfg} log.Fatal(srv.Run()) } diff --git a/services/ws/router.go b/services/ws/router.go 
index 2b9bd66b1c..987c225eac 100644 --- a/services/ws/router.go +++ b/services/ws/router.go @@ -123,6 +123,7 @@ func (rtr *router) DebugStatus() interface{} { func (rtr *router) Status() interface{} { return map[string]interface{}{ "Clients": atomic.LoadInt64(&rtr.status.ReqsActive), + "Version": version, } } diff --git a/services/ws/server_test.go b/services/ws/server_test.go index c1caa2ad37..b1f943857a 100644 --- a/services/ws/server_test.go +++ b/services/ws/server_test.go @@ -5,6 +5,7 @@ package main import ( + "encoding/json" "io/ioutil" "net/http" "sync" @@ -90,6 +91,21 @@ func (s *serverSuite) TestHealth(c *check.C) { } } +func (s *serverSuite) TestStatus(c *check.C) { + go s.srv.Run() + defer s.srv.Close() + s.srv.WaitReady() + req, err := http.NewRequest("GET", "http://"+s.srv.listener.Addr().String()+"/status.json", nil) + c.Assert(err, check.IsNil) + resp, err := http.DefaultClient.Do(req) + c.Check(err, check.IsNil) + c.Check(resp.StatusCode, check.Equals, http.StatusOK) + var status map[string]interface{} + err = json.NewDecoder(resp.Body).Decode(&status) + c.Check(err, check.IsNil) + c.Check(status["Version"], check.Not(check.Equals), "") +} + func (s *serverSuite) TestHealthDisabled(c *check.C) { s.cfg.ManagementToken = "" diff --git a/tools/arv-sync-groups/arv-sync-groups.go b/tools/arv-sync-groups/arv-sync-groups.go index d7efdefb6f..6b4781c354 100644 --- a/tools/arv-sync-groups/arv-sync-groups.go +++ b/tools/arv-sync-groups/arv-sync-groups.go @@ -19,6 +19,8 @@ import ( "git.curoverse.com/arvados.git/sdk/go/arvados" ) +var version = "dev" + type resourceList interface { Len() int GetItems() []interface{} @@ -150,6 +152,10 @@ func ParseFlags(config *ConfigParams) error { "verbose", false, "Log informational messages. 
Off by default.") + getVersion := flags.Bool( + "version", + false, + "Print version information and exit.") parentGroupUUID := flags.String( "parent-group-uuid", "", @@ -158,6 +164,12 @@ func ParseFlags(config *ConfigParams) error { // Parse args; omit the first arg which is the command name flags.Parse(os.Args[1:]) + // Print version information if requested + if *getVersion { + fmt.Printf("arv-sync-groups %s\n", version) + os.Exit(0) + } + // Input file as a required positional argument if flags.NArg() == 0 { return fmt.Errorf("please provide a path to an input file") @@ -276,7 +288,7 @@ func doMain(cfg *ConfigParams) error { } defer f.Close() - log.Printf("Group sync starting. Using %q as users id and parent group UUID %q", cfg.UserID, cfg.ParentGroupUUID) + log.Printf("arv-sync-groups %s started. Using %q as users id and parent group UUID %q", version, cfg.UserID, cfg.ParentGroupUUID) // Get the complete user list to minimize API Server requests allUsers := make(map[string]arvados.User) diff --git a/tools/crunchstat-summary/MANIFEST.in b/tools/crunchstat-summary/MANIFEST.in index f87cccadbf..d1c3037ca2 100644 --- a/tools/crunchstat-summary/MANIFEST.in +++ b/tools/crunchstat-summary/MANIFEST.in @@ -4,4 +4,4 @@ include agpl-3.0.txt include crunchstat_summary/dygraphs.js -include crunchstat_summary/chartjs.js +include crunchstat_summary/synchronizer.js diff --git a/tools/crunchstat-summary/crunchstat_summary/dygraphs.js b/tools/crunchstat-summary/crunchstat_summary/dygraphs.js index 5c1d0994bf..52e5534ef1 100644 --- a/tools/crunchstat-summary/crunchstat_summary/dygraphs.js +++ b/tools/crunchstat-summary/crunchstat_summary/dygraphs.js @@ -72,6 +72,8 @@ window.onload = function() { }); }); + var sync = Dygraph.synchronize(Object.values(charts), {range: false}); + if (typeof window.debug === 'undefined') window.debug = {}; window.debug.charts = charts; diff --git a/tools/crunchstat-summary/crunchstat_summary/dygraphs.py 
b/tools/crunchstat-summary/crunchstat_summary/dygraphs.py index e72832eb5e..1314e9df35 100644 --- a/tools/crunchstat-summary/crunchstat_summary/dygraphs.py +++ b/tools/crunchstat-summary/crunchstat_summary/dygraphs.py @@ -8,7 +8,7 @@ import crunchstat_summary.webchart class DygraphsChart(crunchstat_summary.webchart.WebChart): CSS = 'https://cdnjs.cloudflare.com/ajax/libs/dygraph/2.0.0/dygraph.min.css' JSLIB = 'https://cdnjs.cloudflare.com/ajax/libs/dygraph/2.0.0/dygraph.min.js' - JSASSET = 'dygraphs.js' + JSASSETS = ['synchronizer.js','dygraphs.js'] def headHTML(self): return '\n'.format(self.CSS) diff --git a/tools/crunchstat-summary/crunchstat_summary/synchronizer.js b/tools/crunchstat-summary/crunchstat_summary/synchronizer.js new file mode 100644 index 0000000000..78c8d4278f --- /dev/null +++ b/tools/crunchstat-summary/crunchstat_summary/synchronizer.js @@ -0,0 +1,273 @@ +/** + * Synchronize zooming and/or selections between a set of dygraphs. + * + * Usage: + * + * var g1 = new Dygraph(...), + * g2 = new Dygraph(...), + * ...; + * var sync = Dygraph.synchronize(g1, g2, ...); + * // charts are now synchronized + * sync.detach(); + * // charts are no longer synchronized + * + * You can set options using the last parameter, for example: + * + * var sync = Dygraph.synchronize(g1, g2, g3, { + * selection: true, + * zoom: true + * }); + * + * The default is to synchronize both of these. + * + * Instead of passing one Dygraph object as each parameter, you may also pass an + * array of dygraphs: + * + * var sync = Dygraph.synchronize([g1, g2, g3], { + * selection: false, + * zoom: true + * }); + * + * You may also set `range: false` if you wish to only sync the x-axis. + * The `range` option has no effect unless `zoom` is true (the default). 
+ * + * SPDX-License-Identifier: MIT + * Original source: https://github.com/danvk/dygraphs/blob/master/src/extras/synchronizer.js + * at commit b55a71d768d2f8de62877c32b3aec9e9975ac389 + * + * Copyright (c) 2009 Dan Vanderkam + * + * Permission is hereby granted, free of charge, to any person + * obtaining a copy of this software and associated documentation + * files (the "Software"), to deal in the Software without + * restriction, including without limitation the rights to use, + * copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following + * conditions: + * + * The above copyright notice and this permission notice shall be + * included in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES + * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT + * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ +(function() { +/* global Dygraph:false */ +'use strict'; + +var Dygraph; +if (window.Dygraph) { + Dygraph = window.Dygraph; +} else if (typeof(module) !== 'undefined') { + Dygraph = require('../dygraph'); +} + +var synchronize = function(/* dygraphs..., opts */) { + if (arguments.length === 0) { + throw 'Invalid invocation of Dygraph.synchronize(). 
Need >= 1 argument.'; + } + + var OPTIONS = ['selection', 'zoom', 'range']; + var opts = { + selection: true, + zoom: true, + range: true + }; + var dygraphs = []; + var prevCallbacks = []; + + var parseOpts = function(obj) { + if (!(obj instanceof Object)) { + throw 'Last argument must be either Dygraph or Object.'; + } else { + for (var i = 0; i < OPTIONS.length; i++) { + var optName = OPTIONS[i]; + if (obj.hasOwnProperty(optName)) opts[optName] = obj[optName]; + } + } + }; + + if (arguments[0] instanceof Dygraph) { + // Arguments are Dygraph objects. + for (var i = 0; i < arguments.length; i++) { + if (arguments[i] instanceof Dygraph) { + dygraphs.push(arguments[i]); + } else { + break; + } + } + if (i < arguments.length - 1) { + throw 'Invalid invocation of Dygraph.synchronize(). ' + + 'All but the last argument must be Dygraph objects.'; + } else if (i == arguments.length - 1) { + parseOpts(arguments[arguments.length - 1]); + } + } else if (arguments[0].length) { + // Invoked w/ list of dygraphs, options + for (var i = 0; i < arguments[0].length; i++) { + dygraphs.push(arguments[0][i]); + } + if (arguments.length == 2) { + parseOpts(arguments[1]); + } else if (arguments.length > 2) { + throw 'Invalid invocation of Dygraph.synchronize(). ' + + 'Expected two arguments: array and optional options argument.'; + } // otherwise arguments.length == 1, which is fine. + } else { + throw 'Invalid invocation of Dygraph.synchronize(). ' + + 'First parameter must be either Dygraph or list of Dygraphs.'; + } + + if (dygraphs.length < 2) { + throw 'Invalid invocation of Dygraph.synchronize(). 
' + + 'Need two or more dygraphs to synchronize.'; + } + + var readycount = dygraphs.length; + for (var i = 0; i < dygraphs.length; i++) { + var g = dygraphs[i]; + g.ready( function() { + if (--readycount == 0) { + // store original callbacks + var callBackTypes = ['drawCallback', 'highlightCallback', 'unhighlightCallback']; + for (var j = 0; j < dygraphs.length; j++) { + if (!prevCallbacks[j]) { + prevCallbacks[j] = {}; + } + for (var k = callBackTypes.length - 1; k >= 0; k--) { + prevCallbacks[j][callBackTypes[k]] = dygraphs[j].getFunctionOption(callBackTypes[k]); + } + } + + // Listen for draw, highlight, unhighlight callbacks. + if (opts.zoom) { + attachZoomHandlers(dygraphs, opts, prevCallbacks); + } + + if (opts.selection) { + attachSelectionHandlers(dygraphs, prevCallbacks); + } + } + }); + } + + return { + detach: function() { + for (var i = 0; i < dygraphs.length; i++) { + var g = dygraphs[i]; + if (opts.zoom) { + g.updateOptions({drawCallback: prevCallbacks[i].drawCallback}); + } + if (opts.selection) { + g.updateOptions({ + highlightCallback: prevCallbacks[i].highlightCallback, + unhighlightCallback: prevCallbacks[i].unhighlightCallback + }); + } + } + // release references & make subsequent calls throw. 
+ dygraphs = null; + opts = null; + prevCallbacks = null; + } + }; +}; + +function arraysAreEqual(a, b) { + if (!Array.isArray(a) || !Array.isArray(b)) return false; + var i = a.length; + if (i !== b.length) return false; + while (i--) { + if (a[i] !== b[i]) return false; + } + return true; +} + +function attachZoomHandlers(gs, syncOpts, prevCallbacks) { + var block = false; + for (var i = 0; i < gs.length; i++) { + var g = gs[i]; + g.updateOptions({ + drawCallback: function(me, initial) { + if (block || initial) return; + block = true; + var opts = { + dateWindow: me.xAxisRange() + }; + if (syncOpts.range) opts.valueRange = me.yAxisRange(); + + for (var j = 0; j < gs.length; j++) { + if (gs[j] == me) { + if (prevCallbacks[j] && prevCallbacks[j].drawCallback) { + prevCallbacks[j].drawCallback.apply(this, arguments); + } + continue; + } + + // Only redraw if there are new options + if (arraysAreEqual(opts.dateWindow, gs[j].getOption('dateWindow')) && + arraysAreEqual(opts.valueRange, gs[j].getOption('valueRange'))) { + continue; + } + + gs[j].updateOptions(opts); + } + block = false; + } + }, true /* no need to redraw */); + } +} + +function attachSelectionHandlers(gs, prevCallbacks) { + var block = false; + for (var i = 0; i < gs.length; i++) { + var g = gs[i]; + + g.updateOptions({ + highlightCallback: function(event, x, points, row, seriesName) { + if (block) return; + block = true; + var me = this; + for (var i = 0; i < gs.length; i++) { + if (me == gs[i]) { + if (prevCallbacks[i] && prevCallbacks[i].highlightCallback) { + prevCallbacks[i].highlightCallback.apply(this, arguments); + } + continue; + } + var idx = gs[i].getRowForX(x); + if (idx !== null) { + gs[i].setSelection(idx, seriesName); + } + } + block = false; + }, + unhighlightCallback: function(event) { + if (block) return; + block = true; + var me = this; + for (var i = 0; i < gs.length; i++) { + if (me == gs[i]) { + if (prevCallbacks[i] && prevCallbacks[i].unhighlightCallback) { + 
prevCallbacks[i].unhighlightCallback.apply(this, arguments); + } + continue; + } + gs[i].clearSelection(); + } + block = false; + } + }, true /* no need to redraw */); + } +} + +Dygraph.synchronize = synchronize; + +})(); diff --git a/tools/crunchstat-summary/crunchstat_summary/webchart.py b/tools/crunchstat-summary/crunchstat_summary/webchart.py index 790b08da87..9d18883ce2 100644 --- a/tools/crunchstat-summary/crunchstat_summary/webchart.py +++ b/tools/crunchstat-summary/crunchstat_summary/webchart.py @@ -10,7 +10,7 @@ import pkg_resources class WebChart(object): """Base class for a web chart. - Subclasses must assign JSLIB and JSASSET, and override the + Subclasses must assign JSLIB and JSASSETS, and override the chartdata() method. """ JSLIB = None @@ -33,7 +33,7 @@ class WebChart(object): def js(self): return 'var chartdata = {};\n{}'.format( json.dumps(self.sections()), - pkg_resources.resource_string('crunchstat_summary', self.JSASSET)) + '\n'.join([pkg_resources.resource_string('crunchstat_summary', jsa) for jsa in self.JSASSETS])) def sections(self): return [ diff --git a/tools/keep-block-check/keep-block-check.go b/tools/keep-block-check/keep-block-check.go index 7dca3293d2..2de7a96c9a 100644 --- a/tools/keep-block-check/keep-block-check.go +++ b/tools/keep-block-check/keep-block-check.go @@ -20,6 +20,8 @@ import ( "git.curoverse.com/arvados.git/sdk/go/keepclient" ) +var version = "dev" + func main() { err := doMain(os.Args[1:]) if err != nil { @@ -62,9 +64,20 @@ func doMain(args []string) error { false, "Log progress of each block verification") + getVersion := flags.Bool( + "version", + false, + "Print version information and exit.") + // Parse args; omit the first arg which is the command name flags.Parse(args) + // Print version information if requested + if *getVersion { + fmt.Printf("keep-block-check %s\n", version) + os.Exit(0) + } + config, blobSigningKey, err := loadConfig(*configFile) if err != nil { return fmt.Errorf("Error loading 
configuration from file: %s", err.Error()) diff --git a/tools/keep-exercise/keep-exercise.go b/tools/keep-exercise/keep-exercise.go index 6c8a866291..6bf1abbba3 100644 --- a/tools/keep-exercise/keep-exercise.go +++ b/tools/keep-exercise/keep-exercise.go @@ -22,16 +22,20 @@ import ( "crypto/rand" "encoding/binary" "flag" + "fmt" "io" "io/ioutil" "log" "net/http" + "os" "time" "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "git.curoverse.com/arvados.git/sdk/go/keepclient" ) +var version = "dev" + // Command line config knobs var ( BlockSize = flag.Int("block-size", keepclient.BLOCKSIZE, "bytes per read/write op") @@ -43,11 +47,20 @@ var ( StatsInterval = flag.Duration("stats-interval", time.Second, "time interval between IO stats reports, or 0 to disable") ServiceURL = flag.String("url", "", "specify scheme://host of a single keep service to exercise (instead of using all advertised services like normal clients)") ServiceUUID = flag.String("uuid", "", "specify UUID of a single advertised keep service to exercise") + getVersion = flag.Bool("version", false, "Print version information and exit.") ) func main() { flag.Parse() + // Print version information if requested + if *getVersion { + fmt.Printf("keep-exercise %s\n", version) + os.Exit(0) + } + + log.Printf("keep-exercise %s started", version) + arv, err := arvadosclient.MakeArvadosClient() if err != nil { log.Fatal(err) diff --git a/tools/keep-rsync/keep-rsync.go b/tools/keep-rsync/keep-rsync.go index a299d17feb..303f71f8fe 100644 --- a/tools/keep-rsync/keep-rsync.go +++ b/tools/keep-rsync/keep-rsync.go @@ -21,6 +21,8 @@ import ( "git.curoverse.com/arvados.git/sdk/go/keepclient" ) +var version = "dev" + func main() { err := doMain() if err != nil { @@ -69,9 +71,20 @@ func doMain() error { 0, "Lifetime of blob permission signatures on source keepservers. 
If not provided, this will be retrieved from the API server's discovery document.") + getVersion := flags.Bool( + "version", + false, + "Print version information and exit.") + // Parse args; omit the first arg which is the command name flags.Parse(os.Args[1:]) + // Print version information if requested + if *getVersion { + fmt.Printf("keep-rsync %s\n", version) + os.Exit(0) + } + srcConfig, srcBlobSigningKey, err := loadConfig(*srcConfigFile) if err != nil { return fmt.Errorf("Error loading src configuration from file: %s", err.Error())