# Approximate memory limit (in bytes) for collection cache.
MaxCollectionBytes: 100000000
- # Permission cache entries.
- MaxPermissionEntries: 1000
-
# UUID cache entries.
MaxUUIDEntries: 1000
UUIDTTL *arvados.Duration
MaxCollectionEntries *int
MaxCollectionBytes *int64
- MaxPermissionEntries *int
MaxUUIDEntries *int
}
if oc.Cache.MaxCollectionBytes != nil {
cluster.Collections.WebDAVCache.MaxCollectionBytes = *oc.Cache.MaxCollectionBytes
}
- if oc.Cache.MaxPermissionEntries != nil {
- cluster.Collections.WebDAVCache.MaxPermissionEntries = *oc.Cache.MaxPermissionEntries
- }
if oc.Cache.MaxUUIDEntries != nil {
cluster.Collections.WebDAVCache.MaxUUIDEntries = *oc.Cache.MaxUUIDEntries
}
"UUIDTTL": "1s",
"MaxCollectionEntries": 42,
"MaxCollectionBytes": 1234567890,
- "MaxPermissionEntries": 100,
"MaxUUIDEntries": 100
},
"ManagementToken": "xyzzy"
c.Check(cluster.Collections.WebDAVCache.UUIDTTL, check.Equals, arvados.Duration(time.Second))
c.Check(cluster.Collections.WebDAVCache.MaxCollectionEntries, check.Equals, 42)
c.Check(cluster.Collections.WebDAVCache.MaxCollectionBytes, check.Equals, int64(1234567890))
- c.Check(cluster.Collections.WebDAVCache.MaxPermissionEntries, check.Equals, 100)
c.Check(cluster.Collections.WebDAVCache.MaxUUIDEntries, check.Equals, 100)
c.Check(cluster.Services.WebDAVDownload.ExternalURL, check.Equals, arvados.URL{Host: "download.example.com", Path: "/"})
# Approximate memory limit (in bytes) for collection cache.
MaxCollectionBytes: 100000000
- # Permission cache entries.
- MaxPermissionEntries: 1000
-
# UUID cache entries.
MaxUUIDEntries: 1000
logger.Trace("overquota")
overquota = sorted[i:]
break tryrun
- } else if logger.Info("creating new instance"); sch.pool.Create(it) {
+ } else if sch.pool.Create(it) {
// Success. (Note pool.Create works
// asynchronously and does its own
- // logging, so we don't need to.)
+ // logging about the eventual outcome,
+ // so we don't need to.)
+ logger.Info("creating new instance")
} else {
// Failed despite not being at quota,
// e.g., cloud ops throttled. TODO:
def update_pipeline_component(self, r):
pass  # no-op: r is not used
+ def _required_env(self):  # returns a fresh dict holding exactly two variables, HOME and TMPDIR
+ env = {}
+ env["HOME"] = self.outdir  # HOME is set to this object's outdir
+ env["TMPDIR"] = self.tmpdir  # TMPDIR is set to this object's tmpdir
+ return env
+
def run(self, runtimeContext):
# ArvadosCommandTool subclasses from cwltool.CommandLineTool,
# which calls makeJobRunner() to get a new ArvadosContainer
"path": "%s/%s" % (self.outdir, self.stdout)}
(docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
- if not docker_req:
- docker_req = {"dockerImageId": "arvados/jobs:"+__version__}
container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
docker_req,
from .arvcontainer import ArvadosContainer
from .pathmapper import ArvPathMapper
from .runner import make_builder
+from ._version import __version__
from functools import partial
from schema_salad.sourceline import SourceLine
from cwltool.errors import WorkflowException
def __init__(self, arvrunner, toolpath_object, loadingContext):
super(ArvadosCommandTool, self).__init__(toolpath_object, loadingContext)
+
+ (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
+ if not docker_req:
+ self.hints.append({"class": "DockerRequirement",
+ "dockerImageId": "arvados/jobs:"+__version__})
+
self.arvrunner = arvrunner
def make_job_runner(self, runtimeContext):
if self.submit_request_uuid:
self.submit_runner_cluster = self.submit_request_uuid[0:5]
+
+ def get_outdir(self) -> str:
+ """Return self.outdir unchanged; this implementation never creates a directory."""
+ return self.outdir
+
+ def get_tmpdir(self) -> str:
+ """Return self.tmpdir unchanged; this implementation never creates a directory."""
+ return self.tmpdir
+
+ def create_tmpdir(self) -> str:
+ """Return self.tmpdir unchanged; despite the method name, nothing is created here."""
+ return self.tmpdir
# file to determine what version of cwltool and schema-salad to
# build.
install_requires=[
- 'cwltool==3.0.20210319143721',
- 'schema-salad==7.1.20210611090601',
+ 'cwltool==3.1.20210816212154',
+ 'schema-salad==8.2.20210902094147',
'arvados-python-client{}'.format(pysdk_dep),
'setuptools',
'ciso8601 >= 2.0.0',
'networkx < 2.6'
],
- extras_require={
- ':os.name=="posix" and python_version<"3"': ['subprocess32 >= 3.5.1'],
- ':python_version<"3"': ['pytz'],
- },
data_files=[
('share/doc/arvados-cwl-runner', ['LICENSE-2.0.txt', 'README.rst']),
],
"baseCommand": "ls",
"arguments": [{"valueFrom": "$(runtime.outdir)"}],
"id": "#",
- "class": "CommandLineTool"
+ "class": "org.w3id.cwl.cwl.CommandLineTool"
})
loadingContext, runtimeContext = self.helper(runner, enable_reuse)
}],
"baseCommand": "ls",
"id": "#",
- "class": "CommandLineTool"
+ "class": "org.w3id.cwl.cwl.CommandLineTool"
})
loadingContext, runtimeContext = self.helper(runner)
}],
"baseCommand": "ls",
"id": "#",
- "class": "CommandLineTool"
+ "class": "org.w3id.cwl.cwl.CommandLineTool"
})
loadingContext, runtimeContext = self.helper(runner)
"stdin": "/keep/99999999999999999999999999999996+99/file.txt",
"arguments": [{"valueFrom": "$(runtime.outdir)"}],
"id": "#",
- "class": "CommandLineTool"
+ "class": "org.w3id.cwl.cwl.CommandLineTool"
})
loadingContext, runtimeContext = self.helper(runner)
"baseCommand": "ls",
"arguments": [{"valueFrom": "$(runtime.outdir)"}],
"id": "#",
- "class": "CommandLineTool"
+ "class": "org.w3id.cwl.cwl.CommandLineTool"
})
loadingContext, runtimeContext = self.helper(runner)
document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.1")
tool = cmap({"arguments": ["md5sum", "example.conf"],
- "class": "CommandLineTool",
+ "class": "org.w3id.cwl.cwl.CommandLineTool",
"hints": [
{
"class": "http://commonwl.org/cwltool#Secrets",
"baseCommand": "ls",
"arguments": [{"valueFrom": "$(runtime.outdir)"}],
"id": "#",
- "class": "CommandLineTool",
+ "class": "org.w3id.cwl.cwl.CommandLineTool",
"hints": [
{
"class": "ToolTimeLimit",
"baseCommand": "ls",
"arguments": [{"valueFrom": "$(runtime.outdir)"}],
"id": "#",
- "class": "CommandLineTool",
+ "class": "org.w3id.cwl.cwl.CommandLineTool",
"hints": [
{
"class": "http://arvados.org/cwl#OutputStorageClass",
"baseCommand": "ls",
"arguments": [{"valueFrom": "$(runtime.outdir)"}],
"id": "#",
- "class": "CommandLineTool",
+ "class": "org.w3id.cwl.cwl.CommandLineTool",
"hints": [
{
"class": "http://arvados.org/cwl#ProcessProperties",
MaxBlockEntries int
MaxCollectionEntries int
MaxCollectionBytes int64
- MaxPermissionEntries int
MaxUUIDEntries int
MaxSessions int
}
package arvados
import (
+ "bytes"
"context"
"encoding/json"
"fmt"
}
func (dn *dirnode) loadManifest(txt string) error {
- var dirname string
- streams := strings.Split(txt, "\n")
- if streams[len(streams)-1] != "" {
+ streams := bytes.Split([]byte(txt), []byte{'\n'})
+ if len(streams[len(streams)-1]) != 0 {
return fmt.Errorf("line %d: no trailing newline", len(streams))
}
streams = streams[:len(streams)-1]
segments := []storedSegment{}
+ // To reduce allocs, we reuse a single "pathparts" slice
+ // (pre-split on "/" separators) for the duration of this
+ // func.
+ var pathparts []string
+ // To reduce allocs, we reuse a single "toks" slice of 3 byte
+ // slices.
+ var toks = make([][]byte, 3)
+ // Similar to bytes.SplitN(token, []byte{c}, 3), but splits into the shared toks slice rather than allocating a new one,
+ // and returns the number of toks filled (1, 2, or 3); entries past that count keep whatever an earlier call left there.
+ // The filled toks are sub-slices of src (no copy). c must be a one-byte rune: each separator is skipped with a fixed +1.
+ splitToToks := func(src []byte, c rune) int {
+ c1 := bytes.IndexRune(src, c)
+ if c1 < 0 { // no separator at all: the whole input is the only token
+ toks[0] = src
+ return 1
+ }
+ toks[0], src = src[:c1], src[c1+1:] // first token; keep scanning after the separator
+ c2 := bytes.IndexRune(src, c)
+ if c2 < 0 { // exactly one separator: the remainder is the second token
+ toks[1] = src
+ return 2
+ }
+ toks[1], toks[2] = src[:c2], src[c2+1:] // third token is everything after the second separator, further separators included
+ return 3
+ }
for i, stream := range streams {
lineno := i + 1
var anyFileTokens bool
var pos int64
var segIdx int
segments = segments[:0]
- for i, token := range strings.Split(stream, " ") {
+ pathparts = nil
+ streamparts := 0
+ for i, token := range bytes.Split(stream, []byte{' '}) {
if i == 0 {
- dirname = manifestUnescape(token)
+ pathparts = strings.Split(manifestUnescape(string(token)), "/")
+ streamparts = len(pathparts)
continue
}
- if !strings.Contains(token, ":") {
+ if !bytes.ContainsRune(token, ':') {
if anyFileTokens {
return fmt.Errorf("line %d: bad file segment %q", lineno, token)
}
- toks := strings.SplitN(token, "+", 3)
- if len(toks) < 2 {
+ if splitToToks(token, '+') < 2 {
return fmt.Errorf("line %d: bad locator %q", lineno, token)
}
- length, err := strconv.ParseInt(toks[1], 10, 32)
+ length, err := strconv.ParseInt(string(toks[1]), 10, 32)
if err != nil || length < 0 {
return fmt.Errorf("line %d: bad locator %q", lineno, token)
}
segments = append(segments, storedSegment{
- locator: token,
+ locator: string(token),
size: int(length),
offset: 0,
length: int(length),
} else if len(segments) == 0 {
return fmt.Errorf("line %d: bad locator %q", lineno, token)
}
-
- toks := strings.SplitN(token, ":", 3)
- if len(toks) != 3 {
+ if splitToToks(token, ':') != 3 {
return fmt.Errorf("line %d: bad file segment %q", lineno, token)
}
anyFileTokens = true
- offset, err := strconv.ParseInt(toks[0], 10, 64)
+ offset, err := strconv.ParseInt(string(toks[0]), 10, 64)
if err != nil || offset < 0 {
return fmt.Errorf("line %d: bad file segment %q", lineno, token)
}
- length, err := strconv.ParseInt(toks[1], 10, 64)
+ length, err := strconv.ParseInt(string(toks[1]), 10, 64)
if err != nil || length < 0 {
return fmt.Errorf("line %d: bad file segment %q", lineno, token)
}
- name := dirname + "/" + manifestUnescape(toks[2])
- fnode, err := dn.createFileAndParents(name)
+ if !bytes.ContainsAny(toks[2], `\/`) {
+ // optimization for a common case
+ pathparts = append(pathparts[:streamparts], string(toks[2]))
+ } else {
+ pathparts = append(pathparts[:streamparts], strings.Split(manifestUnescape(string(toks[2])), "/")...)
+ }
+ fnode, err := dn.createFileAndParents(pathparts)
if fnode == nil && err == nil && length == 0 {
// Special case: an empty file used as
// a marker to preserve an otherwise
continue
}
if err != nil || (fnode == nil && length != 0) {
- return fmt.Errorf("line %d: cannot use path %q with length %d: %s", lineno, name, length, err)
+ return fmt.Errorf("line %d: cannot use name %q with length %d: %s", lineno, toks[2], length, err)
}
// Map the stream offset/range coordinates to
// block/offset/range coordinates and add
return fmt.Errorf("line %d: no file segments", lineno)
} else if len(segments) == 0 {
return fmt.Errorf("line %d: no locators", lineno)
- } else if dirname == "" {
+ } else if streamparts == 0 {
return fmt.Errorf("line %d: no stream name", lineno)
}
}
//
// If path is a "parent directory exists" marker (the last path
// component is "."), the returned values are both nil.
-func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
+//
+// Newly added nodes have modtime==0. Caller is responsible for fixing
+// them with backdateTree.
+func (dn *dirnode) createFileAndParents(names []string) (fn *filenode, err error) {
var node inode = dn
- names := strings.Split(path, "/")
basename := names[len(names)-1]
for _, name := range names[:len(names)-1] {
switch name {
node = node.Parent()
continue
}
- modtime := node.Parent().FileInfo().ModTime()
node.Lock()
- locked := node
+ unlock := node.Unlock
node, err = node.Child(name, func(child inode) (inode, error) {
if child == nil {
- child, err := node.FS().newNode(name, 0755|os.ModeDir, modtime)
+ // note modtime will be fixed later in backdateTree()
+ child, err := node.FS().newNode(name, 0755|os.ModeDir, time.Time{})
if err != nil {
return nil, err
}
return child, nil
}
})
- locked.Unlock()
+ unlock()
if err != nil {
return
}
if basename == "." {
return
} else if !permittedName(basename) {
- err = fmt.Errorf("invalid file part %q in path %q", basename, path)
+ err = fmt.Errorf("invalid file part %q in path %q", basename, names)
return
}
- modtime := node.FileInfo().ModTime()
node.Lock()
defer node.Unlock()
_, err = node.Child(basename, func(child inode) (inode, error) {
switch child := child.(type) {
case nil:
- child, err = node.FS().newNode(basename, 0755, modtime)
+ child, err = node.FS().newNode(basename, 0755, time.Time{})
if err != nil {
return nil, err
}
}
}
+var bigmanifest = func() string { // synthetic manifest text, produced once when this package-level var is initialized
+ var buf bytes.Buffer
+ for i := 0; i < 2000; i++ { // one newline-terminated stream per iteration, named ./dir0 through ./dir1999
+ fmt.Fprintf(&buf, "./dir%d", i)
+ for i := 0; i < 100; i++ { // 100 copies of the same locator; this i shadows the stream index
+ fmt.Fprintf(&buf, " d41d8cd98f00b204e9800998ecf8427e+99999")
+ }
+ for i := 0; i < 2000; i++ { // 2000 file segments named file0 through file1999, all with offset 1200000 and length 300000 (i shadows again)
+ fmt.Fprintf(&buf, " 1200000:300000:file%d", i)
+ }
+ fmt.Fprintf(&buf, "\n") // ends the stream
+ }
+ return buf.String()
+}()
+
+// BenchmarkParseManifest times building a collection filesystem from
+// bigmanifest, c.N times over. Each iteration checks that FileSystem
+// returned a non-nil filesystem and no error.
+func (s *CollectionFSSuite) BenchmarkParseManifest(c *check.C) {
+	// DebugLocksPanicMode is package-level state. Turn it off for
+	// the benchmark, and put the previous value back on return so
+	// the change does not leak into whatever runs next in this
+	// process.
+	saved := DebugLocksPanicMode
+	DebugLocksPanicMode = false
+	defer func() { DebugLocksPanicMode = saved }()
+	c.Logf("test manifest is %d bytes", len(bigmanifest))
+	for i := 0; i < c.N; i++ {
+		fs, err := (&Collection{ManifestText: bigmanifest}).FileSystem(s.client, s.kc)
+		c.Check(err, check.IsNil)
+		c.Check(fs, check.NotNil)
+	}
+}
+
func (s *CollectionFSSuite) checkMemSize(c *check.C, f File) {
fn := f.(*filehandle).inode.(*filenode)
var memsize int64
result = service.last_result()
if not success:
- if result.get('status_code', None):
+ if result.get('status_code'):
_logger.debug("Request fail: PUT %s => %s %s",
self.data_hash,
- result['status_code'],
- result['body'])
+ result.get('status_code'),
+ result.get('body'))
raise self.TaskFailed()
_logger.debug("KeepWriterThread %s succeeded %s+%i %s",
install_requires=[
'ciso8601 >=2.0.0',
'future',
- 'google-api-python-client >=1.6.2, <1.7',
+ 'google-api-python-client >=1.6.2, <2',
'httplib2 >=0.9.2',
'pycurl >=7.19.5.1',
- 'ruamel.yaml >=0.15.54, <=0.16.5',
+ 'ruamel.yaml >=0.15.54, <=0.17.11',
'setuptools',
'ws4py >=0.4.2',
],
- extras_require={
- ':os.name=="posix" and python_version<"3"': ['subprocess32 >= 3.5.1'],
- ':python_version<"3"': ['pytz'],
- },
classifiers=[
'Programming Language :: Python :: 3',
],
def last_result(self):
if self.will_succeed:
return self._result  # the stored result is only handed out when will_succeed is set
+ else:
+ return {"status_code": 500, "body": "didn't succeed"}  # fixed failure result carrying status_code and body keys
def finished(self):
return False  # unconditionally reports False
"git.arvados.org/arvados.git/sdk/go/keepclient"
lru "github.com/hashicorp/golang-lru"
"github.com/prometheus/client_golang/prometheus"
+ "github.com/sirupsen/logrus"
)
const metricsUpdateInterval = time.Second / 10
type cache struct {
cluster *arvados.Cluster
config *arvados.WebDAVCacheConfig // TODO: use cluster.Collections.WebDAV instead
+ logger logrus.FieldLogger // receives the cache's Debugf messages
registry *prometheus.Registry
metrics cacheMetrics
pdhs *lru.TwoQueueCache // collection UUID -> *cachedPDH
collections *lru.TwoQueueCache // token + "\000" + PDH -> *cachedCollection
- permissions *lru.TwoQueueCache
sessions *lru.TwoQueueCache
setupOnce sync.Once
+
+ chPruneSessions chan struct{} // capacity-1 signal channel; each value received triggers one pruneSessions run in a background goroutine
+ chPruneCollections chan struct{} // capacity-1 signal channel; each value received triggers one pruneCollections run in a background goroutine
}
type cacheMetrics struct {
sessionEntries prometheus.Gauge
collectionHits prometheus.Counter
pdhHits prometheus.Counter
- permissionHits prometheus.Counter
sessionHits prometheus.Counter
sessionMisses prometheus.Counter
apiCalls prometheus.Counter
Help: "Number of uuid-to-pdh cache hits.",
})
reg.MustRegister(m.pdhHits)
- m.permissionHits = prometheus.NewCounter(prometheus.CounterOpts{
- Namespace: "arvados",
- Subsystem: "keepweb_collectioncache",
- Name: "permission_hits",
- Help: "Number of targetID-to-permission cache hits.",
- })
- reg.MustRegister(m.permissionHits)
m.apiCalls = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "arvados",
Subsystem: "keepweb_collectioncache",
}
type cachedPDH struct {
- expire time.Time
- pdh string
+ expire time.Time // stored as now + config.TTL when the entry is added
+ refresh time.Time // stored as now + config.UUIDTTL; once passed, a lookup re-checks the current PDH with the API before reusing the entry
+ pdh string // portable data hash recorded for the UUID
}
type cachedCollection struct {
if err != nil {
panic(err)
}
- c.permissions, err = lru.New2Q(c.config.MaxPermissionEntries)
- if err != nil {
- panic(err)
- }
c.sessions, err = lru.New2Q(c.config.MaxSessions)
if err != nil {
panic(err)
c.updateGauges()
}
}()
+ c.chPruneCollections = make(chan struct{}, 1)
+ go func() {
+ for range c.chPruneCollections {
+ c.pruneCollections()
+ }
+ }()
+ c.chPruneSessions = make(chan struct{}, 1)
+ go func() {
+ for range c.chPruneSessions {
+ c.pruneSessions()
+ }
+ }()
}
func (c *cache) updateGauges() {
}
coll.ManifestText = m
var updated arvados.Collection
- defer c.pdhs.Remove(coll.UUID)
err = client.RequestAndDecode(&updated, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{
"collection": map[string]string{
"manifest_text": coll.ManifestText,
},
})
- if err == nil {
- c.collections.Add(client.AuthToken+"\000"+updated.PortableDataHash, &cachedCollection{
- expire: time.Now().Add(time.Duration(c.config.TTL)),
- collection: &updated,
- })
+ if err != nil {
+ c.pdhs.Remove(coll.UUID)
+ return err
}
- return err
+ c.collections.Add(client.AuthToken+"\000"+updated.PortableDataHash, &cachedCollection{
+ expire: time.Now().Add(time.Duration(c.config.TTL)),
+ collection: &updated,
+ })
+ c.pdhs.Add(coll.UUID, &cachedPDH{
+ expire: time.Now().Add(time.Duration(c.config.TTL)),
+ refresh: time.Now().Add(time.Duration(c.config.UUIDTTL)),
+ pdh: updated.PortableDataHash,
+ })
+ return nil
}
// ResetSession unloads any potentially stale state. Should be called
} else {
c.metrics.sessionHits.Inc()
}
- go c.pruneSessions()
+ select {
+ case c.chPruneSessions <- struct{}{}:
+ default:
+ }
fs, _ := sess.fs.Load().(arvados.CustomFileSystem)
if fs != nil && !expired {
return fs, sess, nil
c.setupOnce.Do(c.setup)
c.metrics.requests.Inc()
- permOK := false
- permKey := arv.ApiToken + "\000" + targetID
- if forceReload {
- } else if ent, cached := c.permissions.Get(permKey); cached {
- ent := ent.(*cachedPermission)
- if ent.expire.Before(time.Now()) {
- c.permissions.Remove(permKey)
- } else {
- permOK = true
- c.metrics.permissionHits.Inc()
- }
- }
-
+ var pdhRefresh bool
var pdh string
if arvadosclient.PDHMatch(targetID) {
pdh = targetID
c.pdhs.Remove(targetID)
} else {
pdh = ent.pdh
+ pdhRefresh = forceReload || time.Now().After(ent.refresh)
c.metrics.pdhHits.Inc()
}
}
- var collection *arvados.Collection
- if pdh != "" {
- collection = c.lookupCollection(arv.ApiToken + "\000" + pdh)
- }
-
- if collection != nil && permOK {
- return collection, nil
- } else if collection != nil {
- // Ask API for current PDH for this targetID. Most
- // likely, the cached PDH is still correct; if so,
- // _and_ the current token has permission, we can
- // use our cached manifest.
+ if pdh == "" {
+ // UUID->PDH mapping is not cached, might as well get
+ // the whole collection record and be done (below).
+ c.logger.Debugf("cache(%s): have no pdh", targetID)
+ } else if cached := c.lookupCollection(arv.ApiToken + "\000" + pdh); cached == nil {
+ // PDH->manifest is not cached, might as well get the
+ // whole collection record (below).
+ c.logger.Debugf("cache(%s): have pdh %s but manifest is not cached", targetID, pdh)
+ } else if !pdhRefresh {
+ // We looked up UUID->PDH very recently, and we still
+ // have the manifest for that PDH.
+ c.logger.Debugf("cache(%s): have pdh %s and refresh not needed", targetID, pdh)
+ return cached, nil
+ } else {
+ // Get current PDH for this UUID (and confirm we still
+ // have read permission). Most likely, the cached PDH
+ // is still correct, in which case we can use our
+ // cached manifest.
c.metrics.apiCalls.Inc()
var current arvados.Collection
err := arv.Get("collections", targetID, selectPDH, ¤t)
return nil, err
}
if current.PortableDataHash == pdh {
- c.permissions.Add(permKey, &cachedPermission{
- expire: time.Now().Add(time.Duration(c.config.TTL)),
- })
- if pdh != targetID {
- c.pdhs.Add(targetID, &cachedPDH{
- expire: time.Now().Add(time.Duration(c.config.UUIDTTL)),
- pdh: pdh,
- })
- }
- return collection, err
+ // PDH has not changed, cached manifest is
+ // correct.
+ c.logger.Debugf("cache(%s): verified cached pdh %s is still correct", targetID, pdh)
+ return cached, nil
}
- // PDH changed, but now we know we have
- // permission -- and maybe we already have the
- // new PDH in the cache.
- if coll := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); coll != nil {
- return coll, nil
+ if cached := c.lookupCollection(arv.ApiToken + "\000" + current.PortableDataHash); cached != nil {
+ // PDH changed, and we already have the
+ // manifest for that new PDH.
+ c.logger.Debugf("cache(%s): cached pdh %s was stale, new pdh is %s and manifest is already in cache", targetID, pdh, current.PortableDataHash)
+ return cached, nil
}
}
- // Collection manifest is not cached.
+ // Either UUID->PDH is not cached, or PDH->manifest is not
+ // cached.
+ var retrieved arvados.Collection
c.metrics.apiCalls.Inc()
- err := arv.Get("collections", targetID, nil, &collection)
+ err := arv.Get("collections", targetID, nil, &retrieved)
if err != nil {
return nil, err
}
+ c.logger.Debugf("cache(%s): retrieved manifest, caching with pdh %s", targetID, retrieved.PortableDataHash)
exp := time.Now().Add(time.Duration(c.config.TTL))
- c.permissions.Add(permKey, &cachedPermission{
- expire: exp,
- })
- c.pdhs.Add(targetID, &cachedPDH{
- expire: time.Now().Add(time.Duration(c.config.UUIDTTL)),
- pdh: collection.PortableDataHash,
- })
- c.collections.Add(arv.ApiToken+"\000"+collection.PortableDataHash, &cachedCollection{
+ if targetID != retrieved.PortableDataHash {
+ c.pdhs.Add(targetID, &cachedPDH{
+ expire: exp,
+ refresh: time.Now().Add(time.Duration(c.config.UUIDTTL)),
+ pdh: retrieved.PortableDataHash,
+ })
+ }
+ c.collections.Add(arv.ApiToken+"\000"+retrieved.PortableDataHash, &cachedCollection{
expire: exp,
- collection: collection,
+ collection: &retrieved,
})
- if int64(len(collection.ManifestText)) > c.config.MaxCollectionBytes/int64(c.config.MaxCollectionEntries) {
- go c.pruneCollections()
+ if int64(len(retrieved.ManifestText)) > c.config.MaxCollectionBytes/int64(c.config.MaxCollectionEntries) {
+ select {
+ case c.chPruneCollections <- struct{}{}:
+ default:
+ }
}
- return collection, nil
+ return &retrieved, nil
}
// pruneCollections checks the total bytes occupied by manifest_text
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
"git.arvados.org/arvados.git/sdk/go/arvadostest"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/expfmt"
"gopkg.in/check.v1"
arv, err := arvadosclient.MakeArvadosClient()
c.Assert(err, check.Equals, nil)
- cache := newConfig(s.Config).Cache
+ cache := newConfig(ctxlog.TestLogger(c), s.Config).Cache
cache.registry = prometheus.NewRegistry()
// Hit the same collection 5 times using the same token. Only
s.checkCacheMetrics(c, cache.registry,
"requests 5",
"hits 4",
- "permission_hits 4",
"pdh_hits 4",
"api_calls 1")
s.checkCacheMetrics(c, cache.registry,
"requests 6",
"hits 4",
- "permission_hits 4",
"pdh_hits 4",
"api_calls 2")
s.checkCacheMetrics(c, cache.registry,
"requests 7",
"hits 5",
- "permission_hits 5",
"pdh_hits 4",
"api_calls 2")
s.checkCacheMetrics(c, cache.registry,
"requests 27",
"hits 23",
- "permission_hits 23",
"pdh_hits 22",
"api_calls 4")
}
arv, err := arvadosclient.MakeArvadosClient()
c.Assert(err, check.Equals, nil)
- cache := newConfig(s.Config).Cache
+ cache := newConfig(ctxlog.TestLogger(c), s.Config).Cache
cache.registry = prometheus.NewRegistry()
for _, forceReload := range []bool{false, true, false, true} {
s.checkCacheMetrics(c, cache.registry,
"requests 4",
"hits 3",
- "permission_hits 1",
"pdh_hits 0",
- "api_calls 3")
+ "api_calls 1")
}
func (s *UnitSuite) TestCacheForceReloadByUUID(c *check.C) {
arv, err := arvadosclient.MakeArvadosClient()
c.Assert(err, check.Equals, nil)
- cache := newConfig(s.Config).Cache
+ cache := newConfig(ctxlog.TestLogger(c), s.Config).Cache
cache.registry = prometheus.NewRegistry()
for _, forceReload := range []bool{false, true, false, true} {
s.checkCacheMetrics(c, cache.registry,
"requests 4",
"hits 3",
- "permission_hits 1",
"pdh_hits 3",
"api_calls 3")
}
}
func (s *UnitSuite) TestCORSPreflight(c *check.C) {
- h := handler{Config: newConfig(s.Config)}
+ h := handler{Config: newConfig(ctxlog.TestLogger(c), s.Config)}
u := mustParseURL("http://keep-web.example/c=" + arvadostest.FooCollection + "/foo")
req := &http.Request{
Method: "OPTIONS",
c.Assert(err, check.IsNil)
}
- h := handler{Config: newConfig(s.Config)}
+ h := handler{Config: newConfig(ctxlog.TestLogger(c), s.Config)}
u := mustParseURL("http://" + arvadostest.FooCollection + ".keep-web.example/foo")
req := &http.Request{
Method: "GET",
RequestURI: u.RequestURI(),
}
resp := httptest.NewRecorder()
- cfg := newConfig(s.Config)
+ cfg := newConfig(ctxlog.TestLogger(c), s.Config)
cfg.cluster.Users.AnonymousUserToken = arvadostest.AnonymousToken
h := handler{Config: cfg}
h.ServeHTTP(resp, req)
contentType string
}{
{"picture.txt", "BMX bikes are small this year\n", "text/plain; charset=utf-8"},
- {"picture.bmp", "BMX bikes are small this year\n", "image/x-ms-bmp"},
+ {"picture.bmp", "BMX bikes are small this year\n", "image/(x-ms-)?bmp"},
{"picture.jpg", "BMX bikes are small this year\n", "image/jpeg"},
{"picture1", "BMX bikes are small this year\n", "image/bmp"}, // content sniff; "BM" is the magic signature for .bmp
{"picture2", "Cars are small this year\n", "text/plain; charset=utf-8"}, // content sniff
resp := httptest.NewRecorder()
s.testServer.Handler.ServeHTTP(resp, req)
c.Check(resp.Code, check.Equals, http.StatusOK)
- c.Check(resp.Header().Get("Content-Type"), check.Equals, trial.contentType)
+ c.Check(resp.Header().Get("Content-Type"), check.Matches, trial.contentType)
c.Check(resp.Body.String(), check.Equals, trial.content)
}
}
}
func (s *IntegrationSuite) TestDownloadLoggingPermission(c *check.C) {
- config := newConfig(s.ArvConfig)
+ config := newConfig(ctxlog.TestLogger(c), s.ArvConfig)
h := handler{Config: config}
u := mustParseURL("http://" + arvadostest.FooCollection + ".keep-web.example/foo")
}
func (s *IntegrationSuite) TestUploadLoggingPermission(c *check.C) {
- config := newConfig(s.ArvConfig)
+ config := newConfig(ctxlog.TestLogger(c), s.ArvConfig)
h := handler{Config: config}
for _, adminperm := range []bool{true, false} {
cluster *arvados.Cluster
}
-func newConfig(arvCfg *arvados.Config) *Config {
+func newConfig(logger logrus.FieldLogger, arvCfg *arvados.Config) *Config {
cfg := Config{}
var cls *arvados.Cluster
var err error
cfg.cluster = cls
cfg.Cache.config = &cfg.cluster.Collections.WebDAVCache
cfg.Cache.cluster = cls
+ cfg.Cache.logger = logger
return &cfg
}
if err != nil {
log.Fatal(err)
}
- cfg := newConfig(arvCfg)
+ cfg := newConfig(logger, arvCfg)
if *dumpConfig {
out, err := yaml.Marshal(cfg)
c.Check(counters["arvados_keepweb_collectioncache_api_calls//"].Value, check.Equals, int64(2))
c.Check(counters["arvados_keepweb_collectioncache_hits//"].Value, check.Equals, int64(1))
c.Check(counters["arvados_keepweb_collectioncache_pdh_hits//"].Value, check.Equals, int64(1))
- c.Check(counters["arvados_keepweb_collectioncache_permission_hits//"].Value, check.Equals, int64(1))
c.Check(gauges["arvados_keepweb_collectioncache_cached_manifests//"].Value, check.Equals, float64(1))
// FooCollection's cached manifest size is 45 ("1f4b0....+45") plus one 51-byte blob signature
c.Check(gauges["arvados_keepweb_sessions_cached_collection_bytes//"].Value, check.Equals, float64(45+51))
ldr.Path = "-"
arvCfg, err := ldr.Load()
c.Check(err, check.IsNil)
- cfg := newConfig(arvCfg)
+ cfg := newConfig(ctxlog.TestLogger(c), arvCfg)
c.Assert(err, check.IsNil)
cfg.Client = arvados.Client{
APIHost: testAPIHost,
"net/url"
"git.arvados.org/arvados.git/sdk/go/arvadostest"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
"gopkg.in/check.v1"
)
func (s *UnitSuite) TestStatus(c *check.C) {
- h := handler{Config: newConfig(s.Config)}
+ h := handler{Config: newConfig(ctxlog.TestLogger(c), s.Config)}
u, _ := url.Parse("http://keep-web.example/status.json")
req := &http.Request{
Method: "GET",
Path string
UserID string
Verbose bool
+ CaseInsensitive bool
ParentGroupUUID string
ParentGroupName string
SysUserUUID string
"user-id",
"email",
"Attribute by which every user is identified. Valid values are: email and username.")
+ caseInsensitive := flags.Bool(
+ "case-insensitive",
+ false,
+ "Performs case insensitive matching on user IDs. Off by default.")
verbose := flags.Bool(
"verbose",
false,
config.ParentGroupUUID = *parentGroupUUID
config.UserID = *userID
config.Verbose = *verbose
+ config.CaseInsensitive = *caseInsensitive
return nil
}
}
defer f.Close()
- log.Printf("%s %s started. Using %q as users id and parent group UUID %q", os.Args[0], version, cfg.UserID, cfg.ParentGroupUUID)
+ iCaseLog := ""
+ if cfg.UserID == "username" && cfg.CaseInsensitive {
+ iCaseLog = " - username matching requested to be case-insensitive"
+ }
+ log.Printf("%s %s started. Using %q as users id and parent group UUID %q%s", os.Args[0], version, cfg.UserID, cfg.ParentGroupUUID, iCaseLog)
// Get the complete user list to minimize API Server requests
allUsers := make(map[string]arvados.User)
if err != nil {
return err
}
+ if cfg.UserID == "username" && uID != "" && cfg.CaseInsensitive {
+ uID = strings.ToLower(uID)
+ if uuid, found := userIDToUUID[uID]; found {
+ return fmt.Errorf("case insensitive collision for username %q between %q and %q", uID, u.UUID, uuid)
+ }
+ }
userIDToUUID[uID] = u.UUID
if cfg.Verbose {
log.Printf("Seen user %q (%s)", u.Username, u.UUID)
membersSkipped++
continue
}
+ if cfg.UserID == "username" && cfg.CaseInsensitive {
+ groupMember = strings.ToLower(groupMember)
+ }
if !(groupPermission == "can_read" || groupPermission == "can_write" || groupPermission == "can_manage") {
log.Printf("Warning: 3rd field should be 'can_read', 'can_write' or 'can_manage'. Found: %q at line %d, skipping.", groupPermission, lineNo)
membersSkipped++
if page.Len() == 0 {
break
}
- for _, i := range page.GetItems() {
- allItems = append(allItems, i)
- }
+ allItems = append(allItems, page.GetItems()...)
params.Offset += page.Len()
}
return allItems, nil
if err != nil {
return remoteGroups, groupNameToUUID, err
}
+ if cfg.UserID == "username" && cfg.CaseInsensitive {
+ memberID = strings.ToLower(memberID)
+ }
membersSet[memberID] = u2gLinkSet[link.HeadUUID]
}
remoteGroups[group.UUID] = &GroupInfo{
userID, _ := GetUserID(user, cfg.UserID)
return fmt.Errorf("error getting links needed to remove user %q from group %q: %s", userID, group.Name, err)
}
- for _, link := range l {
- links = append(links, link)
- }
+ links = append(links, l...)
}
for _, item := range links {
link := item.(arvados.Link)
os.Args = []string{"cmd", "somefile.csv"}
config, err := GetConfig()
c.Assert(err, IsNil)
+ config.UserID = "email"
// Confirm that the parent group was created
gl = arvados.GroupList{}
ac.RequestAndDecode(&gl, "GET", "/arvados/v1/groups", nil, params)
}},
}
ac.RequestAndDecode(&ll, "GET", "/arvados/v1/links", nil, params)
- if ll.Len() != 1 {
- return false
- }
- return true
+ return ll.Len() == 1
}
// If named group exists, return its UUID
func (s *TestSuite) TestParseFlagsWithPositionalArgument(c *C) { // boolean flags placed before the positional argument are parsed, and the argument ends up in cfg.Path
cfg := ConfigParams{}
- os.Args = []string{"cmd", "-verbose", "/tmp/somefile.csv"}
+ os.Args = []string{"cmd", "-verbose", "-case-insensitive", "/tmp/somefile.csv"}
err := ParseFlags(&cfg)
c.Assert(err, IsNil)
c.Check(cfg.Path, Equals, "/tmp/somefile.csv")
c.Check(cfg.Verbose, Equals, true)
+ c.Check(cfg.CaseInsensitive, Equals, true)
}
func (s *TestSuite) TestParseFlagsWithoutPositionalArgument(c *C) {
c.Assert(GroupMembershipExists(s.cfg.Client, activeUserUUID, groupUUID, "can_write"), Equals, true)
}
-// Users listed on the file that don't exist on the system are ignored
+// Entries with missing data are ignored.
func (s *TestSuite) TestIgnoreEmptyFields(c *C) {
activeUserEmail := s.users[arvadostest.ActiveUserUUID].Email
activeUserUUID := s.users[arvadostest.ActiveUserUUID].UUID
s.cfg.Path = tmpfile.Name()
s.cfg.UserID = "username"
err = doMain(s.cfg)
- s.cfg.UserID = "email"
c.Assert(err, IsNil)
// Confirm that memberships exist
groupUUID, err = RemoteGroupExists(s.cfg, "TestGroup1")
c.Assert(groupUUID, Not(Equals), "")
c.Assert(GroupMembershipExists(s.cfg.Client, activeUserUUID, groupUUID, "can_write"), Equals, true)
}
+
+func (s *TestSuite) TestUseUsernamesWithCaseInsensitiveMatching(c *C) { // an upper-cased username in the CSV must still resolve to the fixture user when CaseInsensitive is set
+ activeUserName := strings.ToUpper(s.users[arvadostest.ActiveUserUUID].Username)
+ activeUserUUID := s.users[arvadostest.ActiveUserUUID].UUID
+ // Confirm that group doesn't exist
+ groupUUID, err := RemoteGroupExists(s.cfg, "TestGroup1")
+ c.Assert(err, IsNil)
+ c.Assert(groupUUID, Equals, "")
+ // Create file & run command
+ data := [][]string{
+ {"TestGroup1", activeUserName},
+ }
+ tmpfile, err := MakeTempCSVFile(data)
+ c.Assert(err, IsNil)
+ defer os.Remove(tmpfile.Name()) // clean up
+ s.cfg.Path = tmpfile.Name()
+ s.cfg.UserID = "username"
+ s.cfg.CaseInsensitive = true
+ err = doMain(s.cfg)
+ c.Assert(err, IsNil)
+ // Confirm that memberships exist
+ groupUUID, err = RemoteGroupExists(s.cfg, "TestGroup1")
+ c.Assert(err, IsNil)
+ c.Assert(groupUUID, Not(Equals), "")
+ c.Assert(GroupMembershipExists(s.cfg.Client, activeUserUUID, groupUUID, "can_write"), Equals, true)
+}
+
+func (s *TestSuite) TestUsernamesCaseInsensitiveCollision(c *C) { // doMain must fail when two users' usernames differ only by letter case and CaseInsensitive is set
+ activeUserName := s.users[arvadostest.ActiveUserUUID].Username
+ activeUserUUID := s.users[arvadostest.ActiveUserUUID].UUID
+
+ nu := arvados.User{}
+ nuUsername := strings.ToUpper(activeUserName) // same name as the fixture user, upper-cased
+ err := s.cfg.Client.RequestAndDecode(&nu, "POST", "/arvados/v1/users", nil, map[string]interface{}{
+ "user": map[string]string{
+ "username": nuUsername,
+ },
+ })
+ c.Assert(err, IsNil)
+
+ // Manually remove non-fixture user because /database/reset fails otherwise
+ defer s.cfg.Client.RequestAndDecode(nil, "DELETE", "/arvados/v1/users/"+nu.UUID, nil, nil) // the error returned by this cleanup call is discarded
+
+ c.Assert(nu.Username, Equals, nuUsername)
+ c.Assert(nu.UUID, Not(Equals), activeUserUUID)
+ c.Assert(nu.Username, Not(Equals), activeUserName)
+
+ data := [][]string{
+ {"SomeGroup", activeUserName},
+ }
+ tmpfile, err := MakeTempCSVFile(data)
+ c.Assert(err, IsNil)
+ defer os.Remove(tmpfile.Name()) // clean up
+
+ s.cfg.Path = tmpfile.Name()
+ s.cfg.UserID = "username"
+ s.cfg.CaseInsensitive = true
+ err = doMain(s.cfg)
+ // Should get an error: the fixture username and the new upper-cased one differ only by case
+ c.Assert(err, NotNil)
+ c.Assert(err, ErrorMatches, ".*case insensitive collision.*")
+}