@client_mtx = Mutex.new
end
- def api(resources_kind, action, data=nil, tokens={})
+ def api(resources_kind, action, data=nil, tokens={}, include_anon_token=true)
profile_checkpoint
'reader_tokens' => ((tokens[:reader_tokens] ||
Thread.current[:reader_tokens] ||
[]) +
- [Rails.configuration.anonymous_user_token]).to_json,
+ (include_anon_token ? [Rails.configuration.anonymous_user_token] : [])).to_json,
}
if !data.nil?
data.each do |k,v|
end
def self.current
- res = arvados_api_client.api self, '/current'
+ res = arvados_api_client.api self, '/current', nil, {}, false
arvados_api_client.unpack_api_response(res)
end
return 1
fi
- go get "git.curoverse.com/arvados.git/$src_path"
+ go get -ldflags "-X main.version=${version}" "git.curoverse.com/arvados.git/$src_path"
declare -a switches=()
systemd_unit="$WORKSPACE/${src_path}/${prog}.service"
export GOPATH
mkdir -p "$GOPATH/src/git.curoverse.com"
+rmdir --parents "$GOPATH/src/git.curoverse.com/arvados.git/tmp/GOPATH"
ln -sfT "$WORKSPACE" "$GOPATH/src/git.curoverse.com/arvados.git" \
|| fatal "symlink failed"
go get -v github.com/kardianos/govendor \
- user/topics/run-command.html.textile.liquid
- user/reference/job-pipeline-ref.html.textile.liquid
- user/examples/crunch-examples.html.textile.liquid
+ - user/topics/arv-sync-groups.html.textile.liquid
- Query the metadata database:
- user/topics/tutorial-trait-search.html.textile.liquid
- Arvados License:
SPDX-License-Identifier: CC-BY-SA-3.0
{% endcomment %}
-The Keep-web server provides read-only HTTP access to files stored in Keep. It serves public data to unauthenticated clients, and serves private data to clients that supply Arvados API tokens. It can be installed anywhere with access to Keep services, typically behind a web proxy that provides SSL support. See the "godoc page":http://godoc.org/github.com/curoverse/arvados/services/keep-web for more detail.
+The Keep-web server provides read/write HTTP (WebDAV) access to files stored in Keep. It serves public data to unauthenticated clients, and serves private data to clients that supply Arvados API tokens. It can be installed anywhere with access to Keep services, typically behind a web proxy that provides TLS support. See the "godoc page":http://godoc.org/github.com/curoverse/arvados/services/keep-web for more detail.
By convention, we use the following hostnames for the Keep-web service:
-allow-anonymous
Serve public data to anonymous clients. Try the token supplied in the ARVADOS_API_TOKEN environment variable when none of the tokens provided in an HTTP request succeed in reading the desired collection. (default false)
-attachment-only-host string
- Accept credentials, and add "Content-Disposition: attachment" response headers, for requests at this hostname:port. Prohibiting inline display makes it possible to serve untrusted and non-public content from a single origin, i.e., without wildcard DNS or SSL.
+ Accept credentials, and add "Content-Disposition: attachment" response headers, for requests at this hostname:port. Prohibiting inline display makes it possible to serve untrusted and non-public content from a single origin, i.e., without wildcard DNS or TLS.
-listen string
Address to listen on: "host:port", or ":port" to listen on all interfaces. (default ":80")
-trust-all-content
Omit the @-allow-anonymous@ argument if you do not want to serve public data.
-Set @ARVADOS_API_HOST_INSECURE=1@ if your API server's SSL certificate is not signed by a recognized CA.
+Set @ARVADOS_API_HOST_INSECURE=1@ if your API server's TLS certificate is not signed by a recognized CA.
-h3. Set up a reverse proxy with SSL support
+h3. Set up a reverse proxy with TLS support
-The Keep-web service will be accessible from anywhere on the internet, so we recommend using SSL for transport encryption.
+The Keep-web service will be accessible from anywhere on the internet, so we recommend using TLS for transport encryption.
-This is best achieved by putting a reverse proxy with SSL support in front of Keep-web, running on port 443 and passing requests to Keep-web on port 9002 (or whatever port you chose in your run script).
+This is best achieved by putting a reverse proxy with TLS support in front of Keep-web, running on port 443 and passing requests to Keep-web on port 9002 (or whatever port you chose in your run script).
-Note: A wildcard SSL certificate is required in order to support a full-featured secure Keep-web service. Without it, Keep-web can offer file downloads for all Keep data; however, in order to avoid cross-site scripting vulnerabilities, Keep-web refuses to serve private data as web content except when it is accessed using a "secret link" share. With a wildcard SSL certificate and DNS configured appropriately, all data can be served as web content.
+Note: A wildcard TLS certificate is required in order to support a full-featured secure Keep-web service. Without it, Keep-web can offer file downloads for all Keep data; however, in order to avoid cross-site scripting vulnerabilities, Keep-web refuses to serve private data as web content except when it is accessed using a "secret link" share. With a wildcard TLS certificate and DNS configured appropriately, all data can be served as web content.
For example, using Nginx:
Configure your DNS servers so the following names resolve to your Nginx proxy's public IP address.
* @download.uuid_prefix.your.domain@
* @collections.uuid_prefix.your.domain@
-* @*--collections.uuid_prefix.your.domain@, if you have a wildcard SSL certificate valid for @*.uuid_prefix.your.domain@ and your DNS server allows this without interfering with other DNS names.
-* @*.collections.uuid_prefix.your.domain@, if you have a wildcard SSL certificate valid for these names.
+* @*--collections.uuid_prefix.your.domain@, if you have a wildcard TLS certificate valid for @*.uuid_prefix.your.domain@ and your DNS server allows this without interfering with other DNS names.
+* @*.collections.uuid_prefix.your.domain@, if you have a wildcard TLS certificate valid for these names.
If neither of the above wildcard options is feasible, you have two choices:
# Serve web content at @collections.uuid_prefix.your.domain@, but only for unauthenticated requests (public data and collection sharing links). Authenticated requests will always result in file downloads, using the @download@ name. For example, the Workbench "preview" button and the "view entire log file" link will invoke file downloads instead of displaying content in the browser window.
|==--submit-runner-ram== SUBMIT_RUNNER_RAM|RAM (in MiB) required for the workflow runner job (default 1024)|
|==--submit-runner-image== SUBMIT_RUNNER_IMAGE|Docker image for workflow runner job, default arvados/jobs|
|==--name== NAME| Name to use for workflow execution instance.|
-|==--on-error {stop,continue}|Desired workflow behavior when a step fails. One of 'stop' or 'continue'. Default is 'continue'.|
+|==--on-error {stop,continue}==|Desired workflow behavior when a step fails. One of 'stop' or 'continue'. Default is 'continue'.|
|==--enable-dev==| Enable loading and running development versions of CWL spec.|
|==--intermediate-output-ttl== N|If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).|
|==--trash-intermediate==| Immediately trash intermediate outputs on workflow success.|
--- /dev/null
+---
+layout: default
+navsection: userguide
+title: "Using arv-sync-groups"
+...
+{% comment %}
+Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: CC-BY-SA-3.0
+{% endcomment %}
+
+The @arv-sync-groups@ tool allows to synchronize remote groups into Arvados from an external source.
+
+h1. Using arv-sync-groups
+
+This tool reads a CSV (comma-separated values) file having information about external groups and their members. When running it for the first time, it'll create a special group named 'Externally synchronized groups' meant to be the parent of all the remote groups.
+
+Every line on the file should have 2 values: a group name and a local user identifier, meaning that the named user is a member of the group. The tool will create the group if it doesn't exist, and add the user to it. If group member is not present on the input file, the account will be removed from the group.
+
+Users can be identified by their email address or username: the tool will check if every user exist on the system, and report back when not found. Groups on the other hand, are identified by their name.
+
+This tool is designed to be run periodically reading a file created by a remote auth system (ie: LDAP) dump script, applying what's included on the file as the source of truth.
+
+
+bq. NOTE: @arv-sync-groups@ needs to perform several administrative tasks on Arvados, so must be run using a superuser token
+
+h2. Options
+
+The following command line options are supported:
+
+table(table table-bordered table-condensed).
+|_. Option |_. Description |
+|==--help==| This list of options|
+|==--parent-group-uuid==| UUID of group to own all the externally synchronized groups|
+|==--user-id== | Identifier to use in looking up user. One of 'email' or 'username' (Default: 'email')|
+|==--verbose==| Log informational messages (Default: False)|
+|==--version==| Print version and exit|
+
+h2. Examples
+
+To sync groups using the username to identify every account, reading from some @external_groups.csv@ file, the command should be called as follows:
+
+<notextile>
+<pre><code>~$ <span class="userinput">arv-sync-groups --user-id username /path/to/external_groups.csv </span>
+</code></pre>
+</notextile>
+
+If you want to use a specific preexisting group as the parent of all the remote groups, you can do it this way:
+
+<notextile>
+<pre><code>~$ <span class="userinput">arv-sync-groups --parent-group-uuid <preexisting group UUID> --user-id username /path/to/external_groups.csv </span>
+</code></pre>
+</notextile>
package arvados
import (
+ "bytes"
"crypto/tls"
"encoding/json"
"fmt"
//
// path must not contain a query string.
func (c *Client) RequestAndDecode(dst interface{}, method, path string, body io.Reader, params interface{}) error {
+ if body, ok := body.(io.Closer); ok {
+ // Ensure body is closed even if we error out early
+ defer body.Close()
+ }
urlString := c.apiURL(path)
urlValues, err := anythingToValues(params)
if err != nil {
return c.DoAndDecode(dst, req)
}
+type resource interface {
+ resourceName() string
+}
+
+// UpdateBody returns an io.Reader suitable for use as an http.Request
+// Body for a create or update API call.
+func (c *Client) UpdateBody(rsc resource) io.Reader {
+ j, err := json.Marshal(rsc)
+ if err != nil {
+ // Return a reader that returns errors.
+ r, w := io.Pipe()
+ w.CloseWithError(err)
+ return r
+ }
+ v := url.Values{rsc.resourceName(): {string(j)}}
+ return bytes.NewBufferString(v.Encode())
+}
+
func (c *Client) httpClient() *http.Client {
switch {
case c.Client != nil:
IsTrashed bool `json:"is_trashed,omitempty"`
}
+func (c Collection) resourceName() string {
+ return "collection"
+}
+
// SizedDigests returns the hash+size part of each data block
// referenced by the collection.
func (c *Collection) SizedDigests() ([]SizedDigest, error) {
package arvados
import (
+ "errors"
+ "fmt"
"io"
"net/http"
"os"
"path"
+ "regexp"
+ "sort"
+ "strconv"
"strings"
"sync"
"time"
+)
+
+var (
+ ErrReadOnlyFile = errors.New("read-only file")
+ ErrNegativeOffset = errors.New("cannot seek to negative offset")
+ ErrFileExists = errors.New("file exists")
+ ErrInvalidOperation = errors.New("invalid operation")
+ ErrInvalidArgument = errors.New("invalid argument")
+ ErrDirectoryNotEmpty = errors.New("directory not empty")
+ ErrWriteOnlyMode = errors.New("file is O_WRONLY")
+ ErrSyncNotSupported = errors.New("O_SYNC flag is not supported")
+ ErrIsDirectory = errors.New("cannot rename file to overwrite existing directory")
+ ErrPermission = os.ErrPermission
- "git.curoverse.com/arvados.git/sdk/go/manifest"
+ maxBlockSize = 1 << 26
)
+// A File is an *os.File-like interface for reading and writing files
+// in a CollectionFileSystem.
type File interface {
io.Reader
+ io.Writer
io.Closer
io.Seeker
Size() int64
+ Readdir(int) ([]os.FileInfo, error)
+ Stat() (os.FileInfo, error)
+ Truncate(int64) error
}
type keepClient interface {
- ManifestFileReader(manifest.Manifest, string) (File, error)
+ ReadAt(locator string, p []byte, off int) (int, error)
+ PutB(p []byte) (string, int, error)
}
-type collectionFile struct {
- File
- collection *Collection
- name string
- size int64
+type fileinfo struct {
+ name string
+ mode os.FileMode
+ size int64
+ modTime time.Time
}
-func (cf *collectionFile) Size() int64 {
- return cf.size
+// Name implements os.FileInfo.
+func (fi fileinfo) Name() string {
+ return fi.name
}
-func (cf *collectionFile) Readdir(count int) ([]os.FileInfo, error) {
- return nil, io.EOF
+// ModTime implements os.FileInfo.
+func (fi fileinfo) ModTime() time.Time {
+ return fi.modTime
}
-func (cf *collectionFile) Stat() (os.FileInfo, error) {
- return collectionDirent{
- collection: cf.collection,
- name: cf.name,
- size: cf.size,
- isDir: false,
- }, nil
+// Mode implements os.FileInfo.
+func (fi fileinfo) Mode() os.FileMode {
+ return fi.mode
}
-type collectionDir struct {
- collection *Collection
- stream string
- dirents []os.FileInfo
+// IsDir implements os.FileInfo.
+func (fi fileinfo) IsDir() bool {
+ return fi.mode&os.ModeDir != 0
}
-// Readdir implements os.File.
-func (cd *collectionDir) Readdir(count int) ([]os.FileInfo, error) {
- ret := cd.dirents
- if count <= 0 {
- cd.dirents = nil
- return ret, nil
- } else if len(ret) == 0 {
- return nil, io.EOF
+// Size implements os.FileInfo.
+func (fi fileinfo) Size() int64 {
+ return fi.size
+}
+
+// Sys implements os.FileInfo.
+func (fi fileinfo) Sys() interface{} {
+ return nil
+}
+
+// A CollectionFileSystem is an http.Filesystem plus Stat() and
+// support for opening writable files. All methods are safe to call
+// from multiple goroutines.
+type CollectionFileSystem interface {
+ http.FileSystem
+
+ // analogous to os.Stat()
+ Stat(name string) (os.FileInfo, error)
+
+ // analogous to os.Create(): create/truncate a file and open it O_RDWR.
+ Create(name string) (File, error)
+
+ // Like os.OpenFile(): create or open a file or directory.
+ //
+ // If flag&os.O_EXCL==0, it opens an existing file or
+ // directory if one exists. If flag&os.O_CREATE!=0, it creates
+ // a new empty file or directory if one does not already
+ // exist.
+ //
+ // When creating a new item, perm&os.ModeDir determines
+ // whether it is a file or a directory.
+ //
+ // A file can be opened multiple times and used concurrently
+ // from multiple goroutines. However, each File object should
+ // be used by only one goroutine at a time.
+ OpenFile(name string, flag int, perm os.FileMode) (File, error)
+
+ Mkdir(name string, perm os.FileMode) error
+ Remove(name string) error
+ RemoveAll(name string) error
+ Rename(oldname, newname string) error
+
+ // Flush all file data to Keep and return a snapshot of the
+ // filesystem suitable for saving as (Collection)ManifestText.
+ // Prefix (normally ".") is a top level directory, effectively
+ // prepended to all paths in the returned manifest.
+ MarshalManifest(prefix string) (string, error)
+}
+
+type fileSystem struct {
+ dirnode
+}
+
+func (fs *fileSystem) OpenFile(name string, flag int, perm os.FileMode) (File, error) {
+ return fs.dirnode.OpenFile(name, flag, perm)
+}
+
+func (fs *fileSystem) Open(name string) (http.File, error) {
+ return fs.dirnode.OpenFile(name, os.O_RDONLY, 0)
+}
+
+func (fs *fileSystem) Create(name string) (File, error) {
+ return fs.dirnode.OpenFile(name, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0)
+}
+
+func (fs *fileSystem) Stat(name string) (fi os.FileInfo, err error) {
+ node := fs.dirnode.lookupPath(name)
+ if node == nil {
+ err = os.ErrNotExist
+ } else {
+ fi = node.Stat()
}
- var err error
- if count >= len(ret) {
- count = len(ret)
- err = io.EOF
+ return
+}
+
+type inode interface {
+ Parent() inode
+ Read([]byte, filenodePtr) (int, filenodePtr, error)
+ Write([]byte, filenodePtr) (int, filenodePtr, error)
+ Truncate(int64) error
+ Readdir() []os.FileInfo
+ Size() int64
+ Stat() os.FileInfo
+ sync.Locker
+ RLock()
+ RUnlock()
+}
+
+// filenode implements inode.
+type filenode struct {
+ fileinfo fileinfo
+ parent *dirnode
+ segments []segment
+ // number of times `segments` has changed in a
+ // way that might invalidate a filenodePtr
+ repacked int64
+ memsize int64 // bytes in memSegments
+ sync.RWMutex
+}
+
+// filenodePtr is an offset into a file that is (usually) efficient to
+// seek to. Specifically, if filenode.repacked==filenodePtr.repacked
+// then
+// filenode.segments[filenodePtr.segmentIdx][filenodePtr.segmentOff]
+// corresponds to file offset filenodePtr.off. Otherwise, it is
+// necessary to reexamine len(filenode.segments[0]) etc. to find the
+// correct segment and offset.
+type filenodePtr struct {
+ off int64
+ segmentIdx int
+ segmentOff int
+ repacked int64
+}
+
+// seek returns a ptr that is consistent with both startPtr.off and
+// the current state of fn. The caller must already hold fn.RLock() or
+// fn.Lock().
+//
+// If startPtr is beyond EOF, ptr.segment* will indicate precisely
+// EOF.
+//
+// After seeking:
+//
+// ptr.segmentIdx == len(filenode.segments) // i.e., at EOF
+// ||
+// filenode.segments[ptr.segmentIdx].Len() > ptr.segmentOff
+func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) {
+ ptr = startPtr
+ if ptr.off < 0 {
+ // meaningless anyway
+ return
+ } else if ptr.off >= fn.fileinfo.size {
+ ptr.segmentIdx = len(fn.segments)
+ ptr.segmentOff = 0
+ ptr.repacked = fn.repacked
+ return
+ } else if ptr.repacked == fn.repacked {
+ // segmentIdx and segmentOff accurately reflect
+ // ptr.off, but might have fallen off the end of a
+ // segment
+ if ptr.segmentOff >= fn.segments[ptr.segmentIdx].Len() {
+ ptr.segmentIdx++
+ ptr.segmentOff = 0
+ }
+ return
}
- cd.dirents = cd.dirents[count:]
- return ret[:count], err
+ defer func() {
+ ptr.repacked = fn.repacked
+ }()
+ if ptr.off >= fn.fileinfo.size {
+ ptr.segmentIdx, ptr.segmentOff = len(fn.segments), 0
+ return
+ }
+ // Recompute segmentIdx and segmentOff. We have already
+ // established fn.fileinfo.size > ptr.off >= 0, so we don't
+ // have to deal with edge cases here.
+ var off int64
+ for ptr.segmentIdx, ptr.segmentOff = 0, 0; off < ptr.off; ptr.segmentIdx++ {
+ // This would panic (index out of range) if
+ // fn.fileinfo.size were larger than
+ // sum(fn.segments[i].Len()) -- but that can't happen
+ // because we have ensured fn.fileinfo.size is always
+ // accurate.
+ segLen := int64(fn.segments[ptr.segmentIdx].Len())
+ if off+segLen > ptr.off {
+ ptr.segmentOff = int(ptr.off - off)
+ break
+ }
+ off += segLen
+ }
+ return
}
-// Stat implements os.File.
-func (cd *collectionDir) Stat() (os.FileInfo, error) {
- return collectionDirent{
- collection: cd.collection,
- name: path.Base(cd.stream),
- isDir: true,
- size: int64(len(cd.dirents)),
- }, nil
+// caller must have lock
+func (fn *filenode) appendSegment(e segment) {
+ fn.segments = append(fn.segments, e)
+ fn.fileinfo.size += int64(e.Len())
+}
+
+func (fn *filenode) Parent() inode {
+ fn.RLock()
+ defer fn.RUnlock()
+ return fn.parent
}
-// Close implements os.File.
-func (cd *collectionDir) Close() error {
+func (fn *filenode) Readdir() []os.FileInfo {
return nil
}
-// Read implements os.File.
-func (cd *collectionDir) Read([]byte) (int, error) {
- return 0, nil
+// Read reads file data from a single segment, starting at startPtr,
+// into p. startPtr is assumed not to be up-to-date. Caller must have
+// RLock or Lock.
+func (fn *filenode) Read(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
+ ptr = fn.seek(startPtr)
+ if ptr.off < 0 {
+ err = ErrNegativeOffset
+ return
+ }
+ if ptr.segmentIdx >= len(fn.segments) {
+ err = io.EOF
+ return
+ }
+ n, err = fn.segments[ptr.segmentIdx].ReadAt(p, int64(ptr.segmentOff))
+ if n > 0 {
+ ptr.off += int64(n)
+ ptr.segmentOff += n
+ if ptr.segmentOff == fn.segments[ptr.segmentIdx].Len() {
+ ptr.segmentIdx++
+ ptr.segmentOff = 0
+ if ptr.segmentIdx < len(fn.segments) && err == io.EOF {
+ err = nil
+ }
+ }
+ }
+ return
}
-// Seek implements os.File.
-func (cd *collectionDir) Seek(int64, int) (int64, error) {
- return 0, nil
+func (fn *filenode) Size() int64 {
+ fn.RLock()
+ defer fn.RUnlock()
+ return fn.fileinfo.Size()
}
-// collectionDirent implements os.FileInfo.
-type collectionDirent struct {
- collection *Collection
- name string
- isDir bool
- mode os.FileMode
- size int64
+func (fn *filenode) Stat() os.FileInfo {
+ fn.RLock()
+ defer fn.RUnlock()
+ return fn.fileinfo
}
-// Name implements os.FileInfo.
-func (e collectionDirent) Name() string {
- return e.name
+func (fn *filenode) Truncate(size int64) error {
+ fn.Lock()
+ defer fn.Unlock()
+ return fn.truncate(size)
}
-// ModTime implements os.FileInfo.
-func (e collectionDirent) ModTime() time.Time {
- if e.collection.ModifiedAt == nil {
- return time.Now()
+func (fn *filenode) truncate(size int64) error {
+ if size == fn.fileinfo.size {
+ return nil
+ }
+ fn.repacked++
+ if size < fn.fileinfo.size {
+ ptr := fn.seek(filenodePtr{off: size})
+ for i := ptr.segmentIdx; i < len(fn.segments); i++ {
+ if seg, ok := fn.segments[i].(*memSegment); ok {
+ fn.memsize -= int64(seg.Len())
+ }
+ }
+ if ptr.segmentOff == 0 {
+ fn.segments = fn.segments[:ptr.segmentIdx]
+ } else {
+ fn.segments = fn.segments[:ptr.segmentIdx+1]
+ switch seg := fn.segments[ptr.segmentIdx].(type) {
+ case *memSegment:
+ seg.Truncate(ptr.segmentOff)
+ fn.memsize += int64(seg.Len())
+ default:
+ fn.segments[ptr.segmentIdx] = seg.Slice(0, ptr.segmentOff)
+ }
+ }
+ fn.fileinfo.size = size
+ return nil
}
- return *e.collection.ModifiedAt
+ for size > fn.fileinfo.size {
+ grow := size - fn.fileinfo.size
+ var seg *memSegment
+ var ok bool
+ if len(fn.segments) == 0 {
+ seg = &memSegment{}
+ fn.segments = append(fn.segments, seg)
+ } else if seg, ok = fn.segments[len(fn.segments)-1].(*memSegment); !ok || seg.Len() >= maxBlockSize {
+ seg = &memSegment{}
+ fn.segments = append(fn.segments, seg)
+ }
+ if maxgrow := int64(maxBlockSize - seg.Len()); maxgrow < grow {
+ grow = maxgrow
+ }
+ seg.Truncate(seg.Len() + int(grow))
+ fn.fileinfo.size += grow
+ fn.memsize += grow
+ }
+ return nil
}
-// Mode implements os.FileInfo.
-func (e collectionDirent) Mode() os.FileMode {
- if e.isDir {
- return 0555
+// Write writes data from p to the file, starting at startPtr,
+// extending the file size if necessary. Caller must have Lock.
+func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePtr, err error) {
+ if startPtr.off > fn.fileinfo.size {
+ if err = fn.truncate(startPtr.off); err != nil {
+ return 0, startPtr, err
+ }
+ }
+ ptr = fn.seek(startPtr)
+ if ptr.off < 0 {
+ err = ErrNegativeOffset
+ return
+ }
+ for len(p) > 0 && err == nil {
+ cando := p
+ if len(cando) > maxBlockSize {
+ cando = cando[:maxBlockSize]
+ }
+ // Rearrange/grow fn.segments (and shrink cando if
+ // needed) such that cando can be copied to
+ // fn.segments[ptr.segmentIdx] at offset
+ // ptr.segmentOff.
+ cur := ptr.segmentIdx
+ prev := ptr.segmentIdx - 1
+ var curWritable bool
+ if cur < len(fn.segments) {
+ _, curWritable = fn.segments[cur].(*memSegment)
+ }
+ var prevAppendable bool
+ if prev >= 0 && fn.segments[prev].Len() < maxBlockSize {
+ _, prevAppendable = fn.segments[prev].(*memSegment)
+ }
+ if ptr.segmentOff > 0 && !curWritable {
+ // Split a non-writable block.
+ if max := fn.segments[cur].Len() - ptr.segmentOff; max <= len(cando) {
+ // Truncate cur, and insert a new
+ // segment after it.
+ cando = cando[:max]
+ fn.segments = append(fn.segments, nil)
+ copy(fn.segments[cur+1:], fn.segments[cur:])
+ } else {
+ // Split cur into two copies, truncate
+ // the one on the left, shift the one
+ // on the right, and insert a new
+ // segment between them.
+ fn.segments = append(fn.segments, nil, nil)
+ copy(fn.segments[cur+2:], fn.segments[cur:])
+ fn.segments[cur+2] = fn.segments[cur+2].Slice(ptr.segmentOff+len(cando), -1)
+ }
+ cur++
+ prev++
+ seg := &memSegment{}
+ seg.Truncate(len(cando))
+ fn.memsize += int64(len(cando))
+ fn.segments[cur] = seg
+ fn.segments[prev] = fn.segments[prev].Slice(0, ptr.segmentOff)
+ ptr.segmentIdx++
+ ptr.segmentOff = 0
+ fn.repacked++
+ ptr.repacked++
+ } else if curWritable {
+ if fit := int(fn.segments[cur].Len()) - ptr.segmentOff; fit < len(cando) {
+ cando = cando[:fit]
+ }
+ } else {
+ if prevAppendable {
+ // Shrink cando if needed to fit in
+ // prev segment.
+ if cangrow := maxBlockSize - fn.segments[prev].Len(); cangrow < len(cando) {
+ cando = cando[:cangrow]
+ }
+ }
+
+ if cur == len(fn.segments) {
+ // ptr is at EOF, filesize is changing.
+ fn.fileinfo.size += int64(len(cando))
+ } else if el := fn.segments[cur].Len(); el <= len(cando) {
+ // cando is long enough that we won't
+ // need cur any more. shrink cando to
+ // be exactly as long as cur
+ // (otherwise we'd accidentally shift
+ // the effective position of all
+ // segments after cur).
+ cando = cando[:el]
+ copy(fn.segments[cur:], fn.segments[cur+1:])
+ fn.segments = fn.segments[:len(fn.segments)-1]
+ } else {
+ // shrink cur by the same #bytes we're growing prev
+ fn.segments[cur] = fn.segments[cur].Slice(len(cando), -1)
+ }
+
+ if prevAppendable {
+ // Grow prev.
+ ptr.segmentIdx--
+ ptr.segmentOff = fn.segments[prev].Len()
+ fn.segments[prev].(*memSegment).Truncate(ptr.segmentOff + len(cando))
+ fn.memsize += int64(len(cando))
+ ptr.repacked++
+ fn.repacked++
+ } else {
+ // Insert a segment between prev and
+ // cur, and advance prev/cur.
+ fn.segments = append(fn.segments, nil)
+ if cur < len(fn.segments) {
+ copy(fn.segments[cur+1:], fn.segments[cur:])
+ ptr.repacked++
+ fn.repacked++
+ } else {
+ // appending a new segment does
+ // not invalidate any ptrs
+ }
+ seg := &memSegment{}
+ seg.Truncate(len(cando))
+ fn.memsize += int64(len(cando))
+ fn.segments[cur] = seg
+ cur++
+ prev++
+ }
+ }
+
+ // Finally we can copy bytes from cando to the current segment.
+ fn.segments[ptr.segmentIdx].(*memSegment).WriteAt(cando, ptr.segmentOff)
+ n += len(cando)
+ p = p[len(cando):]
+
+ ptr.off += int64(len(cando))
+ ptr.segmentOff += len(cando)
+ if ptr.segmentOff >= maxBlockSize {
+ fn.pruneMemSegments()
+ }
+ if fn.segments[ptr.segmentIdx].Len() == ptr.segmentOff {
+ ptr.segmentOff = 0
+ ptr.segmentIdx++
+ }
+
+ fn.fileinfo.modTime = time.Now()
+ }
+ return
+}
+
+// Write some data out to disk to reduce memory use. Caller must have
+// write lock.
+func (fn *filenode) pruneMemSegments() {
+ // TODO: async (don't hold Lock() while waiting for Keep)
+ // TODO: share code with (*dirnode)sync()
+ // TODO: pack/flush small blocks too, when fragmented
+ for idx, seg := range fn.segments {
+ seg, ok := seg.(*memSegment)
+ if !ok || seg.Len() < maxBlockSize {
+ continue
+ }
+ locator, _, err := fn.parent.kc.PutB(seg.buf)
+ if err != nil {
+ // TODO: stall (or return errors from)
+ // subsequent writes until flushing
+ // starts to succeed
+ continue
+ }
+ fn.memsize -= int64(seg.Len())
+ fn.segments[idx] = storedSegment{
+ kc: fn.parent.kc,
+ locator: locator,
+ size: seg.Len(),
+ offset: 0,
+ length: seg.Len(),
+ }
+ }
+}
+
+// FileSystem returns a CollectionFileSystem for the collection.
+func (c *Collection) FileSystem(client *Client, kc keepClient) (CollectionFileSystem, error) {
+ var modTime time.Time
+ if c.ModifiedAt == nil {
+ modTime = time.Now()
} else {
- return 0444
+ modTime = *c.ModifiedAt
+ }
+ fs := &fileSystem{dirnode: dirnode{
+ client: client,
+ kc: kc,
+ fileinfo: fileinfo{
+ name: ".",
+ mode: os.ModeDir | 0755,
+ modTime: modTime,
+ },
+ parent: nil,
+ inodes: make(map[string]inode),
+ }}
+ fs.dirnode.parent = &fs.dirnode
+ if err := fs.dirnode.loadManifest(c.ManifestText); err != nil {
+ return nil, err
}
+ return fs, nil
}
-// IsDir implements os.FileInfo.
-func (e collectionDirent) IsDir() bool {
- return e.isDir
+type filehandle struct {
+ inode
+ ptr filenodePtr
+ append bool
+ readable bool
+ writable bool
+ unreaddirs []os.FileInfo
}
-// Size implements os.FileInfo.
-func (e collectionDirent) Size() int64 {
- return e.size
+func (f *filehandle) Read(p []byte) (n int, err error) {
+ if !f.readable {
+ return 0, ErrWriteOnlyMode
+ }
+ f.inode.RLock()
+ defer f.inode.RUnlock()
+ n, f.ptr, err = f.inode.Read(p, f.ptr)
+ return
}
-// Sys implements os.FileInfo.
-func (e collectionDirent) Sys() interface{} {
- return nil
+func (f *filehandle) Seek(off int64, whence int) (pos int64, err error) {
+ size := f.inode.Size()
+ ptr := f.ptr
+ switch whence {
+ case io.SeekStart:
+ ptr.off = off
+ case io.SeekCurrent:
+ ptr.off += off
+ case io.SeekEnd:
+ ptr.off = size + off
+ }
+ if ptr.off < 0 {
+ return f.ptr.off, ErrNegativeOffset
+ }
+ if ptr.off != f.ptr.off {
+ f.ptr = ptr
+ // force filenode to recompute f.ptr fields on next
+ // use
+ f.ptr.repacked = -1
+ }
+ return f.ptr.off, nil
}
-// A CollectionFileSystem is an http.Filesystem with an added Stat() method.
-type CollectionFileSystem interface {
- http.FileSystem
- Stat(name string) (os.FileInfo, error)
+func (f *filehandle) Truncate(size int64) error {
+ return f.inode.Truncate(size)
}
-// collectionFS implements CollectionFileSystem.
-type collectionFS struct {
- collection *Collection
- client *Client
- kc keepClient
- sizes map[string]int64
- sizesOnce sync.Once
+func (f *filehandle) Write(p []byte) (n int, err error) {
+ if !f.writable {
+ return 0, ErrReadOnlyFile
+ }
+ f.inode.Lock()
+ defer f.inode.Unlock()
+ if fn, ok := f.inode.(*filenode); ok && f.append {
+ f.ptr = filenodePtr{
+ off: fn.fileinfo.size,
+ segmentIdx: len(fn.segments),
+ segmentOff: 0,
+ repacked: fn.repacked,
+ }
+ }
+ n, f.ptr, err = f.inode.Write(p, f.ptr)
+ return
}
-// FileSystem returns a CollectionFileSystem for the collection.
-func (c *Collection) FileSystem(client *Client, kc keepClient) CollectionFileSystem {
- return &collectionFS{
- collection: c,
- client: client,
- kc: kc,
- }
-}
-
-func (c *collectionFS) Stat(name string) (os.FileInfo, error) {
- name = canonicalName(name)
- if name == "." {
- return collectionDirent{
- collection: c.collection,
- name: "/",
- isDir: true,
- }, nil
- }
- if size, ok := c.fileSizes()[name]; ok {
- return collectionDirent{
- collection: c.collection,
- name: path.Base(name),
- size: size,
- isDir: false,
- }, nil
- }
- for fnm := range c.fileSizes() {
- if !strings.HasPrefix(fnm, name+"/") {
+func (f *filehandle) Readdir(count int) ([]os.FileInfo, error) {
+ if !f.inode.Stat().IsDir() {
+ return nil, ErrInvalidOperation
+ }
+ if count <= 0 {
+ return f.inode.Readdir(), nil
+ }
+ if f.unreaddirs == nil {
+ f.unreaddirs = f.inode.Readdir()
+ }
+ if len(f.unreaddirs) == 0 {
+ return nil, io.EOF
+ }
+ if count > len(f.unreaddirs) {
+ count = len(f.unreaddirs)
+ }
+ ret := f.unreaddirs[:count]
+ f.unreaddirs = f.unreaddirs[count:]
+ return ret, nil
+}
+
+func (f *filehandle) Stat() (os.FileInfo, error) {
+ return f.inode.Stat(), nil
+}
+
+func (f *filehandle) Close() error {
+ return nil
+}
+
+type dirnode struct {
+ fileinfo fileinfo
+ parent *dirnode
+ client *Client
+ kc keepClient
+ inodes map[string]inode
+ sync.RWMutex
+}
+
+// sync flushes in-memory data (for all files in the tree rooted at
+// dn) to persistent storage. Caller must hold dn.Lock().
+func (dn *dirnode) sync() error {
+ type shortBlock struct {
+ fn *filenode
+ idx int
+ }
+ var pending []shortBlock
+ var pendingLen int
+
+ flush := func(sbs []shortBlock) error {
+ if len(sbs) == 0 {
+ return nil
+ }
+ block := make([]byte, 0, maxBlockSize)
+ for _, sb := range sbs {
+ block = append(block, sb.fn.segments[sb.idx].(*memSegment).buf...)
+ }
+ locator, _, err := dn.kc.PutB(block)
+ if err != nil {
+ return err
+ }
+ off := 0
+ for _, sb := range sbs {
+ data := sb.fn.segments[sb.idx].(*memSegment).buf
+ sb.fn.segments[sb.idx] = storedSegment{
+ kc: dn.kc,
+ locator: locator,
+ size: len(block),
+ offset: off,
+ length: len(data),
+ }
+ off += len(data)
+ sb.fn.memsize -= int64(len(data))
+ }
+ return nil
+ }
+
+ names := make([]string, 0, len(dn.inodes))
+ for name := range dn.inodes {
+ names = append(names, name)
+ }
+ sort.Strings(names)
+
+ for _, name := range names {
+ fn, ok := dn.inodes[name].(*filenode)
+ if !ok {
continue
}
- return collectionDirent{
- collection: c.collection,
- name: path.Base(name),
- isDir: true,
- }, nil
+ fn.Lock()
+ defer fn.Unlock()
+ for idx, seg := range fn.segments {
+ seg, ok := seg.(*memSegment)
+ if !ok {
+ continue
+ }
+ if seg.Len() > maxBlockSize/2 {
+ if err := flush([]shortBlock{{fn, idx}}); err != nil {
+ return err
+ }
+ continue
+ }
+ if pendingLen+seg.Len() > maxBlockSize {
+ if err := flush(pending); err != nil {
+ return err
+ }
+ pending = nil
+ pendingLen = 0
+ }
+ pending = append(pending, shortBlock{fn, idx})
+ pendingLen += seg.Len()
+ }
}
- return nil, os.ErrNotExist
+ return flush(pending)
}
-func (c *collectionFS) Open(name string) (http.File, error) {
- // Ensure name looks the way it does in a manifest.
- name = canonicalName(name)
+func (dn *dirnode) MarshalManifest(prefix string) (string, error) {
+ dn.Lock()
+ defer dn.Unlock()
+ return dn.marshalManifest(prefix)
+}
- m := manifest.Manifest{Text: c.collection.ManifestText}
+// caller must have read lock.
+func (dn *dirnode) marshalManifest(prefix string) (string, error) {
+ var streamLen int64
+ type filepart struct {
+ name string
+ offset int64
+ length int64
+ }
+ var fileparts []filepart
+ var subdirs string
+ var blocks []string
- // Return a file if it exists.
- if size, ok := c.fileSizes()[name]; ok {
- reader, err := c.kc.ManifestFileReader(m, name)
- if err != nil {
- return nil, err
+ if err := dn.sync(); err != nil {
+ return "", err
+ }
+
+ names := make([]string, 0, len(dn.inodes))
+ for name, node := range dn.inodes {
+ names = append(names, name)
+ node.Lock()
+ defer node.Unlock()
+ }
+ sort.Strings(names)
+
+ for _, name := range names {
+ switch node := dn.inodes[name].(type) {
+ case *dirnode:
+ subdir, err := node.marshalManifest(prefix + "/" + name)
+ if err != nil {
+ return "", err
+ }
+ subdirs = subdirs + subdir
+ case *filenode:
+ if len(node.segments) == 0 {
+ fileparts = append(fileparts, filepart{name: name})
+ break
+ }
+ for _, seg := range node.segments {
+ switch seg := seg.(type) {
+ case storedSegment:
+ if len(blocks) > 0 && blocks[len(blocks)-1] == seg.locator {
+ streamLen -= int64(seg.size)
+ } else {
+ blocks = append(blocks, seg.locator)
+ }
+ next := filepart{
+ name: name,
+ offset: streamLen + int64(seg.offset),
+ length: int64(seg.length),
+ }
+ if prev := len(fileparts) - 1; prev >= 0 &&
+ fileparts[prev].name == name &&
+ fileparts[prev].offset+fileparts[prev].length == next.offset {
+ fileparts[prev].length += next.length
+ } else {
+ fileparts = append(fileparts, next)
+ }
+ streamLen += int64(seg.size)
+ default:
+ // This can't happen: we
+ // haven't unlocked since
+ // calling sync().
+ panic(fmt.Sprintf("can't marshal segment type %T", seg))
+ }
+ }
+ default:
+ panic(fmt.Sprintf("can't marshal inode type %T", node))
+ }
+ }
+ var filetokens []string
+ for _, s := range fileparts {
+ filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, manifestEscape(s.name)))
+ }
+ if len(filetokens) == 0 {
+ return subdirs, nil
+ } else if len(blocks) == 0 {
+ blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
+ }
+ return manifestEscape(prefix) + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n" + subdirs, nil
+}
+
+func (dn *dirnode) loadManifest(txt string) error {
+ var dirname string
+ streams := strings.Split(txt, "\n")
+ if streams[len(streams)-1] != "" {
+ return fmt.Errorf("line %d: no trailing newline", len(streams))
+ }
+ streams = streams[:len(streams)-1]
+ segments := []storedSegment{}
+ for i, stream := range streams {
+ lineno := i + 1
+ var anyFileTokens bool
+ var pos int64
+ var segIdx int
+ segments = segments[:0]
+ for i, token := range strings.Split(stream, " ") {
+ if i == 0 {
+ dirname = manifestUnescape(token)
+ continue
+ }
+ if !strings.Contains(token, ":") {
+ if anyFileTokens {
+ return fmt.Errorf("line %d: bad file segment %q", lineno, token)
+ }
+ toks := strings.SplitN(token, "+", 3)
+ if len(toks) < 2 {
+ return fmt.Errorf("line %d: bad locator %q", lineno, token)
+ }
+ length, err := strconv.ParseInt(toks[1], 10, 32)
+ if err != nil || length < 0 {
+ return fmt.Errorf("line %d: bad locator %q", lineno, token)
+ }
+ segments = append(segments, storedSegment{
+ locator: token,
+ size: int(length),
+ offset: 0,
+ length: int(length),
+ })
+ continue
+ } else if len(segments) == 0 {
+ return fmt.Errorf("line %d: bad locator %q", lineno, token)
+ }
+
+ toks := strings.Split(token, ":")
+ if len(toks) != 3 {
+ return fmt.Errorf("line %d: bad file segment %q", lineno, token)
+ }
+ anyFileTokens = true
+
+ offset, err := strconv.ParseInt(toks[0], 10, 64)
+ if err != nil || offset < 0 {
+ return fmt.Errorf("line %d: bad file segment %q", lineno, token)
+ }
+ length, err := strconv.ParseInt(toks[1], 10, 64)
+ if err != nil || length < 0 {
+ return fmt.Errorf("line %d: bad file segment %q", lineno, token)
+ }
+ name := dirname + "/" + manifestUnescape(toks[2])
+ fnode, err := dn.createFileAndParents(name)
+ if err != nil {
+ return fmt.Errorf("line %d: cannot use path %q: %s", lineno, name, err)
+ }
+ // Map the stream offset/range coordinates to
+ // block/offset/range coordinates and add
+ // corresponding storedSegments to the filenode
+ if pos > offset {
+ // Can't continue where we left off.
+ // TODO: binary search instead of
+ // rewinding all the way (but this
+ // situation might be rare anyway)
+ segIdx, pos = 0, 0
+ }
+ for next := int64(0); segIdx < len(segments); segIdx++ {
+ seg := segments[segIdx]
+ next = pos + int64(seg.Len())
+ if next <= offset || seg.Len() == 0 {
+ pos = next
+ continue
+ }
+ if pos >= offset+length {
+ break
+ }
+ var blkOff int
+ if pos < offset {
+ blkOff = int(offset - pos)
+ }
+ blkLen := seg.Len() - blkOff
+ if pos+int64(blkOff+blkLen) > offset+length {
+ blkLen = int(offset + length - pos - int64(blkOff))
+ }
+ fnode.appendSegment(storedSegment{
+ kc: dn.kc,
+ locator: seg.locator,
+ size: seg.size,
+ offset: blkOff,
+ length: blkLen,
+ })
+ if next > offset+length {
+ break
+ } else {
+ pos = next
+ }
+ }
+ if segIdx == len(segments) && pos < offset+length {
+ return fmt.Errorf("line %d: invalid segment in %d-byte stream: %q", lineno, pos, token)
+ }
+ }
+ if !anyFileTokens {
+ return fmt.Errorf("line %d: no file segments", lineno)
+ } else if len(segments) == 0 {
+ return fmt.Errorf("line %d: no locators", lineno)
+ } else if dirname == "" {
+ return fmt.Errorf("line %d: no stream name", lineno)
+ }
+ }
+ return nil
+}
+
+// only safe to call from loadManifest -- no locking
+func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
+ names := strings.Split(path, "/")
+ basename := names[len(names)-1]
+ if basename == "" || basename == "." || basename == ".." {
+ err = fmt.Errorf("invalid filename")
+ return
+ }
+ for _, name := range names[:len(names)-1] {
+ switch name {
+ case "", ".":
+ case "..":
+ dn = dn.parent
+ default:
+ switch node := dn.inodes[name].(type) {
+ case nil:
+ dn = dn.newDirnode(name, 0755, dn.fileinfo.modTime)
+ case *dirnode:
+ dn = node
+ case *filenode:
+ err = ErrFileExists
+ return
+ }
+ }
+ }
+ switch node := dn.inodes[basename].(type) {
+ case nil:
+ fn = dn.newFilenode(basename, 0755, dn.fileinfo.modTime)
+ case *filenode:
+ fn = node
+ case *dirnode:
+ err = ErrIsDirectory
+ }
+ return
+}
+
+func (dn *dirnode) mkdir(name string) (*filehandle, error) {
+ return dn.OpenFile(name, os.O_CREATE|os.O_EXCL, os.ModeDir|0755)
+}
+
+func (dn *dirnode) Mkdir(name string, perm os.FileMode) error {
+ f, err := dn.mkdir(name)
+ if err == nil {
+ err = f.Close()
+ }
+ return err
+}
+
+func (dn *dirnode) Remove(name string) error {
+ return dn.remove(strings.TrimRight(name, "/"), false)
+}
+
+func (dn *dirnode) RemoveAll(name string) error {
+ err := dn.remove(strings.TrimRight(name, "/"), true)
+ if os.IsNotExist(err) {
+ // "If the path does not exist, RemoveAll returns
+ // nil." (see "os" pkg)
+ err = nil
+ }
+ return err
+}
+
+func (dn *dirnode) remove(name string, recursive bool) error {
+ dirname, name := path.Split(name)
+ if name == "" || name == "." || name == ".." {
+ return ErrInvalidArgument
+ }
+ dn, ok := dn.lookupPath(dirname).(*dirnode)
+ if !ok {
+ return os.ErrNotExist
+ }
+ dn.Lock()
+ defer dn.Unlock()
+ switch node := dn.inodes[name].(type) {
+ case nil:
+ return os.ErrNotExist
+ case *dirnode:
+ node.RLock()
+ defer node.RUnlock()
+ if !recursive && len(node.inodes) > 0 {
+ return ErrDirectoryNotEmpty
}
- return &collectionFile{
- File: reader,
- collection: c.collection,
- name: path.Base(name),
- size: size,
- }, nil
}
+ delete(dn.inodes, name)
+ return nil
+}
+
+func (dn *dirnode) Rename(oldname, newname string) error {
+ olddir, oldname := path.Split(oldname)
+ if oldname == "" || oldname == "." || oldname == ".." {
+ return ErrInvalidArgument
+ }
+ olddirf, err := dn.OpenFile(olddir+".", os.O_RDONLY, 0)
+ if err != nil {
+ return fmt.Errorf("%q: %s", olddir, err)
+ }
+ defer olddirf.Close()
+ newdir, newname := path.Split(newname)
+ if newname == "." || newname == ".." {
+ return ErrInvalidArgument
+ } else if newname == "" {
+ // Rename("a/b", "c/") means Rename("a/b", "c/b")
+ newname = oldname
+ }
+ newdirf, err := dn.OpenFile(newdir+".", os.O_RDONLY, 0)
+ if err != nil {
+ return fmt.Errorf("%q: %s", newdir, err)
+ }
+ defer newdirf.Close()
+
+ // When acquiring locks on multiple nodes, all common
+ // ancestors must be locked first in order to avoid
+ // deadlock. This is assured by locking the path from root to
+ // newdir, then locking the path from root to olddir, skipping
+ // any already-locked nodes.
+ needLock := []sync.Locker{}
+ for _, f := range []*filehandle{olddirf, newdirf} {
+ node := f.inode
+ needLock = append(needLock, node)
+ for node.Parent() != node {
+ node = node.Parent()
+ needLock = append(needLock, node)
+ }
+ }
+ locked := map[sync.Locker]bool{}
+ for i := len(needLock) - 1; i >= 0; i-- {
+ if n := needLock[i]; !locked[n] {
+ n.Lock()
+ defer n.Unlock()
+ locked[n] = true
+ }
+ }
+
+ olddn := olddirf.inode.(*dirnode)
+ newdn := newdirf.inode.(*dirnode)
+ oldinode, ok := olddn.inodes[oldname]
+ if !ok {
+ return os.ErrNotExist
+ }
+ if existing, ok := newdn.inodes[newname]; ok {
+ // overwriting an existing file or dir
+ if dn, ok := existing.(*dirnode); ok {
+ if !oldinode.Stat().IsDir() {
+ return ErrIsDirectory
+ }
+ dn.RLock()
+ defer dn.RUnlock()
+ if len(dn.inodes) > 0 {
+ return ErrDirectoryNotEmpty
+ }
+ }
+ } else {
+ if newdn.inodes == nil {
+ newdn.inodes = make(map[string]inode)
+ }
+ newdn.fileinfo.size++
+ }
+ newdn.inodes[newname] = oldinode
+ switch n := oldinode.(type) {
+ case *dirnode:
+ n.parent = newdn
+ case *filenode:
+ n.parent = newdn
+ default:
+ panic(fmt.Sprintf("bad inode type %T", n))
+ }
+ delete(olddn.inodes, oldname)
+ olddn.fileinfo.size--
+ return nil
+}
+
+func (dn *dirnode) Parent() inode {
+ dn.RLock()
+ defer dn.RUnlock()
+ return dn.parent
+}
+
+func (dn *dirnode) Readdir() (fi []os.FileInfo) {
+ dn.RLock()
+ defer dn.RUnlock()
+ fi = make([]os.FileInfo, 0, len(dn.inodes))
+ for _, inode := range dn.inodes {
+ fi = append(fi, inode.Stat())
+ }
+ return
+}
- // Return a directory if it's the root dir or there are file
- // entries below it.
- children := map[string]collectionDirent{}
- for fnm, size := range c.fileSizes() {
- if !strings.HasPrefix(fnm, name+"/") {
+func (dn *dirnode) Read(p []byte, ptr filenodePtr) (int, filenodePtr, error) {
+ return 0, ptr, ErrInvalidOperation
+}
+
+func (dn *dirnode) Write(p []byte, ptr filenodePtr) (int, filenodePtr, error) {
+ return 0, ptr, ErrInvalidOperation
+}
+
+func (dn *dirnode) Size() int64 {
+ dn.RLock()
+ defer dn.RUnlock()
+ return dn.fileinfo.Size()
+}
+
+func (dn *dirnode) Stat() os.FileInfo {
+ dn.RLock()
+ defer dn.RUnlock()
+ return dn.fileinfo
+}
+
+func (dn *dirnode) Truncate(int64) error {
+ return ErrInvalidOperation
+}
+
+// lookupPath returns the inode for the file/directory with the given
+// name (which may contain "/" separators), along with its parent
+// node. If no such file/directory exists, the returned node is nil.
+func (dn *dirnode) lookupPath(path string) (node inode) {
+ node = dn
+ for _, name := range strings.Split(path, "/") {
+ dn, ok := node.(*dirnode)
+ if !ok {
+ return nil
+ }
+ if name == "." || name == "" {
continue
}
- isDir := false
- ent := fnm[len(name)+1:]
- if i := strings.Index(ent, "/"); i >= 0 {
- ent = ent[:i]
- isDir = true
+ if name == ".." {
+ node = node.Parent()
+ continue
}
- e := children[ent]
- e.collection = c.collection
- e.isDir = isDir
- e.name = ent
- e.size = size
- children[ent] = e
+ dn.RLock()
+ node = dn.inodes[name]
+ dn.RUnlock()
+ }
+ return
+}
+
+func (dn *dirnode) newDirnode(name string, perm os.FileMode, modTime time.Time) *dirnode {
+ child := &dirnode{
+ parent: dn,
+ client: dn.client,
+ kc: dn.kc,
+ fileinfo: fileinfo{
+ name: name,
+ mode: os.ModeDir | perm,
+ modTime: modTime,
+ },
+ }
+ if dn.inodes == nil {
+ dn.inodes = make(map[string]inode)
+ }
+ dn.inodes[name] = child
+ dn.fileinfo.size++
+ return child
+}
+
+func (dn *dirnode) newFilenode(name string, perm os.FileMode, modTime time.Time) *filenode {
+ child := &filenode{
+ parent: dn,
+ fileinfo: fileinfo{
+ name: name,
+ mode: perm,
+ modTime: modTime,
+ },
+ }
+ if dn.inodes == nil {
+ dn.inodes = make(map[string]inode)
}
- if len(children) == 0 && name != "." {
+ dn.inodes[name] = child
+ dn.fileinfo.size++
+ return child
+}
+
+// OpenFile is analogous to os.OpenFile().
+func (dn *dirnode) OpenFile(name string, flag int, perm os.FileMode) (*filehandle, error) {
+ if flag&os.O_SYNC != 0 {
+ return nil, ErrSyncNotSupported
+ }
+ dirname, name := path.Split(name)
+ dn, ok := dn.lookupPath(dirname).(*dirnode)
+ if !ok {
return nil, os.ErrNotExist
}
- dirents := make([]os.FileInfo, 0, len(children))
- for _, ent := range children {
- dirents = append(dirents, ent)
+ var readable, writable bool
+ switch flag & (os.O_RDWR | os.O_RDONLY | os.O_WRONLY) {
+ case os.O_RDWR:
+ readable = true
+ writable = true
+ case os.O_RDONLY:
+ readable = true
+ case os.O_WRONLY:
+ writable = true
+ default:
+ return nil, fmt.Errorf("invalid flags 0x%x", flag)
+ }
+ if !writable {
+ // A directory can be opened via "foo/", "foo/.", or
+ // "foo/..".
+ switch name {
+ case ".", "":
+ return &filehandle{inode: dn}, nil
+ case "..":
+ return &filehandle{inode: dn.Parent()}, nil
+ }
+ }
+ createMode := flag&os.O_CREATE != 0
+ if createMode {
+ dn.Lock()
+ defer dn.Unlock()
+ } else {
+ dn.RLock()
+ defer dn.RUnlock()
}
- return &collectionDir{
- collection: c.collection,
- stream: name,
- dirents: dirents,
+ n, ok := dn.inodes[name]
+ if !ok {
+ if !createMode {
+ return nil, os.ErrNotExist
+ }
+ if perm.IsDir() {
+ n = dn.newDirnode(name, 0755, time.Now())
+ } else {
+ n = dn.newFilenode(name, 0755, time.Now())
+ }
+ } else if flag&os.O_EXCL != 0 {
+ return nil, ErrFileExists
+ } else if flag&os.O_TRUNC != 0 {
+ if !writable {
+ return nil, fmt.Errorf("invalid flag O_TRUNC in read-only mode")
+ } else if fn, ok := n.(*filenode); !ok {
+ return nil, fmt.Errorf("invalid flag O_TRUNC when opening directory")
+ } else {
+ fn.Truncate(0)
+ }
+ }
+ return &filehandle{
+ inode: n,
+ append: flag&os.O_APPEND != 0,
+ readable: readable,
+ writable: writable,
}, nil
}
-// fileSizes returns a map of files that can be opened. Each key
-// starts with "./".
-func (c *collectionFS) fileSizes() map[string]int64 {
- c.sizesOnce.Do(func() {
- c.sizes = map[string]int64{}
- m := manifest.Manifest{Text: c.collection.ManifestText}
- for ms := range m.StreamIter() {
- for _, fss := range ms.FileStreamSegments {
- c.sizes[ms.StreamName+"/"+fss.Name] += int64(fss.SegLen)
- }
+type segment interface {
+ io.ReaderAt
+ Len() int
+ // Return a new segment with a subsection of the data from this
+ // one. length<0 means length=Len()-off.
+ Slice(off int, length int) segment
+}
+
+type memSegment struct {
+ buf []byte
+}
+
+func (me *memSegment) Len() int {
+ return len(me.buf)
+}
+
+func (me *memSegment) Slice(off, length int) segment {
+ if length < 0 {
+ length = len(me.buf) - off
+ }
+ buf := make([]byte, length)
+ copy(buf, me.buf[off:])
+ return &memSegment{buf: buf}
+}
+
+func (me *memSegment) Truncate(n int) {
+ if n > cap(me.buf) {
+ newsize := 1024
+ for newsize < n {
+ newsize = newsize << 2
+ }
+ newbuf := make([]byte, n, newsize)
+ copy(newbuf, me.buf)
+ me.buf = newbuf
+ } else {
+ // Zero unused part when shrinking, in case we grow
+ // and start using it again later.
+ for i := n; i < len(me.buf); i++ {
+ me.buf[i] = 0
+ }
+ }
+ me.buf = me.buf[:n]
+}
+
+func (me *memSegment) WriteAt(p []byte, off int) {
+ if off+len(p) > len(me.buf) {
+ panic("overflowed segment")
+ }
+ copy(me.buf[off:], p)
+}
+
+func (me *memSegment) ReadAt(p []byte, off int64) (n int, err error) {
+ if off > int64(me.Len()) {
+ err = io.EOF
+ return
+ }
+ n = copy(p, me.buf[int(off):])
+ if n < len(p) {
+ err = io.EOF
+ }
+ return
+}
+
+type storedSegment struct {
+ kc keepClient
+ locator string
+ size int // size of stored block (also encoded in locator)
+ offset int // position of segment within the stored block
+ length int // bytes in this segment (offset + length <= size)
+}
+
+func (se storedSegment) Len() int {
+ return se.length
+}
+
+func (se storedSegment) Slice(n, size int) segment {
+ se.offset += n
+ se.length -= n
+ if size >= 0 && se.length > size {
+ se.length = size
+ }
+ return se
+}
+
+func (se storedSegment) ReadAt(p []byte, off int64) (n int, err error) {
+ if off > int64(se.length) {
+ return 0, io.EOF
+ }
+ maxlen := se.length - int(off)
+ if len(p) > maxlen {
+ p = p[:maxlen]
+ n, err = se.kc.ReadAt(se.locator, p, int(off)+se.offset)
+ if err == nil {
+ err = io.EOF
}
- })
- return c.sizes
+ return
+ }
+ return se.kc.ReadAt(se.locator, p, int(off)+se.offset)
}
func canonicalName(name string) string {
}
return name
}
+
+var manifestEscapeSeq = regexp.MustCompile(`\\([0-7]{3}|\\)`)
+
+func manifestUnescapeFunc(seq string) string {
+ if seq == `\\` {
+ return `\`
+ }
+ i, err := strconv.ParseUint(seq[1:], 8, 8)
+ if err != nil {
+ // Invalid escape sequence: can't unescape.
+ return seq
+ }
+ return string([]byte{byte(i)})
+}
+
+func manifestUnescape(s string) string {
+ return manifestEscapeSeq.ReplaceAllStringFunc(s, manifestUnescapeFunc)
+}
+
+var manifestEscapedChar = regexp.MustCompile(`[\000-\040:\s\\]`)
+
+func manifestEscapeFunc(seq string) string {
+ return fmt.Sprintf("\\%03o", byte(seq[0]))
+}
+
+func manifestEscape(s string) string {
+ return manifestEscapedChar.ReplaceAllStringFunc(s, manifestEscapeFunc)
+}
package arvados
import (
+ "bytes"
+ "crypto/md5"
+ "errors"
+ "fmt"
"io"
+ "io/ioutil"
+ "math/rand"
"net/http"
"os"
+ "regexp"
+ "runtime"
+ "sync"
"testing"
+ "time"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
check "gopkg.in/check.v1"
var _ = check.Suite(&CollectionFSSuite{})
+type keepClientStub struct {
+ blocks map[string][]byte
+ sync.RWMutex
+}
+
+var errStub404 = errors.New("404 block not found")
+
+func (kcs *keepClientStub) ReadAt(locator string, p []byte, off int) (int, error) {
+ kcs.RLock()
+ defer kcs.RUnlock()
+ buf := kcs.blocks[locator[:32]]
+ if buf == nil {
+ return 0, errStub404
+ }
+ return copy(p, buf[off:]), nil
+}
+
+func (kcs *keepClientStub) PutB(p []byte) (string, int, error) {
+ locator := fmt.Sprintf("%x+%d+A12345@abcde", md5.Sum(p), len(p))
+ buf := make([]byte, len(p))
+ copy(buf, p)
+ kcs.Lock()
+ defer kcs.Unlock()
+ kcs.blocks[locator[:32]] = buf
+ return locator, 1, nil
+}
+
type CollectionFSSuite struct {
client *Client
coll Collection
- fs http.FileSystem
+ fs CollectionFileSystem
+ kc keepClient
}
func (s *CollectionFSSuite) SetUpTest(c *check.C) {
s.client = NewClientFromEnv()
err := s.client.RequestAndDecode(&s.coll, "GET", "arvados/v1/collections/"+arvadostest.FooAndBarFilesInDirUUID, nil, nil)
c.Assert(err, check.IsNil)
- s.fs = s.coll.FileSystem(s.client, nil)
+ s.kc = &keepClientStub{
+ blocks: map[string][]byte{
+ "3858f62230ac3c915f300c664312c63f": []byte("foobar"),
+ }}
+ s.fs, err = s.coll.FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+}
+
+func (s *CollectionFSSuite) TestHttpFileSystemInterface(c *check.C) {
+ _, ok := s.fs.(http.FileSystem)
+ c.Check(ok, check.Equals, true)
}
func (s *CollectionFSSuite) TestReaddirFull(c *check.C) {
}
fis, err = f.Readdir(1)
- c.Check(err, check.Equals, io.EOF)
+ c.Check(err, check.IsNil)
c.Check(len(fis), check.Equals, 1)
if len(fis) > 0 {
c.Check(fis[0].Size(), check.Equals, int64(3))
c.Assert(err, check.IsNil)
fis, err = f.Readdir(2)
c.Check(len(fis), check.Equals, 1)
- c.Assert(err, check.Equals, io.EOF)
+ c.Assert(err, check.IsNil)
fis, err = f.Readdir(2)
c.Check(len(fis), check.Equals, 0)
c.Assert(err, check.Equals, io.EOF)
}
}
-func (s *CollectionFSSuite) TestOpenFile(c *check.C) {
- c.Skip("cannot test files with nil keepclient")
+func (s *CollectionFSSuite) TestReadOnlyFile(c *check.C) {
+ f, err := s.fs.OpenFile("/dir1/foo", os.O_RDONLY, 0)
+ c.Assert(err, check.IsNil)
+ st, err := f.Stat()
+ c.Assert(err, check.IsNil)
+ c.Check(st.Size(), check.Equals, int64(3))
+ n, err := f.Write([]byte("bar"))
+ c.Check(n, check.Equals, 0)
+ c.Check(err, check.Equals, ErrReadOnlyFile)
+}
+
+func (s *CollectionFSSuite) TestCreateFile(c *check.C) {
+ f, err := s.fs.OpenFile("/new-file 1", os.O_RDWR|os.O_CREATE, 0)
+ c.Assert(err, check.IsNil)
+ st, err := f.Stat()
+ c.Assert(err, check.IsNil)
+ c.Check(st.Size(), check.Equals, int64(0))
+
+ n, err := f.Write([]byte("bar"))
+ c.Check(n, check.Equals, 3)
+ c.Check(err, check.IsNil)
+
+ c.Check(f.Close(), check.IsNil)
+
+ f, err = s.fs.OpenFile("/new-file 1", os.O_RDWR|os.O_CREATE|os.O_EXCL, 0)
+ c.Check(f, check.IsNil)
+ c.Assert(err, check.NotNil)
+
+ f, err = s.fs.OpenFile("/new-file 1", os.O_RDWR, 0)
+ c.Assert(err, check.IsNil)
+ st, err = f.Stat()
+ c.Assert(err, check.IsNil)
+ c.Check(st.Size(), check.Equals, int64(3))
+
+ c.Check(f.Close(), check.IsNil)
+
+ m, err := s.fs.MarshalManifest(".")
+ c.Check(m, check.Matches, `. 37b51d194a7513e45b56f6524f2d51f2\+3\+\S+ 0:3:new-file\\0401\n./dir1 .* 3:3:bar 0:3:foo\n`)
+}
+
+func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
+ maxBlockSize = 8
+ defer func() { maxBlockSize = 2 << 26 }()
- f, err := s.fs.Open("/foo.txt")
+ f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
c.Assert(err, check.IsNil)
+ defer f.Close()
st, err := f.Stat()
c.Assert(err, check.IsNil)
c.Check(st.Size(), check.Equals, int64(3))
+
+ f2, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
+ c.Assert(err, check.IsNil)
+ defer f2.Close()
+
+ buf := make([]byte, 64)
+ n, err := f.Read(buf)
+ c.Check(n, check.Equals, 3)
+ c.Check(err, check.Equals, io.EOF)
+ c.Check(string(buf[:3]), check.DeepEquals, "foo")
+
+ pos, err := f.Seek(-2, io.SeekCurrent)
+ c.Check(pos, check.Equals, int64(1))
+ c.Check(err, check.IsNil)
+
+ // Split a storedExtent in two, and insert a memExtent
+ n, err = f.Write([]byte("*"))
+ c.Check(n, check.Equals, 1)
+ c.Check(err, check.IsNil)
+
+ pos, err = f.Seek(0, io.SeekCurrent)
+ c.Check(pos, check.Equals, int64(2))
+ c.Check(err, check.IsNil)
+
+ pos, err = f.Seek(0, io.SeekStart)
+ c.Check(pos, check.Equals, int64(0))
+ c.Check(err, check.IsNil)
+
+ rbuf, err := ioutil.ReadAll(f)
+ c.Check(len(rbuf), check.Equals, 3)
+ c.Check(err, check.IsNil)
+ c.Check(string(rbuf), check.Equals, "f*o")
+
+ // Write multiple blocks in one call
+ f.Seek(1, io.SeekStart)
+ n, err = f.Write([]byte("0123456789abcdefg"))
+ c.Check(n, check.Equals, 17)
+ c.Check(err, check.IsNil)
+ pos, err = f.Seek(0, io.SeekCurrent)
+ c.Check(pos, check.Equals, int64(18))
+ pos, err = f.Seek(-18, io.SeekCurrent)
+ c.Check(err, check.IsNil)
+ n, err = io.ReadFull(f, buf)
+ c.Check(n, check.Equals, 18)
+ c.Check(err, check.Equals, io.ErrUnexpectedEOF)
+ c.Check(string(buf[:n]), check.Equals, "f0123456789abcdefg")
+
+ buf2, err := ioutil.ReadAll(f2)
+ c.Check(err, check.IsNil)
+ c.Check(string(buf2), check.Equals, "f0123456789abcdefg")
+
+ // truncate to current size
+ err = f.Truncate(18)
+ f2.Seek(0, io.SeekStart)
+ buf2, err = ioutil.ReadAll(f2)
+ c.Check(err, check.IsNil)
+ c.Check(string(buf2), check.Equals, "f0123456789abcdefg")
+
+ // shrink to zero some data
+ f.Truncate(15)
+ f2.Seek(0, io.SeekStart)
+ buf2, err = ioutil.ReadAll(f2)
+ c.Check(err, check.IsNil)
+ c.Check(string(buf2), check.Equals, "f0123456789abcd")
+
+ // grow to partial block/extent
+ f.Truncate(20)
+ f2.Seek(0, io.SeekStart)
+ buf2, err = ioutil.ReadAll(f2)
+ c.Check(err, check.IsNil)
+ c.Check(string(buf2), check.Equals, "f0123456789abcd\x00\x00\x00\x00\x00")
+
+ f.Truncate(0)
+ f2.Seek(0, io.SeekStart)
+ f2.Write([]byte("12345678abcdefghijkl"))
+
+ // grow to block/extent boundary
+ f.Truncate(64)
+ f2.Seek(0, io.SeekStart)
+ buf2, err = ioutil.ReadAll(f2)
+ c.Check(err, check.IsNil)
+ c.Check(len(buf2), check.Equals, 64)
+ c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 8)
+
+ // shrink to block/extent boundary
+ err = f.Truncate(32)
+ f2.Seek(0, io.SeekStart)
+ buf2, err = ioutil.ReadAll(f2)
+ c.Check(err, check.IsNil)
+ c.Check(len(buf2), check.Equals, 32)
+ c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 4)
+
+ // shrink to partial block/extent
+ err = f.Truncate(15)
+ f2.Seek(0, io.SeekStart)
+ buf2, err = ioutil.ReadAll(f2)
+ c.Check(err, check.IsNil)
+ c.Check(string(buf2), check.Equals, "12345678abcdefg")
+ c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 2)
+
+ // Force flush to ensure the block "12345678" gets stored, so
+ // we know what to expect in the final manifest below.
+ _, err = s.fs.MarshalManifest(".")
+ c.Check(err, check.IsNil)
+
+ // Truncate to size=3 while f2's ptr is at 15
+ err = f.Truncate(3)
+ c.Check(err, check.IsNil)
+ buf2, err = ioutil.ReadAll(f2)
+ c.Check(err, check.IsNil)
+ c.Check(string(buf2), check.Equals, "")
+ f2.Seek(0, io.SeekStart)
+ buf2, err = ioutil.ReadAll(f2)
+ c.Check(err, check.IsNil)
+ c.Check(string(buf2), check.Equals, "123")
+ c.Check(len(f.(*filehandle).inode.(*filenode).segments), check.Equals, 1)
+
+ m, err := s.fs.MarshalManifest(".")
+ c.Check(err, check.IsNil)
+ m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
+ c.Check(m, check.Equals, "./dir1 3858f62230ac3c915f300c664312c63f+6 25d55ad283aa400af464c76d713c07ad+8 3:3:bar 6:3:foo\n")
+}
+
+func (s *CollectionFSSuite) TestSeekSparse(c *check.C) {
+ fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+ f, err := fs.OpenFile("test", os.O_CREATE|os.O_RDWR, 0755)
+ c.Assert(err, check.IsNil)
+ defer f.Close()
+
+ checkSize := func(size int64) {
+ fi, err := f.Stat()
+ c.Check(fi.Size(), check.Equals, size)
+
+ f, err := fs.OpenFile("test", os.O_CREATE|os.O_RDWR, 0755)
+ c.Assert(err, check.IsNil)
+ defer f.Close()
+ fi, err = f.Stat()
+ c.Check(fi.Size(), check.Equals, size)
+ pos, err := f.Seek(0, io.SeekEnd)
+ c.Check(pos, check.Equals, size)
+ }
+
+ f.Seek(2, io.SeekEnd)
+ checkSize(0)
+ f.Write([]byte{1})
+ checkSize(3)
+
+ f.Seek(2, io.SeekCurrent)
+ checkSize(3)
+ f.Write([]byte{})
+ checkSize(5)
+
+ f.Seek(8, io.SeekStart)
+ checkSize(5)
+ n, err := f.Read(make([]byte, 1))
+ c.Check(n, check.Equals, 0)
+ c.Check(err, check.Equals, io.EOF)
+ checkSize(5)
+ f.Write([]byte{1, 2, 3})
+ checkSize(11)
+}
+
+func (s *CollectionFSSuite) TestMarshalSmallBlocks(c *check.C) {
+ maxBlockSize = 8
+ defer func() { maxBlockSize = 2 << 26 }()
+
+ var err error
+ s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+ for _, name := range []string{"foo", "bar", "baz"} {
+ f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
+ c.Assert(err, check.IsNil)
+ f.Write([]byte(name))
+ f.Close()
+ }
+
+ m, err := s.fs.MarshalManifest(".")
+ c.Check(err, check.IsNil)
+ m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
+ c.Check(m, check.Equals, ". c3c23db5285662ef7172373df0003206+6 acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:bar 3:3:baz 6:3:foo\n")
+}
+
+func (s *CollectionFSSuite) TestMkdir(c *check.C) {
+ err := s.fs.Mkdir("foo/bar", 0755)
+ c.Check(err, check.Equals, os.ErrNotExist)
+
+ f, err := s.fs.OpenFile("foo/bar", os.O_CREATE, 0)
+ c.Check(err, check.Equals, os.ErrNotExist)
+
+ err = s.fs.Mkdir("foo", 0755)
+ c.Check(err, check.IsNil)
+
+ f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_WRONLY, 0)
+ c.Check(err, check.IsNil)
+ if err == nil {
+ defer f.Close()
+ f.Write([]byte("foo"))
+ }
+
+ // mkdir fails if a file already exists with that name
+ err = s.fs.Mkdir("foo/bar", 0755)
+ c.Check(err, check.NotNil)
+
+ err = s.fs.Remove("foo/bar")
+ c.Check(err, check.IsNil)
+
+ // mkdir succeds after the file is deleted
+ err = s.fs.Mkdir("foo/bar", 0755)
+ c.Check(err, check.IsNil)
+
+ // creating a file in a nonexistent subdir should still fail
+ f, err = s.fs.OpenFile("foo/bar/baz/foo.txt", os.O_CREATE|os.O_WRONLY, 0)
+ c.Check(err, check.Equals, os.ErrNotExist)
+
+ f, err = s.fs.OpenFile("foo/bar/foo.txt", os.O_CREATE|os.O_WRONLY, 0)
+ c.Check(err, check.IsNil)
+ if err == nil {
+ defer f.Close()
+ f.Write([]byte("foo"))
+ }
+
+ // creating foo/bar as a regular file should fail
+ f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_EXCL, 0)
+ c.Check(err, check.NotNil)
+
+ // creating foo/bar as a directory should fail
+ f, err = s.fs.OpenFile("foo/bar", os.O_CREATE|os.O_EXCL, os.ModeDir)
+ c.Check(err, check.NotNil)
+ err = s.fs.Mkdir("foo/bar", 0755)
+ c.Check(err, check.NotNil)
+
+ m, err := s.fs.MarshalManifest(".")
+ c.Check(err, check.IsNil)
+ m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
+ c.Check(m, check.Equals, "./dir1 3858f62230ac3c915f300c664312c63f+6 3:3:bar 0:3:foo\n./foo/bar acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n")
+}
+
+func (s *CollectionFSSuite) TestConcurrentWriters(c *check.C) {
+ maxBlockSize = 8
+ defer func() { maxBlockSize = 2 << 26 }()
+
+ var wg sync.WaitGroup
+ for n := 0; n < 128; n++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
+ c.Assert(err, check.IsNil)
+ defer f.Close()
+ for i := 0; i < 6502; i++ {
+ switch rand.Int() & 3 {
+ case 0:
+ f.Truncate(int64(rand.Intn(64)))
+ case 1:
+ f.Seek(int64(rand.Intn(64)), io.SeekStart)
+ case 2:
+ _, err := f.Write([]byte("beep boop"))
+ c.Check(err, check.IsNil)
+ case 3:
+ _, err := ioutil.ReadAll(f)
+ c.Check(err, check.IsNil)
+ }
+ }
+ }()
+ }
+ wg.Wait()
+
+ f, err := s.fs.OpenFile("/dir1/foo", os.O_RDWR, 0)
+ c.Assert(err, check.IsNil)
+ defer f.Close()
+ buf, err := ioutil.ReadAll(f)
+ c.Check(err, check.IsNil)
+ c.Logf("after lots of random r/w/seek/trunc, buf is %q", buf)
+}
+
+func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
+ maxBlockSize = 40
+ defer func() { maxBlockSize = 2 << 26 }()
+
+ var err error
+ s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+
+ const nfiles = 256
+ const ngoroutines = 256
+
+ var wg sync.WaitGroup
+ for n := 0; n < nfiles; n++ {
+ wg.Add(1)
+ go func(n int) {
+ defer wg.Done()
+ expect := make([]byte, 0, 64)
+ wbytes := []byte("there's no simple explanation for anything important that any of us do")
+ f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDWR|os.O_CREATE|os.O_EXCL, 0)
+ c.Assert(err, check.IsNil)
+ defer f.Close()
+ for i := 0; i < ngoroutines; i++ {
+ trunc := rand.Intn(65)
+ woff := rand.Intn(trunc + 1)
+ wbytes = wbytes[:rand.Intn(64-woff+1)]
+ for buf, i := expect[:cap(expect)], len(expect); i < trunc; i++ {
+ buf[i] = 0
+ }
+ expect = expect[:trunc]
+ if trunc < woff+len(wbytes) {
+ expect = expect[:woff+len(wbytes)]
+ }
+ copy(expect[woff:], wbytes)
+ f.Truncate(int64(trunc))
+ pos, err := f.Seek(int64(woff), io.SeekStart)
+ c.Check(pos, check.Equals, int64(woff))
+ c.Check(err, check.IsNil)
+ n, err := f.Write(wbytes)
+ c.Check(n, check.Equals, len(wbytes))
+ c.Check(err, check.IsNil)
+ pos, err = f.Seek(0, io.SeekStart)
+ c.Check(pos, check.Equals, int64(0))
+ c.Check(err, check.IsNil)
+ buf, err := ioutil.ReadAll(f)
+ c.Check(string(buf), check.Equals, string(expect))
+ c.Check(err, check.IsNil)
+ }
+ s.checkMemSize(c, f)
+ }(n)
+ }
+ wg.Wait()
+
+ root, err := s.fs.Open("/")
+ c.Assert(err, check.IsNil)
+ defer root.Close()
+ fi, err := root.Readdir(-1)
+ c.Check(err, check.IsNil)
+ c.Check(len(fi), check.Equals, nfiles)
+
+ _, err = s.fs.MarshalManifest(".")
+ c.Check(err, check.IsNil)
+ // TODO: check manifest content
+}
+
+func (s *CollectionFSSuite) TestRemove(c *check.C) {
+ fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+ err = fs.Mkdir("dir0", 0755)
+ c.Assert(err, check.IsNil)
+ err = fs.Mkdir("dir1", 0755)
+ c.Assert(err, check.IsNil)
+ err = fs.Mkdir("dir1/dir2", 0755)
+ c.Assert(err, check.IsNil)
+ err = fs.Mkdir("dir1/dir3", 0755)
+ c.Assert(err, check.IsNil)
+
+ err = fs.Remove("dir0")
+ c.Check(err, check.IsNil)
+ err = fs.Remove("dir0")
+ c.Check(err, check.Equals, os.ErrNotExist)
+
+ err = fs.Remove("dir1/dir2/.")
+ c.Check(err, check.Equals, ErrInvalidArgument)
+ err = fs.Remove("dir1/dir2/..")
+ c.Check(err, check.Equals, ErrInvalidArgument)
+ err = fs.Remove("dir1")
+ c.Check(err, check.Equals, ErrDirectoryNotEmpty)
+ err = fs.Remove("dir1/dir2/../../../dir1")
+ c.Check(err, check.Equals, ErrDirectoryNotEmpty)
+ err = fs.Remove("dir1/dir3/")
+ c.Check(err, check.IsNil)
+ err = fs.RemoveAll("dir1")
+ c.Check(err, check.IsNil)
+ err = fs.RemoveAll("dir1")
+ c.Check(err, check.IsNil)
+}
+
+func (s *CollectionFSSuite) TestRename(c *check.C) {
+ fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+ const (
+ outer = 16
+ inner = 16
+ )
+ for i := 0; i < outer; i++ {
+ err = fs.Mkdir(fmt.Sprintf("dir%d", i), 0755)
+ c.Assert(err, check.IsNil)
+ for j := 0; j < inner; j++ {
+ err = fs.Mkdir(fmt.Sprintf("dir%d/dir%d", i, j), 0755)
+ c.Assert(err, check.IsNil)
+ for _, fnm := range []string{
+ fmt.Sprintf("dir%d/file%d", i, j),
+ fmt.Sprintf("dir%d/dir%d/file%d", i, j, j),
+ } {
+ f, err := fs.OpenFile(fnm, os.O_CREATE|os.O_WRONLY, 0755)
+ c.Assert(err, check.IsNil)
+ _, err = f.Write([]byte("beep"))
+ c.Assert(err, check.IsNil)
+ f.Close()
+ }
+ }
+ }
+ var wg sync.WaitGroup
+ for i := 0; i < outer; i++ {
+ for j := 0; j < inner; j++ {
+ wg.Add(1)
+ go func(i, j int) {
+ defer wg.Done()
+ oldname := fmt.Sprintf("dir%d/dir%d/file%d", i, j, j)
+ newname := fmt.Sprintf("dir%d/newfile%d", i, inner-j-1)
+ _, err := fs.Open(newname)
+ c.Check(err, check.Equals, os.ErrNotExist)
+ err = fs.Rename(oldname, newname)
+ c.Check(err, check.IsNil)
+ f, err := fs.Open(newname)
+ c.Check(err, check.IsNil)
+ f.Close()
+ }(i, j)
+
+ wg.Add(1)
+ go func(i, j int) {
+ defer wg.Done()
+ // oldname does not exist
+ err := fs.Rename(
+ fmt.Sprintf("dir%d/dir%d/missing", i, j),
+ fmt.Sprintf("dir%d/dir%d/file%d", outer-i-1, j, j))
+ c.Check(err, check.ErrorMatches, `.*does not exist`)
+
+ // newname parent dir does not exist
+ err = fs.Rename(
+ fmt.Sprintf("dir%d/dir%d", i, j),
+ fmt.Sprintf("dir%d/missing/irrelevant", outer-i-1))
+ c.Check(err, check.ErrorMatches, `.*does not exist`)
+
+ // oldname parent dir is a file
+ err = fs.Rename(
+ fmt.Sprintf("dir%d/file%d/patherror", i, j),
+ fmt.Sprintf("dir%d/irrelevant", i))
+ c.Check(err, check.ErrorMatches, `.*does not exist`)
+
+ // newname parent dir is a file
+ err = fs.Rename(
+ fmt.Sprintf("dir%d/dir%d/file%d", i, j, j),
+ fmt.Sprintf("dir%d/file%d/patherror", i, inner-j-1))
+ c.Check(err, check.ErrorMatches, `.*does not exist`)
+ }(i, j)
+ }
+ }
+ wg.Wait()
+
+ f, err := fs.OpenFile("dir1/newfile3", 0, 0)
+ c.Assert(err, check.IsNil)
+ c.Check(f.Size(), check.Equals, int64(4))
+ buf, err := ioutil.ReadAll(f)
+ c.Check(buf, check.DeepEquals, []byte("beep"))
+ c.Check(err, check.IsNil)
+ _, err = fs.Open("dir1/dir1/file1")
+ c.Check(err, check.Equals, os.ErrNotExist)
+}
+
+func (s *CollectionFSSuite) TestPersist(c *check.C) {
+ maxBlockSize = 1024
+ defer func() { maxBlockSize = 2 << 26 }()
+
+ var err error
+ s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+ err = s.fs.Mkdir("d:r", 0755)
+ c.Assert(err, check.IsNil)
+
+ expect := map[string][]byte{}
+
+ var wg sync.WaitGroup
+ for _, name := range []string{"random 1", "random:2", "random\\3", "d:r/random4"} {
+ buf := make([]byte, 500)
+ rand.Read(buf)
+ expect[name] = buf
+
+ f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
+ c.Assert(err, check.IsNil)
+ // Note: we don't close the file until after the test
+ // is done. Writes to unclosed files should persist.
+ defer f.Close()
+
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for i := 0; i < len(buf); i += 5 {
+ _, err := f.Write(buf[i : i+5])
+ c.Assert(err, check.IsNil)
+ }
+ }()
+ }
+ wg.Wait()
+
+ m, err := s.fs.MarshalManifest(".")
+ c.Check(err, check.IsNil)
+ c.Logf("%q", m)
+
+ root, err := s.fs.Open("/")
+ c.Assert(err, check.IsNil)
+ defer root.Close()
+ fi, err := root.Readdir(-1)
+ c.Check(err, check.IsNil)
+ c.Check(len(fi), check.Equals, 4)
+
+ persisted, err := (&Collection{ManifestText: m}).FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+
+ root, err = persisted.Open("/")
+ c.Assert(err, check.IsNil)
+ defer root.Close()
+ fi, err = root.Readdir(-1)
+ c.Check(err, check.IsNil)
+ c.Check(len(fi), check.Equals, 4)
+
+ for name, content := range expect {
+ c.Logf("read %q", name)
+ f, err := persisted.Open(name)
+ c.Assert(err, check.IsNil)
+ defer f.Close()
+ buf, err := ioutil.ReadAll(f)
+ c.Check(err, check.IsNil)
+ c.Check(buf, check.DeepEquals, content)
+ }
+}
+
+func (s *CollectionFSSuite) TestPersistEmptyFiles(c *check.C) {
+ var err error
+ s.fs, err = (&Collection{}).FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+ for _, name := range []string{"dir", "dir/zerodir", "zero", "zero/zero"} {
+ err = s.fs.Mkdir(name, 0755)
+ c.Assert(err, check.IsNil)
+ }
+
+ expect := map[string][]byte{
+ "0": nil,
+ "00": []byte{},
+ "one": []byte{1},
+ "dir/0": nil,
+ "dir/two": []byte{1, 2},
+ "dir/zero": nil,
+ "dir/zerodir/zero": nil,
+ "zero/zero/zero": nil,
+ }
+ for name, data := range expect {
+ f, err := s.fs.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0)
+ c.Assert(err, check.IsNil)
+ if data != nil {
+ _, err := f.Write(data)
+ c.Assert(err, check.IsNil)
+ }
+ f.Close()
+ }
+
+ m, err := s.fs.MarshalManifest(".")
+ c.Check(err, check.IsNil)
+ c.Logf("%q", m)
+
+ persisted, err := (&Collection{ManifestText: m}).FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+
+ for name, data := range expect {
+ f, err := persisted.Open("bogus-" + name)
+ c.Check(err, check.NotNil)
+
+ f, err = persisted.Open(name)
+ c.Assert(err, check.IsNil)
+
+ if data == nil {
+ data = []byte{}
+ }
+ buf, err := ioutil.ReadAll(f)
+ c.Check(err, check.IsNil)
+ c.Check(buf, check.DeepEquals, data)
+ }
+}
+
+func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
+ fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+
+ f, err := fs.OpenFile("missing", os.O_WRONLY, 0)
+ c.Check(f, check.IsNil)
+ c.Check(err, check.ErrorMatches, `file does not exist`)
+
+ f, err = fs.OpenFile("new", os.O_CREATE|os.O_RDONLY, 0)
+ c.Assert(err, check.IsNil)
+ defer f.Close()
+ n, err := f.Write([]byte{1, 2, 3})
+ c.Check(n, check.Equals, 0)
+ c.Check(err, check.ErrorMatches, `read-only file`)
+ n, err = f.Read(make([]byte, 1))
+ c.Check(n, check.Equals, 0)
+ c.Check(err, check.Equals, io.EOF)
+ f, err = fs.OpenFile("new", os.O_RDWR, 0)
+ c.Assert(err, check.IsNil)
+ defer f.Close()
+ _, err = f.Write([]byte{4, 5, 6})
+ c.Check(err, check.IsNil)
+ fi, err := f.Stat()
+ c.Assert(err, check.IsNil)
+ c.Check(fi.Size(), check.Equals, int64(3))
+
+ f, err = fs.OpenFile("new", os.O_TRUNC|os.O_RDWR, 0)
+ c.Assert(err, check.IsNil)
+ defer f.Close()
+ pos, err := f.Seek(0, io.SeekEnd)
+ c.Check(pos, check.Equals, int64(0))
+ c.Check(err, check.IsNil)
+ fi, err = f.Stat()
+ c.Assert(err, check.IsNil)
+ c.Check(fi.Size(), check.Equals, int64(0))
+ fs.Remove("new")
+
+ buf := make([]byte, 64)
+ f, err = fs.OpenFile("append", os.O_EXCL|os.O_CREATE|os.O_RDWR|os.O_APPEND, 0)
+ c.Assert(err, check.IsNil)
+ f.Write([]byte{1, 2, 3})
+ f.Seek(0, io.SeekStart)
+ n, _ = f.Read(buf[:1])
+ c.Check(n, check.Equals, 1)
+ c.Check(buf[:1], check.DeepEquals, []byte{1})
+ pos, err = f.Seek(0, io.SeekCurrent)
+ c.Check(pos, check.Equals, int64(1))
+ f.Write([]byte{4, 5, 6})
+ pos, err = f.Seek(0, io.SeekCurrent)
+ c.Check(pos, check.Equals, int64(6))
+ f.Seek(0, io.SeekStart)
+ n, err = f.Read(buf)
+ c.Check(buf[:n], check.DeepEquals, []byte{1, 2, 3, 4, 5, 6})
+ c.Check(err, check.Equals, io.EOF)
+ f.Close()
+
+ f, err = fs.OpenFile("append", os.O_RDWR|os.O_APPEND, 0)
+ c.Assert(err, check.IsNil)
+ pos, err = f.Seek(0, io.SeekCurrent)
+ c.Check(pos, check.Equals, int64(0))
+ c.Check(err, check.IsNil)
+ f.Read(buf[:3])
+ pos, _ = f.Seek(0, io.SeekCurrent)
+ c.Check(pos, check.Equals, int64(3))
+ f.Write([]byte{7, 8, 9})
+ pos, err = f.Seek(0, io.SeekCurrent)
+ c.Check(pos, check.Equals, int64(9))
+ f.Close()
+
+ f, err = fs.OpenFile("wronly", os.O_CREATE|os.O_WRONLY, 0)
+ c.Assert(err, check.IsNil)
+ n, err = f.Write([]byte{3, 2, 1})
+ c.Check(n, check.Equals, 3)
+ c.Check(err, check.IsNil)
+ pos, _ = f.Seek(0, io.SeekCurrent)
+ c.Check(pos, check.Equals, int64(3))
+ pos, _ = f.Seek(0, io.SeekStart)
+ c.Check(pos, check.Equals, int64(0))
+ n, err = f.Read(buf)
+ c.Check(n, check.Equals, 0)
+ c.Check(err, check.ErrorMatches, `.*O_WRONLY.*`)
+ f, err = fs.OpenFile("wronly", os.O_RDONLY, 0)
+ c.Assert(err, check.IsNil)
+ n, _ = f.Read(buf)
+ c.Check(buf[:n], check.DeepEquals, []byte{3, 2, 1})
+
+ f, err = fs.OpenFile("unsupported", os.O_CREATE|os.O_SYNC, 0)
+ c.Check(f, check.IsNil)
+ c.Check(err, check.NotNil)
+
+ f, err = fs.OpenFile("append", os.O_RDWR|os.O_WRONLY, 0)
+ c.Check(f, check.IsNil)
+ c.Check(err, check.ErrorMatches, `invalid flag.*`)
+}
+
+func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) {
+ maxBlockSize = 1024
+ defer func() { maxBlockSize = 2 << 26 }()
+
+ fs, err := (&Collection{}).FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+ f, err := fs.OpenFile("50K", os.O_WRONLY|os.O_CREATE, 0)
+ c.Assert(err, check.IsNil)
+ defer f.Close()
+
+ data := make([]byte, 500)
+ rand.Read(data)
+
+ for i := 0; i < 100; i++ {
+ n, err := f.Write(data)
+ c.Assert(n, check.Equals, len(data))
+ c.Assert(err, check.IsNil)
+ }
+
+ currentMemExtents := func() (memExtents []int) {
+ for idx, e := range f.(*filehandle).inode.(*filenode).segments {
+ switch e.(type) {
+ case *memSegment:
+ memExtents = append(memExtents, idx)
+ }
+ }
+ return
+ }
+ c.Check(currentMemExtents(), check.HasLen, 1)
+
+ m, err := fs.MarshalManifest(".")
+ c.Check(m, check.Matches, `[^:]* 0:50000:50K\n`)
+ c.Check(err, check.IsNil)
+ c.Check(currentMemExtents(), check.HasLen, 0)
+}
+
+func (s *CollectionFSSuite) TestBrokenManifests(c *check.C) {
+ for _, txt := range []string{
+ "\n",
+ ".\n",
+ ". \n",
+ ". d41d8cd98f00b204e9800998ecf8427e+0\n",
+ ". d41d8cd98f00b204e9800998ecf8427e+0 \n",
+ ". 0:0:foo\n",
+ ". 0:0:foo\n",
+ ". 0:0:foo 0:0:bar\n",
+ ". d41d8cd98f00b204e9800998ecf8427e 0:0:foo\n",
+ ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo:bar\n",
+ ". d41d8cd98f00b204e9800998ecf8427e+0 foo:0:foo\n",
+ ". d41d8cd98f00b204e9800998ecf8427e+0 0:foo:foo\n",
+ ". d41d8cd98f00b204e9800998ecf8427e+1 0:1:foo 1:1:bar\n",
+ ". d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n",
+ "./foo d41d8cd98f00b204e9800998ecf8427e+1 0:0:bar\n. d41d8cd98f00b204e9800998ecf8427e+1 0:0:foo\n",
+ } {
+ c.Logf("<-%q", txt)
+ fs, err := (&Collection{ManifestText: txt}).FileSystem(s.client, s.kc)
+ c.Check(fs, check.IsNil)
+ c.Logf("-> %s", err)
+ c.Check(err, check.NotNil)
+ }
+}
+
+func (s *CollectionFSSuite) TestEdgeCaseManifests(c *check.C) {
+ for _, txt := range []string{
+ "",
+ ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo\n",
+ ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo 0:0:foo 0:0:bar\n",
+ ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo 0:0:foo 0:0:bar\n",
+ ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo/bar\n./foo d41d8cd98f00b204e9800998ecf8427e+0 0:0:bar\n",
+ } {
+ c.Logf("<-%q", txt)
+ fs, err := (&Collection{ManifestText: txt}).FileSystem(s.client, s.kc)
+ c.Check(err, check.IsNil)
+ c.Check(fs, check.NotNil)
+ }
+}
+
+func (s *CollectionFSSuite) checkMemSize(c *check.C, f File) {
+ fn := f.(*filehandle).inode.(*filenode)
+ var memsize int64
+ for _, seg := range fn.segments {
+ if e, ok := seg.(*memSegment); ok {
+ memsize += int64(len(e.buf))
+ }
+ }
+ c.Check(fn.memsize, check.Equals, memsize)
+}
+
+type CollectionFSUnitSuite struct{}
+
+var _ = check.Suite(&CollectionFSUnitSuite{})
+
+// expect ~2 seconds to load a manifest with 256K files
+func (s *CollectionFSUnitSuite) TestLargeManifest(c *check.C) {
+ const (
+ dirCount = 512
+ fileCount = 512
+ )
+
+ mb := bytes.NewBuffer(make([]byte, 0, 40000000))
+ for i := 0; i < dirCount; i++ {
+ fmt.Fprintf(mb, "./dir%d", i)
+ for j := 0; j <= fileCount; j++ {
+ fmt.Fprintf(mb, " %032x+42+A%040x@%08x", j, j, j)
+ }
+ for j := 0; j < fileCount; j++ {
+ fmt.Fprintf(mb, " %d:%d:dir%d/file%d", j*42+21, 42, j, j)
+ }
+ mb.Write([]byte{'\n'})
+ }
+ coll := Collection{ManifestText: mb.String()}
+ c.Logf("%s built", time.Now())
+
+ var memstats runtime.MemStats
+ runtime.ReadMemStats(&memstats)
+ c.Logf("%s Alloc=%d Sys=%d", time.Now(), memstats.Alloc, memstats.Sys)
+
+ f, err := coll.FileSystem(nil, nil)
+ c.Check(err, check.IsNil)
+ c.Logf("%s loaded", time.Now())
+
+ for i := 0; i < dirCount; i++ {
+ for j := 0; j < fileCount; j++ {
+ f.Stat(fmt.Sprintf("./dir%d/dir%d/file%d", i, j, j))
+ }
+ }
+ c.Logf("%s Stat() x %d", time.Now(), dirCount*fileCount)
+
+ runtime.ReadMemStats(&memstats)
+ c.Logf("%s Alloc=%d Sys=%d", time.Now(), memstats.Alloc, memstats.Sys)
}
// Gocheck boilerplate
URL url.URL
StatusCode int
Status string
- errors []string
+ Errors []string
}
func (e TransactionError) Error() (s string) {
if e.Status != "" {
s = s + ": " + e.Status
}
- if len(e.errors) > 0 {
- s = s + ": " + strings.Join(e.errors, "; ")
+ if len(e.Errors) > 0 {
+ s = s + ": " + strings.Join(e.Errors, "; ")
}
return
}
var e TransactionError
if json.Unmarshal(buf, &e) != nil {
// No JSON-formatted error response
- e.errors = nil
+ e.Errors = nil
}
e.Method = req.Method
e.URL = *req.URL
package httpserver
import (
+ "math/rand"
+ "net/http"
"strconv"
"sync"
"time"
// Prefix is prepended to each returned ID.
Prefix string
- lastID int64
- mtx sync.Mutex
+ mtx sync.Mutex
+ src rand.Source
}
// Next returns a new ID string. It is safe to call Next from multiple
// goroutines.
func (g *IDGenerator) Next() string {
- id := time.Now().UnixNano()
g.mtx.Lock()
- if id <= g.lastID {
- id = g.lastID + 1
+ defer g.mtx.Unlock()
+ if g.src == nil {
+ g.src = rand.NewSource(time.Now().UnixNano())
}
- g.lastID = id
- g.mtx.Unlock()
- return g.Prefix + strconv.FormatInt(id, 36)
+ a, b := g.src.Int63(), g.src.Int63()
+ id := strconv.FormatInt(a, 36) + strconv.FormatInt(b, 36)
+ for len(id) > 20 {
+ id = id[:20]
+ }
+ return g.Prefix + id
+}
+
+// AddRequestIDs wraps an http.Handler, adding an X-Request-Id header
+// to each request that doesn't already have one.
+func AddRequestIDs(h http.Handler) http.Handler {
+ gen := &IDGenerator{Prefix: "req-"}
+ return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+ if req.Header.Get("X-Request-Id") == "" {
+ req.Header.Set("X-Request-Id", gen.Next())
+ }
+ h.ServeHTTP(w, req)
+ })
}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package httpserver
+
+import (
+ "context"
+ "net/http"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/stats"
+ log "github.com/Sirupsen/logrus"
+)
+
+type contextKey struct {
+ name string
+}
+
+var requestTimeContextKey = contextKey{"requestTime"}
+
+// LogRequests wraps an http.Handler, logging each request and
+// response via logrus.
+func LogRequests(h http.Handler) http.Handler {
+ return http.HandlerFunc(func(wrapped http.ResponseWriter, req *http.Request) {
+ w := &responseTimer{ResponseWriter: WrapResponseWriter(wrapped)}
+ req = req.WithContext(context.WithValue(req.Context(), &requestTimeContextKey, time.Now()))
+ lgr := log.WithFields(log.Fields{
+ "RequestID": req.Header.Get("X-Request-Id"),
+ "remoteAddr": req.RemoteAddr,
+ "reqForwardedFor": req.Header.Get("X-Forwarded-For"),
+ "reqMethod": req.Method,
+ "reqPath": req.URL.Path[1:],
+ "reqBytes": req.ContentLength,
+ })
+ logRequest(w, req, lgr)
+ defer logResponse(w, req, lgr)
+ h.ServeHTTP(w, req)
+ })
+}
+
+func logRequest(w *responseTimer, req *http.Request, lgr *log.Entry) {
+ lgr.Info("request")
+}
+
+func logResponse(w *responseTimer, req *http.Request, lgr *log.Entry) {
+ if tStart, ok := req.Context().Value(&requestTimeContextKey).(time.Time); ok {
+ tDone := time.Now()
+ lgr = lgr.WithFields(log.Fields{
+ "timeTotal": stats.Duration(tDone.Sub(tStart)),
+ "timeToStatus": stats.Duration(w.writeTime.Sub(tStart)),
+ "timeWriteBody": stats.Duration(tDone.Sub(w.writeTime)),
+ })
+ }
+ lgr.WithFields(log.Fields{
+ "respStatusCode": w.WroteStatus(),
+ "respStatus": http.StatusText(w.WroteStatus()),
+ "respBytes": w.WroteBodyBytes(),
+ }).Info("response")
+}
+
+type responseTimer struct {
+ ResponseWriter
+ wrote bool
+ writeTime time.Time
+}
+
+func (rt *responseTimer) WriteHeader(code int) {
+ if !rt.wrote {
+ rt.wrote = true
+ rt.writeTime = time.Now()
+ }
+ rt.ResponseWriter.WriteHeader(code)
+}
+
+func (rt *responseTimer) Write(p []byte) (int, error) {
+ if !rt.wrote {
+ rt.wrote = true
+ rt.writeTime = time.Now()
+ }
+ return rt.ResponseWriter.Write(p)
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package httpserver
+
+import (
+ "bytes"
+ "encoding/json"
+ "net/http"
+ "net/http/httptest"
+ "os"
+ "testing"
+ "time"
+
+ log "github.com/Sirupsen/logrus"
+ check "gopkg.in/check.v1"
+)
+
+func Test(t *testing.T) {
+ check.TestingT(t)
+}
+
+var _ = check.Suite(&Suite{})
+
+type Suite struct{}
+
+func (s *Suite) TestLogRequests(c *check.C) {
+ defer log.SetOutput(os.Stdout)
+ captured := &bytes.Buffer{}
+ log.SetOutput(captured)
+ log.SetFormatter(&log.JSONFormatter{
+ TimestampFormat: time.RFC3339Nano,
+ })
+ h := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+ w.Write([]byte("hello world"))
+ })
+ req, err := http.NewRequest("GET", "https://foo.example/bar", nil)
+ req.Header.Set("X-Forwarded-For", "1.2.3.4:12345")
+ c.Assert(err, check.IsNil)
+ resp := httptest.NewRecorder()
+ AddRequestIDs(LogRequests(h)).ServeHTTP(resp, req)
+
+ dec := json.NewDecoder(captured)
+
+ gotReq := make(map[string]interface{})
+ err = dec.Decode(&gotReq)
+ c.Logf("%#v", gotReq)
+ c.Check(gotReq["RequestID"], check.Matches, "req-[a-z0-9]{20}")
+ c.Check(gotReq["reqForwardedFor"], check.Equals, "1.2.3.4:12345")
+ c.Check(gotReq["msg"], check.Equals, "request")
+
+ gotResp := make(map[string]interface{})
+ err = dec.Decode(&gotResp)
+ c.Logf("%#v", gotResp)
+ c.Check(gotResp["RequestID"], check.Equals, gotReq["RequestID"])
+ c.Check(gotResp["reqForwardedFor"], check.Equals, "1.2.3.4:12345")
+ c.Check(gotResp["msg"], check.Equals, "response")
+
+ c.Assert(gotResp["time"], check.FitsTypeOf, "")
+ _, err = time.Parse(time.RFC3339Nano, gotResp["time"].(string))
+ c.Check(err, check.IsNil)
+
+ for _, key := range []string{"timeToStatus", "timeWriteBody", "timeTotal"} {
+ c.Assert(gotResp[key], check.FitsTypeOf, float64(0))
+ c.Check(gotResp[key].(float64), check.Not(check.Equals), float64(0))
+ }
+}
"net/http"
)
-// ResponseWriter wraps http.ResponseWriter and exposes the status
+type ResponseWriter interface {
+ http.ResponseWriter
+ WroteStatus() int
+ WroteBodyBytes() int
+}
+
+// responseWriter wraps http.ResponseWriter and exposes the status
// sent, the number of bytes sent to the client, and the last write
// error.
-type ResponseWriter struct {
+type responseWriter struct {
http.ResponseWriter
- wroteStatus *int // Last status given to WriteHeader()
- wroteBodyBytes *int // Bytes successfully written
- err *error // Last error returned from Write()
+ wroteStatus int // Last status given to WriteHeader()
+ wroteBodyBytes int // Bytes successfully written
+ err error // Last error returned from Write()
}
func WrapResponseWriter(orig http.ResponseWriter) ResponseWriter {
- return ResponseWriter{orig, new(int), new(int), new(error)}
+ return &responseWriter{ResponseWriter: orig}
}
-func (w ResponseWriter) WriteHeader(s int) {
- *w.wroteStatus = s
+func (w *responseWriter) WriteHeader(s int) {
+ w.wroteStatus = s
w.ResponseWriter.WriteHeader(s)
}
-func (w ResponseWriter) Write(data []byte) (n int, err error) {
+func (w *responseWriter) Write(data []byte) (n int, err error) {
n, err = w.ResponseWriter.Write(data)
- *w.wroteBodyBytes += n
- *w.err = err
+ w.wroteBodyBytes += n
+ w.err = err
return
}
-func (w ResponseWriter) WroteStatus() int {
- return *w.wroteStatus
+func (w *responseWriter) WroteStatus() int {
+ return w.wroteStatus
}
-func (w ResponseWriter) WroteBodyBytes() int {
- return *w.wroteBodyBytes
+func (w *responseWriter) WroteBodyBytes() int {
+ return w.wroteBodyBytes
}
-func (w ResponseWriter) Err() error {
- return *w.err
+func (w *responseWriter) Err() error {
+ return w.err
}
import (
"io"
"sort"
+ "strconv"
+ "strings"
"sync"
"time"
)
}
}
+// ReadAt returns data from the cache, first retrieving it from Keep if
+// necessary.
+func (c *BlockCache) ReadAt(kc *KeepClient, locator string, p []byte, off int) (int, error) {
+ buf, err := c.Get(kc, locator)
+ if err != nil {
+ return 0, err
+ }
+ if off > len(buf) {
+ return 0, io.ErrUnexpectedEOF
+ }
+ return copy(p, buf[off:]), nil
+}
+
// Get returns data from the cache, first retrieving it from Keep if
// necessary.
func (c *BlockCache) Get(kc *KeepClient, locator string) ([]byte, error) {
cacheKey := locator[:32]
+ bufsize := BLOCKSIZE
+ if parts := strings.SplitN(locator, "+", 3); len(parts) >= 2 {
+ datasize, err := strconv.ParseInt(parts[1], 10, 32)
+ if err == nil && datasize >= 0 {
+ bufsize = int(datasize)
+ }
+ }
c.mtx.Lock()
if c.cache == nil {
c.cache = make(map[string]*cacheBlock)
rdr, size, _, err := kc.Get(locator)
var data []byte
if err == nil {
- data = make([]byte, size, BLOCKSIZE)
+ data = make([]byte, size, bufsize)
_, err = io.ReadFull(rdr, data)
err2 := rdr.Close()
if err == nil {
import (
"errors"
- "fmt"
- "io"
"os"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/manifest"
)
-const (
- // After reading a data block from Keep, cfReader slices it up
- // and sends the slices to a buffered channel to be consumed
- // by the caller via Read().
- //
- // dataSliceSize is the maximum size of the slices, and
- // therefore the maximum number of bytes that will be returned
- // by a single call to Read().
- dataSliceSize = 1 << 20
-)
-
// ErrNoManifest indicates the given collection has no manifest
// information (e.g., manifest_text was excluded by a "select"
// parameter when retrieving the collection record).
if !ok {
return nil, ErrNoManifest
}
- m := manifest.Manifest{Text: mText}
- return kc.ManifestFileReader(m, filename)
-}
-
-func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error) {
- f := &file{
- kc: kc,
- }
- err := f.load(m, filename)
+ fs, err := (&arvados.Collection{ManifestText: mText}).FileSystem(nil, kc)
if err != nil {
return nil, err
}
- return f, nil
-}
-
-type file struct {
- kc *KeepClient
- segments []*manifest.FileSegment
- size int64 // total file size
- offset int64 // current read offset
-
- // current/latest segment accessed -- might or might not match pos
- seg *manifest.FileSegment
- segStart int64 // position of segment relative to file
- segData []byte
- segNext []*manifest.FileSegment
- readaheadDone bool
-}
-
-// Close implements io.Closer.
-func (f *file) Close() error {
- f.kc = nil
- f.segments = nil
- f.segData = nil
- return nil
+ return fs.OpenFile(filename, os.O_RDONLY, 0)
}
-// Read implements io.Reader.
-func (f *file) Read(buf []byte) (int, error) {
- if f.seg == nil || f.offset < f.segStart || f.offset >= f.segStart+int64(f.seg.Len) {
- // f.seg does not cover the current read offset
- // (f.pos). Iterate over f.segments to find the one
- // that does.
- f.seg = nil
- f.segStart = 0
- f.segData = nil
- f.segNext = f.segments
- for len(f.segNext) > 0 {
- seg := f.segNext[0]
- f.segNext = f.segNext[1:]
- segEnd := f.segStart + int64(seg.Len)
- if segEnd > f.offset {
- f.seg = seg
- break
- }
- f.segStart = segEnd
- }
- f.readaheadDone = false
- }
- if f.seg == nil {
- return 0, io.EOF
- }
- if f.segData == nil {
- data, err := f.kc.cache().Get(f.kc, f.seg.Locator)
- if err != nil {
- return 0, err
- }
- if len(data) < f.seg.Offset+f.seg.Len {
- return 0, fmt.Errorf("invalid segment (offset %d len %d) in %d-byte block %s", f.seg.Offset, f.seg.Len, len(data), f.seg.Locator)
- }
- f.segData = data[f.seg.Offset : f.seg.Offset+f.seg.Len]
- }
- // dataOff and dataLen denote a portion of f.segData
- // corresponding to a portion of the file at f.offset.
- dataOff := int(f.offset - f.segStart)
- dataLen := f.seg.Len - dataOff
-
- if !f.readaheadDone && len(f.segNext) > 0 && f.offset >= 1048576 && dataOff+dataLen > len(f.segData)/16 {
- // If we have already read more than just the first
- // few bytes of this file, and we have already
- // consumed a noticeable portion of this segment, and
- // there's more data for this file in the next segment
- // ... then there's a good chance we are going to need
- // the data for that next segment soon. Start getting
- // it into the cache now.
- go f.kc.cache().Get(f.kc, f.segNext[0].Locator)
- f.readaheadDone = true
- }
-
- n := len(buf)
- if n > dataLen {
- n = dataLen
- }
- copy(buf[:n], f.segData[dataOff:dataOff+n])
- f.offset += int64(n)
- return n, nil
-}
-
-// Seek implements io.Seeker.
-func (f *file) Seek(offset int64, whence int) (int64, error) {
- var want int64
- switch whence {
- case io.SeekStart:
- want = offset
- case io.SeekCurrent:
- want = f.offset + offset
- case io.SeekEnd:
- want = f.size + offset
- default:
- return f.offset, fmt.Errorf("invalid whence %d", whence)
- }
- if want < 0 {
- return f.offset, fmt.Errorf("attempted seek to %d", want)
- }
- if want > f.size {
- want = f.size
- }
- f.offset = want
- return f.offset, nil
-}
-
-// Size returns the file size in bytes.
-func (f *file) Size() int64 {
- return f.size
-}
-
-func (f *file) load(m manifest.Manifest, path string) error {
- f.segments = nil
- f.size = 0
- for seg := range m.FileSegmentIterByName(path) {
- f.segments = append(f.segments, seg)
- f.size += int64(seg.Len)
- }
- if f.segments == nil {
- return os.ErrNotExist
+func (kc *KeepClient) ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error) {
+ fs, err := (&arvados.Collection{ManifestText: m.Text}).FileSystem(nil, kc)
+ if err != nil {
+ return nil, err
}
- return nil
+ return fs.OpenFile(filename, os.O_RDONLY, 0)
}
func (s *CollectionReaderUnit) TestCollectionReaderManyBlocks(c *check.C) {
h := md5.New()
- var testdata []byte
buf := make([]byte, 4096)
locs := make([]string, len(buf))
+ testdata := make([]byte, 0, len(buf)*len(buf))
filesize := 0
- for i := 0; i < len(locs); i++ {
+ for i := range locs {
_, err := rand.Read(buf[:i])
h.Write(buf[:i])
locs[i], _, err = s.kc.PutB(buf[:i])
c.Check(md5.Sum(buf), check.DeepEquals, md5.Sum(testdata))
c.Check(buf[:1000], check.DeepEquals, testdata[:1000])
+ expectPos := curPos + size + 12345
curPos, err = rdr.Seek(size+12345, io.SeekCurrent)
c.Check(err, check.IsNil)
- c.Check(curPos, check.Equals, size)
+ c.Check(curPos, check.Equals, expectPos)
- curPos, err = rdr.Seek(8-size, io.SeekCurrent)
+ curPos, err = rdr.Seek(8-curPos, io.SeekCurrent)
c.Check(err, check.IsNil)
c.Check(curPos, check.Equals, int64(8))
return kc.getOrHead("GET", locator)
}
+// ReadAt() retrieves a portion of block from the cache if it's
+// present, otherwise from the network.
+func (kc *KeepClient) ReadAt(locator string, p []byte, off int) (int, error) {
+ return kc.cache().ReadAt(kc, locator, p, off)
+}
+
// Ask() verifies that a block with the given hash is available and
// readable, according to at least one Keep service. Unlike Get, it
// does not retrieve the data or verify that the data content matches
after_validation :assign_auth
before_save :sort_serialized_attrs
after_save :handle_completed
+ after_save :propagate_priority
has_many :container_requests, :foreign_key => :container_uuid, :class_name => 'ContainerRequest', :primary_key => :uuid
belongs_to :auth, :class_name => 'ApiClientAuthorization', :foreign_key => :auth_uuid, :primary_key => :uuid
end
end
+ def propagate_priority
+ if self.priority_changed?
+ act_as_system_user do
+ # Update the priority of child container requests to match new priority
+ # of the parent container.
+ ContainerRequest.where(requesting_container_uuid: self.uuid,
+ state: ContainerRequest::Committed).each do |cr|
+ cr.priority = self.priority
+ cr.save
+ end
+ end
+ end
+ end
+
# Create a new container (or find an existing one) to satisfy the
# given container request.
def self.resolve(req)
assert_equal 0, c2.priority
end
+
+ test "Container makes container request, then changes priority" do
+ set_user_from_auth :active
+ cr = create_minimal_req!(priority: 5, state: "Committed", container_count_max: 1)
+
+ c = Container.find_by_uuid cr.container_uuid
+ assert_equal 5, c.priority
+
+ cr2 = create_minimal_req!
+ cr2.update_attributes!(priority: 5, state: "Committed", requesting_container_uuid: c.uuid, command: ["echo", "foo2"], container_count_max: 1)
+ cr2.reload
+
+ c2 = Container.find_by_uuid cr2.container_uuid
+ assert_equal 5, c2.priority
+
+ act_as_system_user do
+ c.priority = 10
+ c.save!
+ end
+
+ cr.reload
+
+ cr2.reload
+ assert_equal 10, cr2.priority
+
+ c2.reload
+ assert_equal 10, c2.priority
+ end
+
[
['running_container_auth', 'zzzzz-dz642-runningcontainr'],
['active_no_prefs', nil],
}
r.URL.Path = rewrittenPath
- h.handler.ServeHTTP(&w, r)
+ h.handler.ServeHTTP(w, r)
}
import (
"encoding/json"
"flag"
+ "fmt"
"log"
"os"
"regexp"
"github.com/coreos/go-systemd/daemon"
)
+var version = "dev"
+
// Server configuration
type Config struct {
Client arvados.Client
cfgPath := flag.String("config", defaultCfgPath, "Configuration file `path`.")
dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit (useful for migrating from command line flags to config file)")
+ getVersion := flag.Bool("version", false, "print version information and exit.")
flag.StringVar(&theConfig.ManagementToken, "management-token", theConfig.ManagementToken,
"Authorization token to be included in all health check requests.")
flag.Usage = usage
flag.Parse()
+ // Print version information if requested
+ if *getVersion {
+ fmt.Printf("arv-git-httpd %s\n", version)
+ return
+ }
+
err := config.LoadFile(theConfig, *cfgPath)
if err != nil {
h := os.Getenv("ARVADOS_API_HOST")
if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
log.Printf("Error notifying init daemon: %v", err)
}
+ log.Printf("arv-git-httpd %s started", version)
log.Println("Listening at", srv.Addr)
log.Println("Repository root", theConfig.RepoRoot)
if err := srv.Wait(); err != nil {
import (
"context"
"flag"
+ "fmt"
"log"
"os"
"os/exec"
"git.curoverse.com/arvados.git/sdk/go/dispatch"
)
+var version = "dev"
+
func main() {
err := doMain()
if err != nil {
"/usr/bin/crunch-run",
"Crunch command to run container")
+ getVersion := flags.Bool(
+ "version",
+ false,
+ "Print version information and exit.")
+
// Parse args; omit the first arg which is the command name
flags.Parse(os.Args[1:])
+ // Print version information if requested
+ if *getVersion {
+ fmt.Printf("crunch-dispatch-local %s\n", version)
+ return nil
+ }
+
+ log.Printf("crunch-dispatch-local %s started", version)
+
runningCmds = make(map[string]*exec.Cmd)
arv, err := arvadosclient.MakeArvadosClient()
"github.com/coreos/go-systemd/daemon"
)
+var version = "dev"
+
// Config used by crunch-dispatch-slurm
type Config struct {
Client arvados.Client
"dump-config",
false,
"write current configuration to stdout and exit")
-
+ getVersion := flags.Bool(
+ "version",
+ false,
+ "Print version information and exit.")
// Parse args; omit the first arg which is the command name
flags.Parse(os.Args[1:])
+ // Print version information if requested
+ if *getVersion {
+ fmt.Printf("crunch-dispatch-slurm %s\n", version)
+ return nil
+ }
+
+ log.Printf("crunch-dispatch-slurm %s started", version)
+
err := readConfig(&theConfig, *configPath)
if err != nil {
return err
"os/signal"
"path"
"path/filepath"
+ "regexp"
"runtime"
"runtime/pprof"
"sort"
dockerclient "github.com/docker/docker/client"
)
+var version = "dev"
+
// IArvadosClient is the minimal Arvados API methods used by crunch-run.
type IArvadosClient interface {
Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
}
}
+var errorBlacklist = []string{
+ "(?ms).*[Cc]annot connect to the Docker daemon.*",
+ "(?ms).*oci runtime error.*starting container process.*container init.*mounting.*to rootfs.*no such file or directory.*",
+}
+var brokenNodeHook *string = flag.String("broken-node-hook", "", "Script to run if node is detected to be broken (for example, Docker daemon is not running)")
+
+func (runner *ContainerRunner) checkBrokenNode(goterr error) bool {
+ for _, d := range errorBlacklist {
+ if m, e := regexp.MatchString(d, goterr.Error()); m && e == nil {
+ runner.CrunchLog.Printf("Error suggests node is unable to run containers: %v", goterr)
+ if *brokenNodeHook == "" {
+ runner.CrunchLog.Printf("No broken node hook provided, cannot mark node as broken.")
+ } else {
+ runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook)
+ // run killme script
+ c := exec.Command(*brokenNodeHook)
+ c.Stdout = runner.CrunchLog
+ c.Stderr = runner.CrunchLog
+ err := c.Run()
+ if err != nil {
+ runner.CrunchLog.Printf("Error running broken node hook: %v", err)
+ }
+ }
+ return true
+ }
+ }
+ return false
+}
+
// LoadImage determines the docker image id from the container record and
// checks if it is available in the local Docker image store. If not, it loads
// the image from Keep.
cmd []string
}
-// Gather node information and store it on the log for debugging
+// LogNodeInfo gathers node information and store it on the log for debugging
// purposes.
func (runner *ContainerRunner) LogNodeInfo() (err error) {
w := runner.NewLogWriter("node-info")
return nil
}
-// Get and save the raw JSON container record from the API server
+// LogContainerRecord gets and saves the raw JSON container record from the API server
func (runner *ContainerRunner) LogContainerRecord() (err error) {
w := &ArvLogWriter{
ArvClient: runner.ArvClient,
dockertypes.ContainerStartOptions{})
if err != nil {
var advice string
- if strings.Contains(err.Error(), "no such file or directory") {
+ if m, e := regexp.MatchString("(?ms).*(exec|System error).*(no such file or directory|file not found).*", err.Error()); m && e == nil {
advice = fmt.Sprintf("\nPossible causes: command %q is missing, the interpreter given in #! is missing, or script has Windows line endings.", runner.Container.Command[0])
}
return fmt.Errorf("could not start container: %v%s", err, advice)
// Run the full container lifecycle.
func (runner *ContainerRunner) Run() (err error) {
+ runner.CrunchLog.Printf("crunch-run %s started", version)
runner.CrunchLog.Printf("Executing container '%s'", runner.Container.UUID)
hostname, hosterr := os.Hostname()
// check for and/or load image
err = runner.LoadImage()
if err != nil {
- runner.finalState = "Cancelled"
+ if !runner.checkBrokenNode(err) {
+ // Failed to load image but not due to a "broken node"
+ // condition, probably user error.
+ runner.finalState = "Cancelled"
+ }
err = fmt.Errorf("While loading container image: %v", err)
return
}
return
}
- runner.StartCrunchstat()
-
if runner.IsCancelled() {
return
}
}
runner.finalState = "Cancelled"
+ runner.StartCrunchstat()
+
err = runner.StartContainer()
if err != nil {
+ runner.checkBrokenNode(err)
return
}
`Set networking mode for container. Corresponds to Docker network mode (--net).
`)
memprofile := flag.String("memprofile", "", "write memory profile to `file` after running container")
+ getVersion := flag.Bool("version", false, "Print version information and exit.")
flag.Parse()
+ // Print version information if requested
+ if *getVersion {
+ fmt.Printf("crunch-run %s\n", version)
+ return
+ }
+
+ log.Printf("crunch-run %s started", version)
+
containerId := flag.Arg(0)
if *caCertsPath != "" {
}
api.Retries = 8
- var kc *keepclient.KeepClient
- kc, err = keepclient.MakeKeepClient(api)
- if err != nil {
- log.Fatalf("%s: %v", containerId, err)
+ kc, kcerr := keepclient.MakeKeepClient(api)
+ if kcerr != nil {
+ log.Fatalf("%s: %v", containerId, kcerr)
}
kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
kc.Retries = 4
- var docker *dockerclient.Client
// API version 1.21 corresponds to Docker 1.9, which is currently the
// minimum version we want to support.
- docker, err = dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
- if err != nil {
- log.Fatalf("%s: %v", containerId, err)
- }
-
+ docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
dockerClientProxy := ThinDockerClientProxy{Docker: docker}
cr := NewContainerRunner(api, kc, dockerClientProxy, containerId)
+
+ if dockererr != nil {
+ cr.CrunchLog.Printf("%s: %v", containerId, dockererr)
+ cr.checkBrokenNode(dockererr)
+ cr.CrunchLog.Close()
+ os.Exit(1)
+ }
+
cr.statInterval = *statInterval
cr.cgroupRoot = *cgroupRoot
cr.expectCgroupParent = *cgroupParent
}
func (t *TestDockerClient) ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error {
+ if t.finish == 3 {
+ return errors.New(`Error response from daemon: oci runtime error: container_linux.go:247: starting container process caused "process_linux.go:359: container init caused \"rootfs_linux.go:54: mounting \\\"/tmp/keep453790790/by_id/99999999999999999999999999999999+99999/myGenome\\\" to rootfs \\\"/tmp/docker/overlay2/9999999999999999999999999999999999999999999999999999999999999999/merged\\\" at \\\"/tmp/docker/overlay2/9999999999999999999999999999999999999999999999999999999999999999/merged/keep/99999999999999999999999999999999+99999/myGenome\\\" caused \\\"no such file or directory\\\"\""`)
+ }
+ if t.finish == 4 {
+ return errors.New(`panic: standard_init_linux.go:175: exec user process caused "no such file or directory"`)
+ }
+ if t.finish == 5 {
+ return errors.New(`Error response from daemon: Cannot start container 41f26cbc43bcc1280f4323efb1830a394ba8660c9d1c2b564ba42bf7f7694845: [8] System error: no such file or directory`)
+ }
+ if t.finish == 6 {
+ return errors.New(`Error response from daemon: Cannot start container 58099cd76c834f3dc2a4fb76c8028f049ae6d4fdf0ec373e1f2cfea030670c2d: [8] System error: exec: "foobar": executable file not found in $PATH`)
+ }
+
if container == "abcde" {
// t.fn gets executed in ContainerWait
return nil
}
func (t *TestDockerClient) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) {
+ if t.finish == 2 {
+ return dockertypes.ImageInspect{}, nil, fmt.Errorf("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?")
+ }
+
if t.imageLoaded == image {
return dockertypes.ImageInspect{}, nil, nil
} else {
}
func (t *TestDockerClient) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) {
+ if t.finish == 2 {
+ return dockertypes.ImageLoadResponse{}, fmt.Errorf("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?")
+ }
_, err := io.Copy(ioutil.Discard, input)
if err != nil {
return dockertypes.ImageLoadResponse{}, err
len int64
}
+func (fw FileWrapper) Readdir(n int) ([]os.FileInfo, error) {
+ return nil, errors.New("not implemented")
+}
+
+func (fw FileWrapper) Seek(int64, int) (int64, error) {
+ return 0, errors.New("not implemented")
+}
+
func (fw FileWrapper) Size() int64 {
return fw.len
}
-func (fw FileWrapper) Seek(int64, int) (int64, error) {
+func (fw FileWrapper) Stat() (os.FileInfo, error) {
+ return nil, errors.New("not implemented")
+}
+
+func (fw FileWrapper) Truncate(int64) error {
+ return errors.New("not implemented")
+}
+
+func (fw FileWrapper) Write([]byte) (int, error) {
return 0, errors.New("not implemented")
}
func (KeepReadErrorTestClient) ClearBlockCache() {
}
-type ErrorReader struct{}
+type ErrorReader struct {
+ FileWrapper
+}
func (ErrorReader) Read(p []byte) (n int, err error) {
return 0, errors.New("ErrorReader")
}
-func (ErrorReader) Close() error {
- return nil
-}
-
-func (ErrorReader) Size() int64 {
- return 0
-}
-
func (ErrorReader) Seek(int64, int) (int64, error) {
return 0, errors.New("ErrorReader")
}
if api.CalledWith("container.state", "Complete") != nil {
c.Check(err, IsNil)
}
- c.Check(api.WasSetRunning, Equals, true)
-
- c.Check(api.Content[api.Calls-2]["container"].(arvadosclient.Dict)["log"], NotNil)
+ if exitCode != 2 {
+ c.Check(api.WasSetRunning, Equals, true)
+ c.Check(api.Content[api.Calls-2]["container"].(arvadosclient.Dict)["log"], NotNil)
+ }
if err != nil {
for k, v := range api.Logs {
_, err = cr.UploadOutputFile(realTemp+"/"+v, info, err, []string{}, nil, "", "", 0)
c.Assert(err, NotNil)
}
+
+func (s *TestSuite) TestFullBrokenDocker1(c *C) {
+ tf, err := ioutil.TempFile("", "brokenNodeHook-")
+ c.Assert(err, IsNil)
+ defer os.Remove(tf.Name())
+
+ tf.Write([]byte(`#!/bin/sh
+exec echo killme
+`))
+ tf.Close()
+ os.Chmod(tf.Name(), 0700)
+
+ ech := tf.Name()
+ brokenNodeHook = &ech
+
+ api, _, _ := FullRunHelper(c, `{
+ "command": ["echo", "hello world"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": ".",
+ "environment": {},
+ "mounts": {"/tmp": {"kind": "tmp"} },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {}
+}`, nil, 2, func(t *TestDockerClient) {
+ t.logWriter.Write(dockerLog(1, "hello world\n"))
+ t.logWriter.Close()
+ })
+
+ c.Check(api.CalledWith("container.state", "Queued"), NotNil)
+ c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*")
+ c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*Running broken node hook.*")
+ c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*killme.*")
+
+}
+
+func (s *TestSuite) TestFullBrokenDocker2(c *C) {
+ ech := ""
+ brokenNodeHook = &ech
+
+ api, _, _ := FullRunHelper(c, `{
+ "command": ["echo", "hello world"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": ".",
+ "environment": {},
+ "mounts": {"/tmp": {"kind": "tmp"} },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {}
+}`, nil, 2, func(t *TestDockerClient) {
+ t.logWriter.Write(dockerLog(1, "hello world\n"))
+ t.logWriter.Close()
+ })
+
+ c.Check(api.CalledWith("container.state", "Queued"), NotNil)
+ c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*")
+ c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*No broken node hook.*")
+}
+
+func (s *TestSuite) TestFullBrokenDocker3(c *C) {
+ ech := ""
+ brokenNodeHook = &ech
+
+ api, _, _ := FullRunHelper(c, `{
+ "command": ["echo", "hello world"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": ".",
+ "environment": {},
+ "mounts": {"/tmp": {"kind": "tmp"} },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {}
+}`, nil, 3, func(t *TestDockerClient) {
+ t.logWriter.Write(dockerLog(1, "hello world\n"))
+ t.logWriter.Close()
+ })
+
+ c.Check(api.CalledWith("container.state", "Cancelled"), NotNil)
+ c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*")
+}
+
+func (s *TestSuite) TestBadCommand1(c *C) {
+ ech := ""
+ brokenNodeHook = &ech
+
+ api, _, _ := FullRunHelper(c, `{
+ "command": ["echo", "hello world"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": ".",
+ "environment": {},
+ "mounts": {"/tmp": {"kind": "tmp"} },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {}
+}`, nil, 4, func(t *TestDockerClient) {
+ t.logWriter.Write(dockerLog(1, "hello world\n"))
+ t.logWriter.Close()
+ })
+
+ c.Check(api.CalledWith("container.state", "Cancelled"), NotNil)
+ c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*Possible causes:.*is missing.*")
+}
+
+func (s *TestSuite) TestBadCommand2(c *C) {
+ ech := ""
+ brokenNodeHook = &ech
+
+ api, _, _ := FullRunHelper(c, `{
+ "command": ["echo", "hello world"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": ".",
+ "environment": {},
+ "mounts": {"/tmp": {"kind": "tmp"} },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {}
+}`, nil, 5, func(t *TestDockerClient) {
+ t.logWriter.Write(dockerLog(1, "hello world\n"))
+ t.logWriter.Close()
+ })
+
+ c.Check(api.CalledWith("container.state", "Cancelled"), NotNil)
+ c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*Possible causes:.*is missing.*")
+}
+
+func (s *TestSuite) TestBadCommand3(c *C) {
+ ech := ""
+ brokenNodeHook = &ech
+
+ api, _, _ := FullRunHelper(c, `{
+ "command": ["echo", "hello world"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": ".",
+ "environment": {},
+ "mounts": {"/tmp": {"kind": "tmp"} },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {}
+}`, nil, 6, func(t *TestDockerClient) {
+ t.logWriter.Write(dockerLog(1, "hello world\n"))
+ t.logWriter.Close()
+ })
+
+ c.Check(api.CalledWith("container.state", "Cancelled"), NotNil)
+ c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*Possible causes:.*is missing.*")
+}
import (
"bufio"
"flag"
+ "fmt"
"io"
"log"
"os"
var (
signalOnDeadPPID int = 15
ppidCheckInterval = time.Second
+ version = "dev"
)
func main() {
flag.IntVar(&signalOnDeadPPID, "signal-on-dead-ppid", signalOnDeadPPID, "Signal to send child if crunchstat's parent process disappears (0 to disable)")
flag.DurationVar(&ppidCheckInterval, "ppid-check-interval", ppidCheckInterval, "Time between checks for parent process disappearance")
pollMsec := flag.Int64("poll", 1000, "Reporting interval, in milliseconds")
+ getVersion := flag.Bool("version", false, "Print version information and exit.")
flag.Parse()
+ // Print version information if requested
+ if *getVersion {
+ fmt.Printf("crunchstat %s\n", version)
+ return
+ }
+
+ reporter.Logger.Printf("crunchstat %s started", version)
+
if reporter.CgroupRoot == "" {
reporter.Logger.Fatal("error: must provide -cgroup-root")
} else if signalOnDeadPPID < 0 {
logging.getLogger('arvados.collection').setLevel(logging.DEBUG)
self.logger.debug("arv-mount debugging enabled")
+ self.logger.info("%s %s started", sys.argv[0], __version__)
self.logger.info("enable write is %s", self.args.enable_write)
def _setup_api(self):
import (
"flag"
+ "fmt"
"net/http"
"git.curoverse.com/arvados.git/sdk/go/arvados"
log "github.com/Sirupsen/logrus"
)
+var version = "dev"
+
func main() {
configFile := flag.String("config", arvados.DefaultConfigFile, "`path` to arvados configuration file")
+ getVersion := flag.Bool("version", false, "Print version information and exit.")
flag.Parse()
+ // Print version information if requested
+ if *getVersion {
+ fmt.Printf("arvados-health %s\n", version)
+ return
+ }
+
log.SetFormatter(&log.JSONFormatter{
TimestampFormat: "2006-01-02T15:04:05.000000000Z07:00",
})
+ log.Printf("arvados-health %s started", version)
+
cfg, err := arvados.GetConfig(*configFile)
if err != nil {
log.Fatal(err)
import (
"encoding/json"
"flag"
+ "fmt"
"log"
"os"
"os/signal"
"git.curoverse.com/arvados.git/sdk/go/config"
)
+var version = "dev"
+
const defaultConfigPath = "/etc/arvados/keep-balance/keep-balance.yml"
// Config specifies site configuration, like API credentials and the
dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit")
dumpFlag := flag.Bool("dump", false, "dump details for each block to stdout")
debugFlag := flag.Bool("debug", false, "enable debug messages")
+ getVersion := flag.Bool("version", false, "Print version information and exit.")
flag.Usage = usage
flag.Parse()
+ // Print version information if requested
+ if *getVersion {
+ fmt.Printf("keep-balance %s\n", version)
+ return
+ }
+
mustReadConfig(&cfg, *configPath)
if *serviceListPath != "" {
mustReadConfig(&cfg.KeepServiceList, *serviceListPath)
log.Fatal(config.DumpAndExit(cfg))
}
+ log.Printf("keep-balance %s started", version)
+
if *debugFlag {
debugf = log.Printf
if j, err := json.Marshal(cfg); err != nil {
}
}
+// Update saves a modified version (fs) to an existing collection
+// (coll) and, if successful, updates the relevant cache entries so
+// subsequent calls to Get() reflect the modifications.
+func (c *cache) Update(client *arvados.Client, coll arvados.Collection, fs arvados.CollectionFileSystem) error {
+ c.setupOnce.Do(c.setup)
+
+ if m, err := fs.MarshalManifest("."); err != nil || m == coll.ManifestText {
+ return err
+ } else {
+ coll.ManifestText = m
+ }
+ var updated arvados.Collection
+ defer c.pdhs.Remove(coll.UUID)
+ err := client.RequestAndDecode(&updated, "PATCH", "/arvados/v1/collections/"+coll.UUID, client.UpdateBody(coll), nil)
+ if err == nil {
+ c.collections.Add(client.AuthToken+"\000"+coll.PortableDataHash, &cachedCollection{
+ expire: time.Now().Add(time.Duration(c.TTL)),
+ collection: &updated,
+ })
+ }
+ return err
+}
+
func (c *cache) Get(arv *arvadosclient.ArvadosClient, targetID string, forceReload bool) (*arvados.Collection, error) {
c.setupOnce.Do(c.setup)
import (
"bytes"
"io"
+ "io/ioutil"
+ "net/url"
+ "os"
"os/exec"
+ "strings"
+ "time"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
check "gopkg.in/check.v1"
)
func (s *IntegrationSuite) TestWebdavWithCadaver(c *check.C) {
- basePath := "/c=" + arvadostest.FooAndBarFilesInDirUUID + "/t=" + arvadostest.ActiveToken + "/"
+ testdata := []byte("the human tragedy consists in the necessity of living with the consequences of actions performed under the pressure of compulsions we do not understand")
+
+ localfile, err := ioutil.TempFile("", "localfile")
+ c.Assert(err, check.IsNil)
+ defer os.Remove(localfile.Name())
+ localfile.Write(testdata)
+
+ emptyfile, err := ioutil.TempFile("", "emptyfile")
+ c.Assert(err, check.IsNil)
+ defer os.Remove(emptyfile.Name())
+
+ checkfile, err := ioutil.TempFile("", "checkfile")
+ c.Assert(err, check.IsNil)
+ defer os.Remove(checkfile.Name())
+
+ var newCollection arvados.Collection
+ arv := arvados.NewClientFromEnv()
+ arv.AuthToken = arvadostest.ActiveToken
+ err = arv.RequestAndDecode(&newCollection, "POST", "/arvados/v1/collections", bytes.NewBufferString(url.Values{"collection": {"{}"}}.Encode()), nil)
+ c.Assert(err, check.IsNil)
+ writePath := "/c=" + newCollection.UUID + "/t=" + arv.AuthToken + "/"
+
+ pdhPath := "/c=" + strings.Replace(arvadostest.FooAndBarFilesInDirPDH, "+", "-", -1) + "/t=" + arv.AuthToken + "/"
+
+ matchToday := time.Now().Format("Jan +2")
+
+ readPath := "/c=" + arvadostest.FooAndBarFilesInDirUUID + "/t=" + arvadostest.ActiveToken + "/"
type testcase struct {
path string
cmd string
match string
+ data []byte
}
for _, trial := range []testcase{
{
- path: basePath,
+ path: readPath,
cmd: "ls\n",
match: `(?ms).*dir1 *0 .*`,
},
{
- path: basePath,
+ path: readPath,
cmd: "ls dir1\n",
match: `(?ms).*bar *3.*foo *3 .*`,
},
{
- path: basePath + "_/dir1",
+ path: readPath + "_/dir1",
cmd: "ls\n",
match: `(?ms).*bar *3.*foo *3 .*`,
},
{
- path: basePath + "dir1/",
+ path: readPath + "dir1/",
cmd: "ls\n",
- match: `(?ms).*bar *3.*foo *3 .*`,
+ match: `(?ms).*bar *3.*foo +3 +Feb +\d+ +2014.*`,
+ },
+ {
+ path: writePath,
+ cmd: "get emptyfile '" + checkfile.Name() + "'\n",
+ match: `(?ms).*Not Found.*`,
+ },
+ {
+ path: writePath,
+ cmd: "put '" + emptyfile.Name() + "' emptyfile\n",
+ match: `(?ms).*Uploading .* succeeded.*`,
+ },
+ {
+ path: writePath,
+ cmd: "get emptyfile '" + checkfile.Name() + "'\n",
+ match: `(?ms).*Downloading .* succeeded.*`,
+ data: []byte{},
+ },
+ {
+ path: writePath,
+ cmd: "put '" + localfile.Name() + "' testfile\n",
+ match: `(?ms).*Uploading .* succeeded.*`,
+ },
+ {
+ path: writePath,
+ cmd: "get testfile '" + checkfile.Name() + "'\n",
+ match: `(?ms).*succeeded.*`,
+ data: testdata,
+ },
+ {
+ path: writePath,
+ cmd: "move testfile newdir0/\n",
+ match: `(?ms).*Moving .* succeeded.*`,
+ },
+ {
+ path: writePath,
+ cmd: "move testfile newdir0/\n",
+ match: `(?ms).*Moving .* failed.*`,
+ },
+ {
+ path: writePath,
+ cmd: "ls\n",
+ match: `(?ms).*newdir0.* 0 +` + matchToday + ` \d+:\d+\n.*`,
+ },
+ {
+ path: writePath,
+ cmd: "move newdir0/testfile emptyfile/bogus/\n",
+ match: `(?ms).*Moving .* failed.*`,
+ },
+ {
+ path: writePath,
+ cmd: "mkcol newdir1\n",
+ match: `(?ms).*Creating .* succeeded.*`,
+ },
+ {
+ path: writePath,
+ cmd: "move newdir0/testfile newdir1/\n",
+ match: `(?ms).*Moving .* succeeded.*`,
+ },
+ {
+ path: writePath,
+ cmd: "put '" + localfile.Name() + "' newdir1/testfile1\n",
+ match: `(?ms).*Uploading .* succeeded.*`,
+ },
+ {
+ path: writePath,
+ cmd: "mkcol newdir2\n",
+ match: `(?ms).*Creating .* succeeded.*`,
+ },
+ {
+ path: writePath,
+ cmd: "put '" + localfile.Name() + "' newdir2/testfile2\n",
+ match: `(?ms).*Uploading .* succeeded.*`,
+ },
+ {
+ path: writePath,
+ cmd: "copy newdir2/testfile2 testfile3\n",
+ match: `(?ms).*succeeded.*`,
+ },
+ {
+ path: writePath,
+ cmd: "get testfile3 '" + checkfile.Name() + "'\n",
+ match: `(?ms).*succeeded.*`,
+ data: testdata,
+ },
+ {
+ path: writePath,
+ cmd: "get newdir2/testfile2 '" + checkfile.Name() + "'\n",
+ match: `(?ms).*succeeded.*`,
+ data: testdata,
+ },
+ {
+ path: writePath,
+ cmd: "rmcol newdir2\n",
+ match: `(?ms).*Deleting collection .* succeeded.*`,
+ },
+ {
+ path: writePath,
+ cmd: "get newdir2/testfile2 '" + checkfile.Name() + "'\n",
+ match: `(?ms).*Downloading .* failed.*`,
+ },
+ {
+ path: "/c=" + arvadostest.UserAgreementCollection + "/t=" + arv.AuthToken + "/",
+ cmd: "put '" + localfile.Name() + "' foo\n",
+ match: `(?ms).*Uploading .* failed:.*403 Forbidden.*`,
+ },
+ {
+ path: pdhPath,
+ cmd: "put '" + localfile.Name() + "' foo\n",
+ match: `(?ms).*Uploading .* failed:.*405 Method Not Allowed.*`,
+ },
+ {
+ path: pdhPath,
+ cmd: "move foo bar\n",
+ match: `(?ms).*Moving .* failed:.*405 Method Not Allowed.*`,
+ },
+ {
+ path: pdhPath,
+ cmd: "copy foo bar\n",
+ match: `(?ms).*Copying .* failed:.*405 Method Not Allowed.*`,
+ },
+ {
+ path: pdhPath,
+ cmd: "delete foo\n",
+ match: `(?ms).*Deleting .* failed:.*405 Method Not Allowed.*`,
},
} {
- c.Logf("%s %#v", "http://"+s.testServer.Addr, trial)
+ c.Logf("%s %+v", "http://"+s.testServer.Addr, trial)
+
+ os.Remove(checkfile.Name())
+
cmd := exec.Command("cadaver", "http://"+s.testServer.Addr+trial.path)
cmd.Stdin = bytes.NewBufferString(trial.cmd)
stdout, err := cmd.StdoutPipe()
err = cmd.Wait()
c.Check(err, check.Equals, nil)
c.Check(buf.String(), check.Matches, trial.match)
+
+ if trial.data == nil {
+ continue
+ }
+ checkfile, err = os.Open(checkfile.Name())
+ c.Assert(err, check.IsNil)
+ checkfile.Seek(0, os.SEEK_SET)
+ got, err := ioutil.ReadAll(checkfile)
+ c.Check(got, check.DeepEquals, trial.data)
+ c.Check(err, check.IsNil)
}
}
//
// SPDX-License-Identifier: AGPL-3.0
-// Keep-web provides read-only HTTP access to files stored in Keep. It
-// serves public data to anonymous and unauthenticated clients, and
-// serves private data to clients that supply Arvados API tokens. It
-// can be installed anywhere with access to Keep services, typically
-// behind a web proxy that supports TLS.
+// Keep-web provides read/write HTTP (WebDAV) access to files stored
+// in Keep. It serves public data to anonymous and unauthenticated
+// clients, and serves private data to clients that supply Arvados API
+// tokens. It can be installed anywhere with access to Keep services,
+// typically behind a web proxy that supports TLS.
//
// See http://doc.arvados.org/install/install-keep-web.html.
//
//
// Proxy configuration
//
-// Keep-web does not support SSL natively. Typically, it is installed
+// Keep-web does not support TLS natively. Typically, it is installed
// behind a proxy like nginx.
//
// Here is an example nginx configuration.
"html"
"html/template"
"io"
+ "log"
"net/http"
"net/url"
"os"
func (h *handler) serveStatus(w http.ResponseWriter, r *http.Request) {
status := struct {
cacheStats
+ Version string
}{
cacheStats: h.Config.Cache.Stats(),
+ Version: version,
}
json.NewEncoder(w).Encode(status)
}
+// updateOnSuccess wraps httpserver.ResponseWriter. If the handler
+// sends an HTTP header indicating success, updateOnSuccess first
+// calls the provided update func. If the update func fails, a 500
+// response is sent, and the status code and body sent by the handler
+// are ignored (all response writes return the update error).
+type updateOnSuccess struct {
+ httpserver.ResponseWriter
+ update func() error
+ sentHeader bool
+ err error
+}
+
+func (uos *updateOnSuccess) Write(p []byte) (int, error) {
+ if uos.err != nil {
+ return 0, uos.err
+ }
+ if !uos.sentHeader {
+ uos.WriteHeader(http.StatusOK)
+ }
+ return uos.ResponseWriter.Write(p)
+}
+
+func (uos *updateOnSuccess) WriteHeader(code int) {
+ if !uos.sentHeader {
+ uos.sentHeader = true
+ if code >= 200 && code < 400 {
+ if uos.err = uos.update(); uos.err != nil {
+ code := http.StatusInternalServerError
+ if err, ok := uos.err.(*arvados.TransactionError); ok {
+ code = err.StatusCode
+ }
+ log.Printf("update() changes response to HTTP %d: %T %q", code, uos.err, uos.err)
+ http.Error(uos.ResponseWriter, uos.err.Error(), code)
+ return
+ }
+ }
+ }
+ uos.ResponseWriter.WriteHeader(code)
+}
+
var (
+ writeMethod = map[string]bool{
+ "COPY": true,
+ "DELETE": true,
+ "MKCOL": true,
+ "MOVE": true,
+ "PUT": true,
+ "RMCOL": true,
+ }
webdavMethod = map[string]bool{
+ "COPY": true,
+ "DELETE": true,
+ "MKCOL": true,
+ "MOVE": true,
"OPTIONS": true,
"PROPFIND": true,
+ "PUT": true,
+ "RMCOL": true,
}
browserMethod = map[string]bool{
"GET": true,
return
}
w.Header().Set("Access-Control-Allow-Headers", "Authorization, Content-Type, Range")
- w.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS, PROPFIND")
+ w.Header().Set("Access-Control-Allow-Methods", "COPY, DELETE, GET, MKCOL, MOVE, OPTIONS, POST, PROPFIND, PUT, RMCOL")
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Max-Age", "86400")
statusCode = http.StatusOK
}
applyContentDispositionHdr(w, r, basename, attachment)
- fs := collection.FileSystem(&arvados.Client{
+ client := &arvados.Client{
APIHost: arv.ApiServer,
AuthToken: arv.ApiToken,
Insecure: arv.ApiInsecure,
- }, kc)
+ }
+ fs, err := collection.FileSystem(client, kc)
+ if err != nil {
+ statusCode, statusText = http.StatusInternalServerError, err.Error()
+ return
+ }
+
+ targetIsPDH := arvadosclient.PDHMatch(targetID)
+ if targetIsPDH && writeMethod[r.Method] {
+ statusCode, statusText = http.StatusMethodNotAllowed, errReadOnly.Error()
+ return
+ }
+
if webdavMethod[r.Method] {
+ if writeMethod[r.Method] {
+ // Save the collection only if/when all
+ // webdav->filesystem operations succeed --
+ // and send a 500 error if the modified
+ // collection can't be saved.
+ w = &updateOnSuccess{
+ ResponseWriter: w,
+ update: func() error {
+ return h.Config.Cache.Update(client, *collection, fs)
+ }}
+ }
h := webdav.Handler{
- Prefix: "/" + strings.Join(pathParts[:stripParts], "/"),
- FileSystem: &webdavFS{collfs: fs},
+ Prefix: "/" + strings.Join(pathParts[:stripParts], "/"),
+ FileSystem: &webdavFS{
+ collfs: fs,
+ writing: writeMethod[r.Method],
+ },
LockSystem: h.webdavLS,
Logger: func(_ *http.Request, err error) {
- if os.IsNotExist(err) {
- statusCode, statusText = http.StatusNotFound, err.Error()
- } else if err != nil {
- statusCode, statusText = http.StatusInternalServerError, err.Error()
+ if err != nil {
+ log.Printf("error from webdav handler: %q", err)
}
},
}
c.Check(resp.Code, check.Equals, http.StatusOK)
c.Check(resp.Body.String(), check.Equals, "")
c.Check(resp.Header().Get("Access-Control-Allow-Origin"), check.Equals, "*")
- c.Check(resp.Header().Get("Access-Control-Allow-Methods"), check.Equals, "GET, POST, OPTIONS, PROPFIND")
+ c.Check(resp.Header().Get("Access-Control-Allow-Methods"), check.Equals, "COPY, DELETE, GET, MKCOL, MOVE, OPTIONS, POST, PROPFIND, PUT, RMCOL")
c.Check(resp.Header().Get("Access-Control-Allow-Headers"), check.Equals, "Authorization, Content-Type, Range")
// Check preflight for a disallowed request
resp = httptest.NewRecorder()
- req.Header.Set("Access-Control-Request-Method", "DELETE")
+ req.Header.Set("Access-Control-Request-Method", "MAKE-COFFEE")
h.ServeHTTP(resp, req)
c.Check(resp.Body.String(), check.Equals, "")
c.Check(resp.Code, check.Equals, http.StatusMethodNotAllowed)
import (
"flag"
+ "fmt"
"log"
"os"
"time"
var (
defaultConfigPath = "/etc/arvados/keep-web/keep-web.yml"
+ version = "dev"
)
// Config specifies server configuration.
dumpConfig := flag.Bool("dump-config", false,
"write current configuration to stdout and exit")
+ getVersion := flag.Bool("version", false,
+ "print version information and exit.")
flag.Usage = usage
flag.Parse()
+ // Print version information if requested
+ if *getVersion {
+ fmt.Printf("keep-web %s\n", version)
+ return
+ }
+
if err := config.LoadFile(cfg, configPath); err != nil {
if h := os.Getenv("ARVADOS_API_HOST"); h != "" && configPath == defaultConfigPath {
log.Printf("DEPRECATED: Using ARVADOS_API_HOST environment variable. Use config file instead.")
log.Fatal(config.DumpAndExit(cfg))
}
+ log.Printf("keep-web %s started", version)
+
os.Setenv("ARVADOS_API_HOST", cfg.Client.APIHost)
srv := &server{Config: cfg}
if err := srv.Start(); err != nil {
err := json.NewDecoder(resp.Body).Decode(&status)
c.Check(err, check.IsNil)
c.Check(status["Cache.Requests"], check.Equals, float64(0))
+ c.Check(status["Version"], check.Not(check.Equals), "")
}
func (s *IntegrationSuite) TestNoStatusFromVHost(c *check.C) {
"errors"
"fmt"
prand "math/rand"
- "net/http"
"os"
- "sync"
+ "path"
+ "strings"
"sync/atomic"
"time"
errReadOnly = errors.New("read-only filesystem")
)
-// webdavFS implements a read-only webdav.FileSystem by wrapping an
+// webdavFS implements a webdav.FileSystem by wrapping an
// arvados.CollectionFilesystem.
+//
+// Collections don't preserve empty directories, so Mkdir is
+// effectively a no-op, and we need to make parent dirs spring into
+// existence automatically so sequences like "mkcol foo; put foo/bar"
+// work as expected.
type webdavFS struct {
- collfs arvados.CollectionFileSystem
+ collfs arvados.CollectionFileSystem
+ writing bool
}
-var _ webdav.FileSystem = &webdavFS{}
+func (fs *webdavFS) makeparents(name string) {
+ dir, name := path.Split(name)
+ if dir == "" || dir == "/" {
+ return
+ }
+ dir = dir[:len(dir)-1]
+ fs.makeparents(dir)
+ fs.collfs.Mkdir(dir, 0755)
+}
func (fs *webdavFS) Mkdir(ctx context.Context, name string, perm os.FileMode) error {
- return errReadOnly
+ if !fs.writing {
+ return errReadOnly
+ }
+ name = strings.TrimRight(name, "/")
+ fs.makeparents(name)
+ return fs.collfs.Mkdir(name, 0755)
}
-func (fs *webdavFS) OpenFile(ctx context.Context, name string, flag int, perm os.FileMode) (webdav.File, error) {
- fi, err := fs.collfs.Stat(name)
- if err != nil {
- return nil, err
+func (fs *webdavFS) OpenFile(ctx context.Context, name string, flag int, perm os.FileMode) (f webdav.File, err error) {
+ writing := flag&(os.O_WRONLY|os.O_RDWR) != 0
+ if writing {
+ fs.makeparents(name)
+ }
+ f, err = fs.collfs.OpenFile(name, flag, perm)
+ if !fs.writing {
+ // webdav module returns 404 on all OpenFile errors,
+ // but returns 405 Method Not Allowed if OpenFile()
+ // succeeds but Write() or Close() fails. We'd rather
+ // have 405.
+ f = writeFailer{File: f, err: errReadOnly}
}
- return &webdavFile{collfs: fs.collfs, fileInfo: fi, name: name}, nil
+ return
}
func (fs *webdavFS) RemoveAll(ctx context.Context, name string) error {
- return errReadOnly
+ return fs.collfs.RemoveAll(name)
}
func (fs *webdavFS) Rename(ctx context.Context, oldName, newName string) error {
- return errReadOnly
-}
-
-func (fs *webdavFS) Stat(ctx context.Context, name string) (os.FileInfo, error) {
- return fs.collfs.Stat(name)
-}
-
-// webdavFile implements a read-only webdav.File by wrapping
-// http.File.
-//
-// The http.File is opened from an arvados.CollectionFileSystem, but
-// not until Seek, Read, or Readdir is called. This deferred-open
-// strategy makes webdav's OpenFile-Stat-Close cycle fast even though
-// the collfs's Open method is slow. This is relevant because webdav
-// does OpenFile-Stat-Close on each file when preparing directory
-// listings.
-//
-// Writes to a webdavFile always fail.
-type webdavFile struct {
- // fields populated by (*webdavFS).OpenFile()
- collfs http.FileSystem
- fileInfo os.FileInfo
- name string
-
- // internal fields
- file http.File
- loadOnce sync.Once
- err error
-}
-
-func (f *webdavFile) load() {
- f.file, f.err = f.collfs.Open(f.name)
-}
-
-func (f *webdavFile) Write([]byte) (int, error) {
- return 0, errReadOnly
-}
-
-func (f *webdavFile) Seek(offset int64, whence int) (int64, error) {
- f.loadOnce.Do(f.load)
- if f.err != nil {
- return 0, f.err
+ if !fs.writing {
+ return errReadOnly
}
- return f.file.Seek(offset, whence)
+ fs.makeparents(newName)
+ return fs.collfs.Rename(oldName, newName)
}
-func (f *webdavFile) Read(buf []byte) (int, error) {
- f.loadOnce.Do(f.load)
- if f.err != nil {
- return 0, f.err
+func (fs *webdavFS) Stat(ctx context.Context, name string) (os.FileInfo, error) {
+ if fs.writing {
+ fs.makeparents(name)
}
- return f.file.Read(buf)
+ return fs.collfs.Stat(name)
}
-func (f *webdavFile) Close() error {
- if f.file == nil {
- // We never called load(), or load() failed
- return f.err
- }
- return f.file.Close()
+type writeFailer struct {
+ webdav.File
+ err error
}
-func (f *webdavFile) Readdir(n int) ([]os.FileInfo, error) {
- f.loadOnce.Do(f.load)
- if f.err != nil {
- return nil, f.err
- }
- return f.file.Readdir(n)
+func (wf writeFailer) Write([]byte) (int, error) {
+ return 0, wf.err
}
-func (f *webdavFile) Stat() (os.FileInfo, error) {
- return f.fileInfo, nil
+func (wf writeFailer) Close() error {
+ return wf.err
}
// noLockSystem implements webdav.LockSystem by returning success for
--- /dev/null
+package main
+
+import "golang.org/x/net/webdav"
+
+var _ webdav.FileSystem = &webdavFS{}
"fmt"
"io"
"io/ioutil"
- "log"
"net"
"net/http"
"os"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/config"
"git.curoverse.com/arvados.git/sdk/go/health"
+ "git.curoverse.com/arvados.git/sdk/go/httpserver"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
+ log "github.com/Sirupsen/logrus"
"github.com/coreos/go-systemd/daemon"
"github.com/ghodss/yaml"
"github.com/gorilla/mux"
)
+var version = "dev"
+
type Config struct {
Client arvados.Client
Listen string
router http.Handler
)
+const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
+
func main() {
+ log.SetFormatter(&log.JSONFormatter{
+ TimestampFormat: rfc3339NanoFixed,
+ })
+
cfg := DefaultConfig()
flagset := flag.NewFlagSet("keepproxy", flag.ExitOnError)
const defaultCfgPath = "/etc/arvados/keepproxy/keepproxy.yml"
flagset.StringVar(&cfgPath, "config", defaultCfgPath, "Configuration file `path`")
dumpConfig := flagset.Bool("dump-config", false, "write current configuration to stdout and exit")
+ getVersion := flagset.Bool("version", false, "Print version information and exit.")
flagset.Parse(os.Args[1:])
+ // Print version information if requested
+ if *getVersion {
+ fmt.Printf("keepproxy %s\n", version)
+ return
+ }
+
err := config.LoadFile(cfg, cfgPath)
if err != nil {
h := os.Getenv("ARVADOS_API_HOST")
log.Fatal(config.DumpAndExit(cfg))
}
+ log.Printf("keepproxy %s started", version)
+
arv, err := arvadosclient.New(&cfg.Client)
if err != nil {
log.Fatalf("Error setting up arvados client %s", err.Error())
// Start serving requests.
router = MakeRESTRouter(!cfg.DisableGet, !cfg.DisablePut, kc, time.Duration(cfg.Timeout), cfg.ManagementToken)
- http.Serve(listener, router)
+ http.Serve(listener, httpserver.AddRequestIDs(httpserver.LogRequests(router)))
log.Println("shutting down")
}
Timeout: h.timeout,
Transport: h.transport,
},
- proto: req.Proto,
+ proto: req.Proto,
+ requestID: req.Header.Get("X-Request-Id"),
}
return &kc
}
"errors"
"fmt"
"io/ioutil"
- "log"
"math/rand"
"net/http"
"net/http/httptest"
time.Sleep(ms * time.Millisecond)
}
if listener == nil {
- log.Fatalf("Timed out waiting for listener to start")
+ panic("Timed out waiting for listener to start")
}
}
{
_, _, err := kc.Ask(hash)
c.Check(err, Equals, keepclient.BlockNotFound)
- log.Print("Finished Ask (expected BlockNotFound)")
+ c.Log("Finished Ask (expected BlockNotFound)")
}
{
reader, _, _, err := kc.Get(hash)
c.Check(reader, Equals, nil)
c.Check(err, Equals, keepclient.BlockNotFound)
- log.Print("Finished Get (expected BlockNotFound)")
+ c.Log("Finished Get (expected BlockNotFound)")
}
// Note in bug #5309 among other errors keepproxy would set
c.Check(hash2, Matches, fmt.Sprintf(`^%s\+3(\+.+)?$`, hash))
c.Check(rep, Equals, 2)
c.Check(err, Equals, nil)
- log.Print("Finished PutB (expected success)")
+ c.Log("Finished PutB (expected success)")
}
{
blocklen, _, err := kc.Ask(hash2)
c.Assert(err, Equals, nil)
c.Check(blocklen, Equals, int64(3))
- log.Print("Finished Ask (expected success)")
+ c.Log("Finished Ask (expected success)")
}
{
all, err := ioutil.ReadAll(reader)
c.Check(all, DeepEquals, []byte("foo"))
c.Check(blocklen, Equals, int64(3))
- log.Print("Finished Get (expected success)")
+ c.Log("Finished Get (expected success)")
}
{
c.Check(hash2, Matches, `^d41d8cd98f00b204e9800998ecf8427e\+0(\+.+)?$`)
c.Check(rep, Equals, 2)
c.Check(err, Equals, nil)
- log.Print("Finished PutB zero block")
+ c.Log("Finished PutB zero block")
}
{
all, err := ioutil.ReadAll(reader)
c.Check(all, DeepEquals, []byte(""))
c.Check(blocklen, Equals, int64(0))
- log.Print("Finished Get zero block")
+ c.Log("Finished Get zero block")
}
}
errNotFound, _ := err.(keepclient.ErrNotFound)
c.Check(errNotFound, NotNil)
c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
- log.Print("Ask 1")
+ c.Log("Ask 1")
}
{
c.Check(hash2, Equals, "")
c.Check(rep, Equals, 0)
c.Check(err, FitsTypeOf, keepclient.InsufficientReplicasError(errors.New("")))
- log.Print("PutB")
+ c.Log("PutB")
}
{
c.Check(errNotFound, NotNil)
c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
c.Check(blocklen, Equals, int64(0))
- log.Print("Ask 2")
+ c.Log("Ask 2")
}
{
c.Check(errNotFound, NotNil)
c.Assert(strings.Contains(err.Error(), "HTTP 403"), Equals, true)
c.Check(blocklen, Equals, int64(0))
- log.Print("Get")
+ c.Log("Get")
}
}
errNotFound, _ := err.(keepclient.ErrNotFound)
c.Check(errNotFound, NotNil)
c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
- log.Print("Ask 1")
+ c.Log("Ask 1")
}
{
c.Check(hash2, Matches, fmt.Sprintf(`^%s\+3(\+.+)?$`, hash))
c.Check(rep, Equals, 2)
c.Check(err, Equals, nil)
- log.Print("PutB")
+ c.Log("PutB")
}
{
c.Check(errNotFound, NotNil)
c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
c.Check(blocklen, Equals, int64(0))
- log.Print("Ask 2")
+ c.Log("Ask 2")
}
{
c.Check(errNotFound, NotNil)
c.Assert(strings.Contains(err.Error(), "HTTP 400"), Equals, true)
c.Check(blocklen, Equals, int64(0))
- log.Print("Get")
+ c.Log("Get")
}
}
var viaAlias = "keepproxy"
type proxyClient struct {
- client keepclient.HTTPClient
- proto string
+ client keepclient.HTTPClient
+ proto string
+ requestID string
}
func (pc *proxyClient) Do(req *http.Request) (*http.Response, error) {
req.Header.Add("Via", pc.proto+" "+viaAlias)
+ req.Header.Add("X-Request-Id", pc.requestID)
return pc.client.Do(req)
}
ok := make(chan struct{})
go func() {
req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
- (&LoggingRESTRouter{router: MakeRESTRouter()}).ServeHTTP(resp, req)
+ MakeRESTRouter().ServeHTTP(resp, req)
ok <- struct{}{}
}()
TrashQueue WorkQueueStatus
RequestsCurrent int
RequestsMax int
+ Version string
}
var st NodeStatus
// populate the given NodeStatus struct with current values.
func (rtr *router) readNodeStatus(st *NodeStatus) {
+ st.Version = version
vols := KeepVM.AllReadable()
if cap(st.Volumes) < len(vols) {
st.Volumes = make([]*volumeStatusEnt, len(vols))
"github.com/coreos/go-systemd/daemon"
)
+var version = "dev"
+
// A Keep "block" is 64MB.
const BlockSize = 64 * 1024 * 1024
deprecated.beforeFlagParse(theConfig)
dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit (useful for migrating from command line flags to config file)")
+ getVersion := flag.Bool("version", false, "Print version information and exit.")
defaultConfigPath := "/etc/arvados/keepstore/keepstore.yml"
var configPath string
flag.Usage = usage
flag.Parse()
+ // Print version information if requested
+ if *getVersion {
+ fmt.Printf("keepstore %s\n", version)
+ return
+ }
+
deprecated.afterFlagParse(theConfig)
err := config.LoadFile(theConfig, configPath)
log.Fatal(config.DumpAndExit(theConfig))
}
+ log.Printf("keepstore %s started", version)
+
err = theConfig.Start()
if err != nil {
log.Fatal(err)
// Start a round-robin VolumeManager with the volumes we have found.
KeepVM = MakeRRVolumeManager(theConfig.Volumes)
- // Middleware stack: logger, MaxRequests limiter, method handlers
+ // Middleware/handler stack
router := MakeRESTRouter()
limiter := httpserver.NewRequestLimiter(theConfig.MaxRequests, router)
router.limiter = limiter
- http.Handle("/", &LoggingRESTRouter{router: limiter})
+ http.Handle("/", httpserver.AddRequestIDs(httpserver.LogRequests(limiter)))
// Set up a TCP listener.
listener, err := net.Listen("tcp", theConfig.Listen)
+++ /dev/null
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package main
-
-// LoggingRESTRouter
-// LoggingResponseWriter
-
-import (
- "context"
- "net/http"
- "strings"
- "time"
-
- "git.curoverse.com/arvados.git/sdk/go/httpserver"
- "git.curoverse.com/arvados.git/sdk/go/stats"
- log "github.com/Sirupsen/logrus"
-)
-
-// LoggingResponseWriter has anonymous fields ResponseWriter and ResponseBody
-type LoggingResponseWriter struct {
- Status int
- Length int
- http.ResponseWriter
- ResponseBody string
- sentHdr time.Time
-}
-
-// CloseNotify implements http.CloseNotifier.
-func (resp *LoggingResponseWriter) CloseNotify() <-chan bool {
- wrapped, ok := resp.ResponseWriter.(http.CloseNotifier)
- if !ok {
- // If upstream doesn't implement CloseNotifier, we can
- // satisfy the interface by returning a channel that
- // never sends anything (the interface doesn't
- // guarantee that anything will ever be sent on the
- // channel even if the client disconnects).
- return nil
- }
- return wrapped.CloseNotify()
-}
-
-// WriteHeader writes header to ResponseWriter
-func (resp *LoggingResponseWriter) WriteHeader(code int) {
- if resp.sentHdr == zeroTime {
- resp.sentHdr = time.Now()
- }
- resp.Status = code
- resp.ResponseWriter.WriteHeader(code)
-}
-
-var zeroTime time.Time
-
-func (resp *LoggingResponseWriter) Write(data []byte) (int, error) {
- if resp.Length == 0 && len(data) > 0 && resp.sentHdr == zeroTime {
- resp.sentHdr = time.Now()
- }
- resp.Length += len(data)
- if resp.Status >= 400 {
- resp.ResponseBody += string(data)
- }
- return resp.ResponseWriter.Write(data)
-}
-
-// LoggingRESTRouter is used to add logging capabilities to mux.Router
-type LoggingRESTRouter struct {
- router http.Handler
- idGenerator httpserver.IDGenerator
-}
-
-func (loggingRouter *LoggingRESTRouter) ServeHTTP(wrappedResp http.ResponseWriter, req *http.Request) {
- tStart := time.Now()
-
- // Attach a requestID-aware logger to the request context.
- lgr := log.WithField("RequestID", loggingRouter.idGenerator.Next())
- ctx := context.WithValue(req.Context(), "logger", lgr)
- req = req.WithContext(ctx)
-
- lgr = lgr.WithFields(log.Fields{
- "remoteAddr": req.RemoteAddr,
- "reqForwardedFor": req.Header.Get("X-Forwarded-For"),
- "reqMethod": req.Method,
- "reqPath": req.URL.Path[1:],
- "reqBytes": req.ContentLength,
- })
- lgr.Debug("request")
-
- resp := LoggingResponseWriter{http.StatusOK, 0, wrappedResp, "", zeroTime}
- loggingRouter.router.ServeHTTP(&resp, req)
- tDone := time.Now()
-
- statusText := http.StatusText(resp.Status)
- if resp.Status >= 400 {
- statusText = strings.Replace(resp.ResponseBody, "\n", "", -1)
- }
- if resp.sentHdr == zeroTime {
- // Nobody changed status or wrote any data, i.e., we
- // returned a 200 response with no body.
- resp.sentHdr = tDone
- }
-
- lgr.WithFields(log.Fields{
- "timeTotal": stats.Duration(tDone.Sub(tStart)),
- "timeToStatus": stats.Duration(resp.sentHdr.Sub(tStart)),
- "timeWriteBody": stats.Duration(tDone.Sub(resp.sentHdr)),
- "respStatusCode": resp.Status,
- "respStatus": statusText,
- "respBytes": resp.Length,
- }).Info("response")
-}
+++ /dev/null
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package main
-
-import (
- "net/http"
- "testing"
-)
-
-func TestLoggingResponseWriterImplementsCloseNotifier(t *testing.T) {
- http.ResponseWriter(&LoggingResponseWriter{}).(http.CloseNotifier).CloseNotify()
-}
c.Check(getStatusItem("PullQueue", "InProgress"), Equals, float64(0))
c.Check(getStatusItem("PullQueue", "Queued"), Equals, float64(0))
+ c.Check(getStatusItem("Version"), Not(Equals), "")
response := IssueRequest(&testData.req)
c.Assert(response.Code, Equals, testData.responseCode)
s3RaceWindow time.Duration
s3ACL = s3.Private
+
+ zeroTime time.Time
)
const (
try:
root_logger = setup_logging(config.get('Logging', 'file'), **config.log_levels())
- root_logger.info("%s %s, libcloud %s", sys.argv[0], __version__, libcloud.__version__)
+ root_logger.info("%s %s started, libcloud %s", sys.argv[0], __version__, libcloud.__version__)
node_setup, node_shutdown, node_update, node_monitor = \
config.dispatch_classes()
server_calculator = build_server_calculator(config)
import socketserver
import threading
+from ._version import __version__
+
_logger = logging.getLogger('status.Handler')
def __init__(self):
self._mtx = threading.Lock()
self._latest = {}
+ self._version = {'Version' : __version__}
def get_json(self):
with self._mtx:
- return json.dumps(self._latest)
+ return json.dumps(dict(self._latest, **self._version))
def keys(self):
with self._mtx:
resp = r.json()
self.assertEqual(n, resp['nodes_'+str(n)])
self.assertEqual(1, resp['nodes_1'])
+ self.assertIn('Version', resp)
class StatusServerDisabled(unittest.TestCase):
)
var logger = ctxlog.FromContext
+var version = "dev"
func main() {
log := logger(nil)
configPath := flag.String("config", "/etc/arvados/ws/ws.yml", "`path` to config file")
dumpConfig := flag.Bool("dump-config", false, "show current configuration and exit")
+ getVersion := flag.Bool("version", false, "Print version information and exit.")
cfg := defaultConfig()
flag.Parse()
+ // Print version information if requested
+ if *getVersion {
+ fmt.Printf("arvados-ws %s\n", version)
+ return
+ }
+
err := config.LoadFile(&cfg, *configPath)
if err != nil {
log.Fatal(err)
return
}
- log.Info("started")
+ log.Printf("arvados-ws %s started", version)
srv := &server{wsConfig: &cfg}
log.Fatal(srv.Run())
}
func (rtr *router) Status() interface{} {
return map[string]interface{}{
"Clients": atomic.LoadInt64(&rtr.status.ReqsActive),
+ "Version": version,
}
}
package main
import (
+ "encoding/json"
"io/ioutil"
"net/http"
"sync"
}
}
+func (s *serverSuite) TestStatus(c *check.C) {
+ go s.srv.Run()
+ defer s.srv.Close()
+ s.srv.WaitReady()
+ req, err := http.NewRequest("GET", "http://"+s.srv.listener.Addr().String()+"/status.json", nil)
+ c.Assert(err, check.IsNil)
+ resp, err := http.DefaultClient.Do(req)
+ c.Check(err, check.IsNil)
+ c.Check(resp.StatusCode, check.Equals, http.StatusOK)
+ var status map[string]interface{}
+ err = json.NewDecoder(resp.Body).Decode(&status)
+ c.Check(err, check.IsNil)
+ c.Check(status["Version"], check.Not(check.Equals), "")
+}
+
func (s *serverSuite) TestHealthDisabled(c *check.C) {
s.cfg.ManagementToken = ""
"git.curoverse.com/arvados.git/sdk/go/arvados"
)
+var version = "dev"
+
type resourceList interface {
Len() int
GetItems() []interface{}
"verbose",
false,
"Log informational messages. Off by default.")
+ getVersion := flags.Bool(
+ "version",
+ false,
+ "Print version information and exit.")
parentGroupUUID := flags.String(
"parent-group-uuid",
"",
// Parse args; omit the first arg which is the command name
flags.Parse(os.Args[1:])
+ // Print version information if requested
+ if *getVersion {
+ fmt.Printf("arv-sync-groups %s\n", version)
+ os.Exit(0)
+ }
+
// Input file as a required positional argument
if flags.NArg() == 0 {
return fmt.Errorf("please provide a path to an input file")
}
defer f.Close()
- log.Printf("Group sync starting. Using %q as users id and parent group UUID %q", cfg.UserID, cfg.ParentGroupUUID)
+ log.Printf("arv-sync-groups %s started. Using %q as users id and parent group UUID %q", version, cfg.UserID, cfg.ParentGroupUUID)
// Get the complete user list to minimize API Server requests
allUsers := make(map[string]arvados.User)
include agpl-3.0.txt
include crunchstat_summary/dygraphs.js
-include crunchstat_summary/chartjs.js
+include crunchstat_summary/synchronizer.js
});
});
+ var sync = Dygraph.synchronize(Object.values(charts), {range: false});
+
if (typeof window.debug === 'undefined')
window.debug = {};
window.debug.charts = charts;
class DygraphsChart(crunchstat_summary.webchart.WebChart):
CSS = 'https://cdnjs.cloudflare.com/ajax/libs/dygraph/2.0.0/dygraph.min.css'
JSLIB = 'https://cdnjs.cloudflare.com/ajax/libs/dygraph/2.0.0/dygraph.min.js'
- JSASSET = 'dygraphs.js'
+ JSASSETS = ['synchronizer.js','dygraphs.js']
def headHTML(self):
return '<link rel="stylesheet" href="{}">\n'.format(self.CSS)
--- /dev/null
+/**
+ * Synchronize zooming and/or selections between a set of dygraphs.
+ *
+ * Usage:
+ *
+ * var g1 = new Dygraph(...),
+ * g2 = new Dygraph(...),
+ * ...;
+ * var sync = Dygraph.synchronize(g1, g2, ...);
+ * // charts are now synchronized
+ * sync.detach();
+ * // charts are no longer synchronized
+ *
+ * You can set options using the last parameter, for example:
+ *
+ * var sync = Dygraph.synchronize(g1, g2, g3, {
+ * selection: true,
+ * zoom: true
+ * });
+ *
+ * The default is to synchronize both of these.
+ *
+ * Instead of passing one Dygraph object as each parameter, you may also pass an
+ * array of dygraphs:
+ *
+ * var sync = Dygraph.synchronize([g1, g2, g3], {
+ * selection: false,
+ * zoom: true
+ * });
+ *
+ * You may also set `range: false` if you wish to only sync the x-axis.
+ * The `range` option has no effect unless `zoom` is true (the default).
+ *
+ * SPDX-License-Identifier: MIT
+ * Original source: https://github.com/danvk/dygraphs/blob/master/src/extras/synchronizer.js
+ * at commit b55a71d768d2f8de62877c32b3aec9e9975ac389
+ *
+ * Copyright (c) 2009 Dan Vanderkam
+ *
+ * Permission is hereby granted, free of charge, to any person
+ * obtaining a copy of this software and associated documentation
+ * files (the "Software"), to deal in the Software without
+ * restriction, including without limitation the rights to use,
+ * copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following
+ * conditions:
+ *
+ * The above copyright notice and this permission notice shall be
+ * included in all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
+ * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
+ * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+(function() {
+/* global Dygraph:false */
+'use strict';
+
+var Dygraph;
+if (window.Dygraph) {
+ Dygraph = window.Dygraph;
+} else if (typeof(module) !== 'undefined') {
+ Dygraph = require('../dygraph');
+}
+
+var synchronize = function(/* dygraphs..., opts */) {
+ if (arguments.length === 0) {
+ throw 'Invalid invocation of Dygraph.synchronize(). Need >= 1 argument.';
+ }
+
+ var OPTIONS = ['selection', 'zoom', 'range'];
+ var opts = {
+ selection: true,
+ zoom: true,
+ range: true
+ };
+ var dygraphs = [];
+ var prevCallbacks = [];
+
+ var parseOpts = function(obj) {
+ if (!(obj instanceof Object)) {
+ throw 'Last argument must be either Dygraph or Object.';
+ } else {
+ for (var i = 0; i < OPTIONS.length; i++) {
+ var optName = OPTIONS[i];
+ if (obj.hasOwnProperty(optName)) opts[optName] = obj[optName];
+ }
+ }
+ };
+
+ if (arguments[0] instanceof Dygraph) {
+ // Arguments are Dygraph objects.
+ for (var i = 0; i < arguments.length; i++) {
+ if (arguments[i] instanceof Dygraph) {
+ dygraphs.push(arguments[i]);
+ } else {
+ break;
+ }
+ }
+ if (i < arguments.length - 1) {
+ throw 'Invalid invocation of Dygraph.synchronize(). ' +
+ 'All but the last argument must be Dygraph objects.';
+ } else if (i == arguments.length - 1) {
+ parseOpts(arguments[arguments.length - 1]);
+ }
+ } else if (arguments[0].length) {
+ // Invoked w/ list of dygraphs, options
+ for (var i = 0; i < arguments[0].length; i++) {
+ dygraphs.push(arguments[0][i]);
+ }
+ if (arguments.length == 2) {
+ parseOpts(arguments[1]);
+ } else if (arguments.length > 2) {
+ throw 'Invalid invocation of Dygraph.synchronize(). ' +
+ 'Expected two arguments: array and optional options argument.';
+ } // otherwise arguments.length == 1, which is fine.
+ } else {
+ throw 'Invalid invocation of Dygraph.synchronize(). ' +
+ 'First parameter must be either Dygraph or list of Dygraphs.';
+ }
+
+ if (dygraphs.length < 2) {
+ throw 'Invalid invocation of Dygraph.synchronize(). ' +
+ 'Need two or more dygraphs to synchronize.';
+ }
+
+ var readycount = dygraphs.length;
+ for (var i = 0; i < dygraphs.length; i++) {
+ var g = dygraphs[i];
+ g.ready( function() {
+ if (--readycount == 0) {
+ // store original callbacks
+ var callBackTypes = ['drawCallback', 'highlightCallback', 'unhighlightCallback'];
+ for (var j = 0; j < dygraphs.length; j++) {
+ if (!prevCallbacks[j]) {
+ prevCallbacks[j] = {};
+ }
+ for (var k = callBackTypes.length - 1; k >= 0; k--) {
+ prevCallbacks[j][callBackTypes[k]] = dygraphs[j].getFunctionOption(callBackTypes[k]);
+ }
+ }
+
+ // Listen for draw, highlight, unhighlight callbacks.
+ if (opts.zoom) {
+ attachZoomHandlers(dygraphs, opts, prevCallbacks);
+ }
+
+ if (opts.selection) {
+ attachSelectionHandlers(dygraphs, prevCallbacks);
+ }
+ }
+ });
+ }
+
+ return {
+ detach: function() {
+ for (var i = 0; i < dygraphs.length; i++) {
+ var g = dygraphs[i];
+ if (opts.zoom) {
+ g.updateOptions({drawCallback: prevCallbacks[i].drawCallback});
+ }
+ if (opts.selection) {
+ g.updateOptions({
+ highlightCallback: prevCallbacks[i].highlightCallback,
+ unhighlightCallback: prevCallbacks[i].unhighlightCallback
+ });
+ }
+ }
+ // release references & make subsequent calls throw.
+ dygraphs = null;
+ opts = null;
+ prevCallbacks = null;
+ }
+ };
+};
+
+function arraysAreEqual(a, b) {
+ if (!Array.isArray(a) || !Array.isArray(b)) return false;
+ var i = a.length;
+ if (i !== b.length) return false;
+ while (i--) {
+ if (a[i] !== b[i]) return false;
+ }
+ return true;
+}
+
+function attachZoomHandlers(gs, syncOpts, prevCallbacks) {
+ var block = false;
+ for (var i = 0; i < gs.length; i++) {
+ var g = gs[i];
+ g.updateOptions({
+ drawCallback: function(me, initial) {
+ if (block || initial) return;
+ block = true;
+ var opts = {
+ dateWindow: me.xAxisRange()
+ };
+ if (syncOpts.range) opts.valueRange = me.yAxisRange();
+
+ for (var j = 0; j < gs.length; j++) {
+ if (gs[j] == me) {
+ if (prevCallbacks[j] && prevCallbacks[j].drawCallback) {
+ prevCallbacks[j].drawCallback.apply(this, arguments);
+ }
+ continue;
+ }
+
+ // Only redraw if there are new options
+ if (arraysAreEqual(opts.dateWindow, gs[j].getOption('dateWindow')) &&
+ arraysAreEqual(opts.valueRange, gs[j].getOption('valueRange'))) {
+ continue;
+ }
+
+ gs[j].updateOptions(opts);
+ }
+ block = false;
+ }
+ }, true /* no need to redraw */);
+ }
+}
+
+function attachSelectionHandlers(gs, prevCallbacks) {
+ var block = false;
+ for (var i = 0; i < gs.length; i++) {
+ var g = gs[i];
+
+ g.updateOptions({
+ highlightCallback: function(event, x, points, row, seriesName) {
+ if (block) return;
+ block = true;
+ var me = this;
+ for (var i = 0; i < gs.length; i++) {
+ if (me == gs[i]) {
+ if (prevCallbacks[i] && prevCallbacks[i].highlightCallback) {
+ prevCallbacks[i].highlightCallback.apply(this, arguments);
+ }
+ continue;
+ }
+ var idx = gs[i].getRowForX(x);
+ if (idx !== null) {
+ gs[i].setSelection(idx, seriesName);
+ }
+ }
+ block = false;
+ },
+ unhighlightCallback: function(event) {
+ if (block) return;
+ block = true;
+ var me = this;
+ for (var i = 0; i < gs.length; i++) {
+ if (me == gs[i]) {
+ if (prevCallbacks[i] && prevCallbacks[i].unhighlightCallback) {
+ prevCallbacks[i].unhighlightCallback.apply(this, arguments);
+ }
+ continue;
+ }
+ gs[i].clearSelection();
+ }
+ block = false;
+ }
+ }, true /* no need to redraw */);
+ }
+}
+
+Dygraph.synchronize = synchronize;
+
+})();
class WebChart(object):
"""Base class for a web chart.
- Subclasses must assign JSLIB and JSASSET, and override the
+ Subclasses must assign JSLIB and JSASSETS, and override the
chartdata() method.
"""
JSLIB = None
def js(self):
return 'var chartdata = {};\n{}'.format(
json.dumps(self.sections()),
- pkg_resources.resource_string('crunchstat_summary', self.JSASSET))
+ '\n'.join([pkg_resources.resource_string('crunchstat_summary', jsa) for jsa in self.JSASSETS]))
def sections(self):
return [
"git.curoverse.com/arvados.git/sdk/go/keepclient"
)
+var version = "dev"
+
func main() {
err := doMain(os.Args[1:])
if err != nil {
false,
"Log progress of each block verification")
+ getVersion := flags.Bool(
+ "version",
+ false,
+ "Print version information and exit.")
+
// Parse args; omit the first arg which is the command name
flags.Parse(args)
+ // Print version information if requested
+ if *getVersion {
+ fmt.Printf("keep-block-check %s\n", version)
+ os.Exit(0)
+ }
+
config, blobSigningKey, err := loadConfig(*configFile)
if err != nil {
return fmt.Errorf("Error loading configuration from file: %s", err.Error())
"crypto/rand"
"encoding/binary"
"flag"
+ "fmt"
"io"
"io/ioutil"
"log"
"net/http"
+ "os"
"time"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
)
+var version = "dev"
+
// Command line config knobs
var (
BlockSize = flag.Int("block-size", keepclient.BLOCKSIZE, "bytes per read/write op")
StatsInterval = flag.Duration("stats-interval", time.Second, "time interval between IO stats reports, or 0 to disable")
ServiceURL = flag.String("url", "", "specify scheme://host of a single keep service to exercise (instead of using all advertised services like normal clients)")
ServiceUUID = flag.String("uuid", "", "specify UUID of a single advertised keep service to exercise")
+ getVersion = flag.Bool("version", false, "Print version information and exit.")
)
func main() {
flag.Parse()
+ // Print version information if requested
+ if *getVersion {
+ fmt.Printf("keep-exercise %s\n", version)
+ os.Exit(0)
+ }
+
+ log.Printf("keep-exercise %s started", version)
+
arv, err := arvadosclient.MakeArvadosClient()
if err != nil {
log.Fatal(err)
"git.curoverse.com/arvados.git/sdk/go/keepclient"
)
+var version = "dev"
+
func main() {
err := doMain()
if err != nil {
0,
"Lifetime of blob permission signatures on source keepservers. If not provided, this will be retrieved from the API server's discovery document.")
+ getVersion := flags.Bool(
+ "version",
+ false,
+ "Print version information and exit.")
+
// Parse args; omit the first arg which is the command name
flags.Parse(os.Args[1:])
+ // Print version information if requested
+ if *getVersion {
+ fmt.Printf("keep-rsync %s\n", version)
+ os.Exit(0)
+ }
+
srcConfig, srcBlobSigningKey, err := loadConfig(*srcConfigFile)
if err != nil {
return fmt.Errorf("Error loading src configuration from file: %s", err.Error())