logger.Trace("overquota")
overquota = sorted[i:]
break tryrun
- } else if logger.Info("creating new instance"); sch.pool.Create(it) {
+ } else if sch.pool.Create(it) {
// Success. (Note pool.Create works
// asynchronously and does its own
- // logging, so we don't need to.)
+ // logging about the eventual outcome,
+ // so we don't need to.)
+ logger.Info("creating new instance")
} else {
// Failed despite not being at quota,
// e.g., cloud ops throttled. TODO:
    def update_pipeline_component(self, r):
        # Deliberate no-op override — NOTE(review): presumably there is
        # nothing to update for this runner type; confirm against the
        # base class contract.
        pass
+    def _required_env(self):
+        # Environment variables the tool invocation needs: HOME is set
+        # to the output directory and TMPDIR to the scratch directory.
+        env = {}
+        env["HOME"] = self.outdir
+        env["TMPDIR"] = self.tmpdir
+        return env
+
def run(self, runtimeContext):
# ArvadosCommandTool subclasses from cwltool.CommandLineTool,
# which calls makeJobRunner() to get a new ArvadosContainer
"path": "%s/%s" % (self.outdir, self.stdout)}
(docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
- if not docker_req:
- docker_req = {"dockerImageId": "arvados/jobs:"+__version__}
container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
docker_req,
from .arvcontainer import ArvadosContainer
from .pathmapper import ArvPathMapper
from .runner import make_builder
+from ._version import __version__
from functools import partial
from schema_salad.sourceline import SourceLine
from cwltool.errors import WorkflowException
def __init__(self, arvrunner, toolpath_object, loadingContext):
super(ArvadosCommandTool, self).__init__(toolpath_object, loadingContext)
+
+ (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
+ if not docker_req:
+ self.hints.append({"class": "DockerRequirement",
+ "dockerImageId": "arvados/jobs:"+__version__})
+
self.arvrunner = arvrunner
def make_job_runner(self, runtimeContext):
if self.submit_request_uuid:
self.submit_runner_cluster = self.submit_request_uuid[0:5]
+
+    def get_outdir(self) -> str:
+        """Return self.outdir; no directory is created here."""
+        return self.outdir
+
+    def get_tmpdir(self) -> str:
+        """Return self.tmpdir; no directory is created here."""
+        return self.tmpdir
+
+    def create_tmpdir(self) -> str:
+        """Return the existing self.tmpdir instead of creating a new one."""
+        return self.tmpdir
# file to determine what version of cwltool and schema-salad to
# build.
install_requires=[
- 'cwltool==3.0.20210319143721',
- 'schema-salad==7.1.20210611090601',
+ 'cwltool==3.1.20210816212154',
+ 'schema-salad==8.2.20210902094147',
'arvados-python-client{}'.format(pysdk_dep),
'setuptools',
'ciso8601 >= 2.0.0',
'networkx < 2.6'
],
- extras_require={
- ':os.name=="posix" and python_version<"3"': ['subprocess32 >= 3.5.1'],
- ':python_version<"3"': ['pytz'],
- },
data_files=[
('share/doc/arvados-cwl-runner', ['LICENSE-2.0.txt', 'README.rst']),
],
"baseCommand": "ls",
"arguments": [{"valueFrom": "$(runtime.outdir)"}],
"id": "#",
- "class": "CommandLineTool"
+ "class": "org.w3id.cwl.cwl.CommandLineTool"
})
loadingContext, runtimeContext = self.helper(runner, enable_reuse)
}],
"baseCommand": "ls",
"id": "#",
- "class": "CommandLineTool"
+ "class": "org.w3id.cwl.cwl.CommandLineTool"
})
loadingContext, runtimeContext = self.helper(runner)
}],
"baseCommand": "ls",
"id": "#",
- "class": "CommandLineTool"
+ "class": "org.w3id.cwl.cwl.CommandLineTool"
})
loadingContext, runtimeContext = self.helper(runner)
"stdin": "/keep/99999999999999999999999999999996+99/file.txt",
"arguments": [{"valueFrom": "$(runtime.outdir)"}],
"id": "#",
- "class": "CommandLineTool"
+ "class": "org.w3id.cwl.cwl.CommandLineTool"
})
loadingContext, runtimeContext = self.helper(runner)
"baseCommand": "ls",
"arguments": [{"valueFrom": "$(runtime.outdir)"}],
"id": "#",
- "class": "CommandLineTool"
+ "class": "org.w3id.cwl.cwl.CommandLineTool"
})
loadingContext, runtimeContext = self.helper(runner)
document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.1")
tool = cmap({"arguments": ["md5sum", "example.conf"],
- "class": "CommandLineTool",
+ "class": "org.w3id.cwl.cwl.CommandLineTool",
"hints": [
{
"class": "http://commonwl.org/cwltool#Secrets",
"baseCommand": "ls",
"arguments": [{"valueFrom": "$(runtime.outdir)"}],
"id": "#",
- "class": "CommandLineTool",
+ "class": "org.w3id.cwl.cwl.CommandLineTool",
"hints": [
{
"class": "ToolTimeLimit",
"baseCommand": "ls",
"arguments": [{"valueFrom": "$(runtime.outdir)"}],
"id": "#",
- "class": "CommandLineTool",
+ "class": "org.w3id.cwl.cwl.CommandLineTool",
"hints": [
{
"class": "http://arvados.org/cwl#OutputStorageClass",
"baseCommand": "ls",
"arguments": [{"valueFrom": "$(runtime.outdir)"}],
"id": "#",
- "class": "CommandLineTool",
+ "class": "org.w3id.cwl.cwl.CommandLineTool",
"hints": [
{
"class": "http://arvados.org/cwl#ProcessProperties",
package arvados
import (
+ "bytes"
"context"
"encoding/json"
"fmt"
}
func (dn *dirnode) loadManifest(txt string) error {
- var dirname string
- streams := strings.Split(txt, "\n")
- if streams[len(streams)-1] != "" {
+ streams := bytes.Split([]byte(txt), []byte{'\n'})
+ if len(streams[len(streams)-1]) != 0 {
return fmt.Errorf("line %d: no trailing newline", len(streams))
}
streams = streams[:len(streams)-1]
segments := []storedSegment{}
+ // To reduce allocs, we reuse a single "pathparts" slice
+ // (pre-split on "/" separators) for the duration of this
+ // func.
+ var pathparts []string
+ // To reduce allocs, we reuse a single "toks" slice of 3 byte
+ // slices.
+ var toks = make([][]byte, 3)
+ // Similar to bytes.SplitN(token, []byte{c}, 3), but splits
+ // into the toks slice rather than allocating a new one, and
+ // returns the number of toks (1, 2, or 3).
+ splitToToks := func(src []byte, c rune) int {
+ c1 := bytes.IndexRune(src, c)
+ if c1 < 0 {
+ toks[0] = src
+ return 1
+ }
+ toks[0], src = src[:c1], src[c1+1:]
+ c2 := bytes.IndexRune(src, c)
+ if c2 < 0 {
+ toks[1] = src
+ return 2
+ }
+ toks[1], toks[2] = src[:c2], src[c2+1:]
+ return 3
+ }
for i, stream := range streams {
lineno := i + 1
var anyFileTokens bool
var pos int64
var segIdx int
segments = segments[:0]
- for i, token := range strings.Split(stream, " ") {
+ pathparts = nil
+ streamparts := 0
+ for i, token := range bytes.Split(stream, []byte{' '}) {
if i == 0 {
- dirname = manifestUnescape(token)
+ pathparts = strings.Split(manifestUnescape(string(token)), "/")
+ streamparts = len(pathparts)
continue
}
- if !strings.Contains(token, ":") {
+ if !bytes.ContainsRune(token, ':') {
if anyFileTokens {
return fmt.Errorf("line %d: bad file segment %q", lineno, token)
}
- toks := strings.SplitN(token, "+", 3)
- if len(toks) < 2 {
+ if splitToToks(token, '+') < 2 {
return fmt.Errorf("line %d: bad locator %q", lineno, token)
}
- length, err := strconv.ParseInt(toks[1], 10, 32)
+ length, err := strconv.ParseInt(string(toks[1]), 10, 32)
if err != nil || length < 0 {
return fmt.Errorf("line %d: bad locator %q", lineno, token)
}
segments = append(segments, storedSegment{
- locator: token,
+ locator: string(token),
size: int(length),
offset: 0,
length: int(length),
} else if len(segments) == 0 {
return fmt.Errorf("line %d: bad locator %q", lineno, token)
}
-
- toks := strings.SplitN(token, ":", 3)
- if len(toks) != 3 {
+ if splitToToks(token, ':') != 3 {
return fmt.Errorf("line %d: bad file segment %q", lineno, token)
}
anyFileTokens = true
- offset, err := strconv.ParseInt(toks[0], 10, 64)
+ offset, err := strconv.ParseInt(string(toks[0]), 10, 64)
if err != nil || offset < 0 {
return fmt.Errorf("line %d: bad file segment %q", lineno, token)
}
- length, err := strconv.ParseInt(toks[1], 10, 64)
+ length, err := strconv.ParseInt(string(toks[1]), 10, 64)
if err != nil || length < 0 {
return fmt.Errorf("line %d: bad file segment %q", lineno, token)
}
- name := dirname + "/" + manifestUnescape(toks[2])
- fnode, err := dn.createFileAndParents(name)
+ if !bytes.ContainsAny(toks[2], `\/`) {
+ // optimization for a common case
+ pathparts = append(pathparts[:streamparts], string(toks[2]))
+ } else {
+ pathparts = append(pathparts[:streamparts], strings.Split(manifestUnescape(string(toks[2])), "/")...)
+ }
+ fnode, err := dn.createFileAndParents(pathparts)
if fnode == nil && err == nil && length == 0 {
// Special case: an empty file used as
// a marker to preserve an otherwise
continue
}
if err != nil || (fnode == nil && length != 0) {
- return fmt.Errorf("line %d: cannot use path %q with length %d: %s", lineno, name, length, err)
+ return fmt.Errorf("line %d: cannot use name %q with length %d: %s", lineno, toks[2], length, err)
}
// Map the stream offset/range coordinates to
// block/offset/range coordinates and add
return fmt.Errorf("line %d: no file segments", lineno)
} else if len(segments) == 0 {
return fmt.Errorf("line %d: no locators", lineno)
- } else if dirname == "" {
+ } else if streamparts == 0 {
return fmt.Errorf("line %d: no stream name", lineno)
}
}
//
// If names ends with a "parent directory exists" marker (the last
// element is "."), the returned values are both nil.
-func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
+//
+// Newly added nodes have modtime==0. Caller is responsible for fixing
+// them with backdateTree.
+func (dn *dirnode) createFileAndParents(names []string) (fn *filenode, err error) {
var node inode = dn
- names := strings.Split(path, "/")
basename := names[len(names)-1]
for _, name := range names[:len(names)-1] {
switch name {
node = node.Parent()
continue
}
- modtime := node.Parent().FileInfo().ModTime()
node.Lock()
- locked := node
+ unlock := node.Unlock
node, err = node.Child(name, func(child inode) (inode, error) {
if child == nil {
- child, err := node.FS().newNode(name, 0755|os.ModeDir, modtime)
+ // note modtime will be fixed later in backdateTree()
+ child, err := node.FS().newNode(name, 0755|os.ModeDir, time.Time{})
if err != nil {
return nil, err
}
return child, nil
}
})
- locked.Unlock()
+ unlock()
if err != nil {
return
}
if basename == "." {
return
} else if !permittedName(basename) {
- err = fmt.Errorf("invalid file part %q in path %q", basename, path)
+ err = fmt.Errorf("invalid file part %q in path %q", basename, names)
return
}
- modtime := node.FileInfo().ModTime()
node.Lock()
defer node.Unlock()
_, err = node.Child(basename, func(child inode) (inode, error) {
switch child := child.(type) {
case nil:
- child, err = node.FS().newNode(basename, 0755, modtime)
+ child, err = node.FS().newNode(basename, 0755, time.Time{})
if err != nil {
return nil, err
}
}
}
+// bigmanifest is a synthetic collection manifest used for parsing
+// benchmarks: 2000 streams, each listing 100 block locators followed
+// by 2000 file segment tokens.
+var bigmanifest = func() string {
+	var buf bytes.Buffer
+	for i := 0; i < 2000; i++ {
+		fmt.Fprintf(&buf, "./dir%d", i)
+		for i := 0; i < 100; i++ {
+			fmt.Fprintf(&buf, " d41d8cd98f00b204e9800998ecf8427e+99999")
+		}
+		for i := 0; i < 2000; i++ {
+			fmt.Fprintf(&buf, " 1200000:300000:file%d", i)
+		}
+		fmt.Fprintf(&buf, "\n")
+	}
+	return buf.String()
+}()
+
+// BenchmarkParseManifest measures the cost of building a collection
+// filesystem from the large synthetic manifest bigmanifest.
+func (s *CollectionFSSuite) BenchmarkParseManifest(c *check.C) {
+	// Disable lock diagnostics so they don't affect the measurement.
+	DebugLocksPanicMode = false
+	c.Logf("test manifest is %d bytes", len(bigmanifest))
+	for i := 0; i < c.N; i++ {
+		fs, err := (&Collection{ManifestText: bigmanifest}).FileSystem(s.client, s.kc)
+		c.Check(err, check.IsNil)
+		c.Check(fs, check.NotNil)
+	}
+}
+
func (s *CollectionFSSuite) checkMemSize(c *check.C, f File) {
fn := f.(*filehandle).inode.(*filenode)
var memsize int64
result = service.last_result()
if not success:
- if result.get('status_code', None):
+ if result.get('status_code'):
_logger.debug("Request fail: PUT %s => %s %s",
self.data_hash,
- result['status_code'],
- result['body'])
+ result.get('status_code'),
+ result.get('body'))
raise self.TaskFailed()
_logger.debug("KeepWriterThread %s succeeded %s+%i %s",
install_requires=[
'ciso8601 >=2.0.0',
'future',
- 'google-api-python-client >=1.6.2, <1.7',
+ 'google-api-python-client >=1.6.2, <2',
'httplib2 >=0.9.2',
'pycurl >=7.19.5.1',
- 'ruamel.yaml >=0.15.54, <=0.16.5',
+ 'ruamel.yaml >=0.15.54, <=0.17.11',
'setuptools',
'ws4py >=0.4.2',
],
- extras_require={
- ':os.name=="posix" and python_version<"3"': ['subprocess32 >= 3.5.1'],
- ':python_version<"3"': ['pytz'],
- },
classifiers=[
'Programming Language :: Python :: 3',
],
    def last_result(self):
        # Return the configured result on success; otherwise a canned
        # 500-style response dict — NOTE(review): presumably mirrors the
        # shape of the real service's last_result(); confirm.
        if self.will_succeed:
            return self._result
+        else:
+            return {"status_code": 500, "body": "didn't succeed"}
    def finished(self):
        # This mock never reports itself as finished.
        return False
Path string
UserID string
Verbose bool
+ CaseInsensitive bool
ParentGroupUUID string
ParentGroupName string
SysUserUUID string
"user-id",
"email",
"Attribute by which every user is identified. Valid values are: email and username.")
+ caseInsensitive := flags.Bool(
+ "case-insensitive",
+ false,
+ "Performs case insensitive matching on user IDs. Off by default.")
verbose := flags.Bool(
"verbose",
false,
config.ParentGroupUUID = *parentGroupUUID
config.UserID = *userID
config.Verbose = *verbose
+ config.CaseInsensitive = *caseInsensitive
return nil
}
}
defer f.Close()
- log.Printf("%s %s started. Using %q as users id and parent group UUID %q", os.Args[0], version, cfg.UserID, cfg.ParentGroupUUID)
+ iCaseLog := ""
+ if cfg.UserID == "username" && cfg.CaseInsensitive {
+ iCaseLog = " - username matching requested to be case-insensitive"
+ }
+ log.Printf("%s %s started. Using %q as users id and parent group UUID %q%s", os.Args[0], version, cfg.UserID, cfg.ParentGroupUUID, iCaseLog)
// Get the complete user list to minimize API Server requests
allUsers := make(map[string]arvados.User)
if err != nil {
return err
}
+ if cfg.UserID == "username" && uID != "" && cfg.CaseInsensitive {
+ uID = strings.ToLower(uID)
+ if uuid, found := userIDToUUID[uID]; found {
+ return fmt.Errorf("case insensitive collision for username %q between %q and %q", uID, u.UUID, uuid)
+ }
+ }
userIDToUUID[uID] = u.UUID
if cfg.Verbose {
log.Printf("Seen user %q (%s)", u.Username, u.UUID)
membersSkipped++
continue
}
+ if cfg.UserID == "username" && cfg.CaseInsensitive {
+ groupMember = strings.ToLower(groupMember)
+ }
if !(groupPermission == "can_read" || groupPermission == "can_write" || groupPermission == "can_manage") {
log.Printf("Warning: 3rd field should be 'can_read', 'can_write' or 'can_manage'. Found: %q at line %d, skipping.", groupPermission, lineNo)
membersSkipped++
if page.Len() == 0 {
break
}
- for _, i := range page.GetItems() {
- allItems = append(allItems, i)
- }
+ allItems = append(allItems, page.GetItems()...)
params.Offset += page.Len()
}
return allItems, nil
if err != nil {
return remoteGroups, groupNameToUUID, err
}
+ if cfg.UserID == "username" && cfg.CaseInsensitive {
+ memberID = strings.ToLower(memberID)
+ }
membersSet[memberID] = u2gLinkSet[link.HeadUUID]
}
remoteGroups[group.UUID] = &GroupInfo{
userID, _ := GetUserID(user, cfg.UserID)
return fmt.Errorf("error getting links needed to remove user %q from group %q: %s", userID, group.Name, err)
}
- for _, link := range l {
- links = append(links, link)
- }
+ links = append(links, l...)
}
for _, item := range links {
link := item.(arvados.Link)
os.Args = []string{"cmd", "somefile.csv"}
config, err := GetConfig()
c.Assert(err, IsNil)
+ config.UserID = "email"
// Confirm that the parent group was created
gl = arvados.GroupList{}
ac.RequestAndDecode(&gl, "GET", "/arvados/v1/groups", nil, params)
}},
}
ac.RequestAndDecode(&ll, "GET", "/arvados/v1/links", nil, params)
- if ll.Len() != 1 {
- return false
- }
- return true
+ return ll.Len() == 1
}
// If named group exists, return its UUID
// Boolean flags may precede the positional CSV path and must all be
// recorded in the config.
func (s *TestSuite) TestParseFlagsWithPositionalArgument(c *C) {
	cfg := ConfigParams{}
-	os.Args = []string{"cmd", "-verbose", "/tmp/somefile.csv"}
+	os.Args = []string{"cmd", "-verbose", "-case-insensitive", "/tmp/somefile.csv"}
	err := ParseFlags(&cfg)
	c.Assert(err, IsNil)
	c.Check(cfg.Path, Equals, "/tmp/somefile.csv")
	c.Check(cfg.Verbose, Equals, true)
+	c.Check(cfg.CaseInsensitive, Equals, true)
}
func (s *TestSuite) TestParseFlagsWithoutPositionalArgument(c *C) {
c.Assert(GroupMembershipExists(s.cfg.Client, activeUserUUID, groupUUID, "can_write"), Equals, true)
}
-// Users listed on the file that don't exist on the system are ignored
+// Entries with missing data are ignored.
func (s *TestSuite) TestIgnoreEmptyFields(c *C) {
activeUserEmail := s.users[arvadostest.ActiveUserUUID].Email
activeUserUUID := s.users[arvadostest.ActiveUserUUID].UUID
s.cfg.Path = tmpfile.Name()
s.cfg.UserID = "username"
err = doMain(s.cfg)
- s.cfg.UserID = "email"
c.Assert(err, IsNil)
// Confirm that memberships exist
groupUUID, err = RemoteGroupExists(s.cfg, "TestGroup1")
c.Assert(groupUUID, Not(Equals), "")
c.Assert(GroupMembershipExists(s.cfg.Client, activeUserUUID, groupUUID, "can_write"), Equals, true)
}
+
+// TestUseUsernamesWithCaseInsensitiveMatching verifies that, with
+// CaseInsensitive enabled, a CSV entry whose username differs from the
+// stored username only in letter case still resolves to that user.
+func (s *TestSuite) TestUseUsernamesWithCaseInsensitiveMatching(c *C) {
+	activeUserName := strings.ToUpper(s.users[arvadostest.ActiveUserUUID].Username)
+	activeUserUUID := s.users[arvadostest.ActiveUserUUID].UUID
+	// Confirm that group doesn't exist
+	groupUUID, err := RemoteGroupExists(s.cfg, "TestGroup1")
+	c.Assert(err, IsNil)
+	c.Assert(groupUUID, Equals, "")
+	// Create file & run command
+	data := [][]string{
+		{"TestGroup1", activeUserName},
+	}
+	tmpfile, err := MakeTempCSVFile(data)
+	c.Assert(err, IsNil)
+	defer os.Remove(tmpfile.Name()) // clean up
+	s.cfg.Path = tmpfile.Name()
+	s.cfg.UserID = "username"
+	s.cfg.CaseInsensitive = true
+	err = doMain(s.cfg)
+	c.Assert(err, IsNil)
+	// Confirm that memberships exist
+	groupUUID, err = RemoteGroupExists(s.cfg, "TestGroup1")
+	c.Assert(err, IsNil)
+	c.Assert(groupUUID, Not(Equals), "")
+	c.Assert(GroupMembershipExists(s.cfg.Client, activeUserUUID, groupUUID, "can_write"), Equals, true)
+}
+
+// TestUsernamesCaseInsensitiveCollision verifies that doMain fails with
+// an explicit error when two existing usernames become identical under
+// case-insensitive matching.
+func (s *TestSuite) TestUsernamesCaseInsensitiveCollision(c *C) {
+	activeUserName := s.users[arvadostest.ActiveUserUUID].Username
+	activeUserUUID := s.users[arvadostest.ActiveUserUUID].UUID
+
+	// Create a second user whose username is the upper-cased variant of
+	// an existing fixture user's name.
+	nu := arvados.User{}
+	nuUsername := strings.ToUpper(activeUserName)
+	err := s.cfg.Client.RequestAndDecode(&nu, "POST", "/arvados/v1/users", nil, map[string]interface{}{
+		"user": map[string]string{
+			"username": nuUsername,
+		},
+	})
+	c.Assert(err, IsNil)
+
+	// Manually remove non-fixture user because /database/reset fails otherwise
+	defer s.cfg.Client.RequestAndDecode(nil, "DELETE", "/arvados/v1/users/"+nu.UUID, nil, nil)
+
+	c.Assert(nu.Username, Equals, nuUsername)
+	c.Assert(nu.UUID, Not(Equals), activeUserUUID)
+	c.Assert(nu.Username, Not(Equals), activeUserName)
+
+	data := [][]string{
+		{"SomeGroup", activeUserName},
+	}
+	tmpfile, err := MakeTempCSVFile(data)
+	c.Assert(err, IsNil)
+	defer os.Remove(tmpfile.Name()) // clean up
+
+	s.cfg.Path = tmpfile.Name()
+	s.cfg.UserID = "username"
+	s.cfg.CaseInsensitive = true
+	err = doMain(s.cfg)
+	// Should get an error because of "ACTIVE" and "Active" usernames
+	c.Assert(err, NotNil)
+	c.Assert(err, ErrorMatches, ".*case insensitive collision.*")
+}