Dante Tsang <dante@dantetsang.com>
Codex Genetics Ltd <info@codexgenetics.com>
Bruno P. Kinoshita <brunodepaulak@yahoo.com.br>
+George Chlipala <gchlip2@uic.edu>
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- rpcconn := rpcFromEnv()
- err := lc.checkAPISupport(ctx, crUUID)
+ rpcconn, err := rpcFromEnv()
+ if err != nil {
+ return err
+ }
+ err = lc.checkAPISupport(ctx, crUUID)
if err != nil {
return err
}
loginUsername = targetUUID[:i]
targetUUID = targetUUID[i+1:]
}
- if os.Getenv("ARVADOS_API_HOST") == "" || os.Getenv("ARVADOS_API_TOKEN") == "" {
- fmt.Fprintln(stderr, "fatal: ARVADOS_API_HOST and ARVADOS_API_TOKEN environment variables are not set")
+ rpcconn, err := rpcFromEnv()
+ if err != nil {
+ fmt.Fprintln(stderr, err)
return 1
}
- rpcconn := rpcFromEnv()
- targetUUID, err := resolveToContainerUUID(rpcconn, targetUUID)
+ targetUUID, err = resolveToContainerUUID(rpcconn, targetUUID)
if err != nil {
fmt.Fprintln(stderr, err)
return 1
return "'" + strings.Replace(s, "'", "'\\''", -1) + "'"
}
-func rpcFromEnv() *rpc.Conn {
- insecure := os.Getenv("ARVADOS_API_HOST_INSECURE")
+func rpcFromEnv() (*rpc.Conn, error) {
+ ac := arvados.NewClientFromEnv()
+ if ac.APIHost == "" || ac.AuthToken == "" {
+ return nil, fmt.Errorf("fatal: ARVADOS_API_HOST and ARVADOS_API_TOKEN environment variables are not set, and ~/.config/arvados/settings.conf is not readable")
+ }
return rpc.NewConn("",
&url.URL{
Scheme: "https",
- Host: os.Getenv("ARVADOS_API_HOST"),
+ Host: ac.APIHost,
},
- insecure == "1" || insecure == "yes" || insecure == "true",
+ ac.Insecure,
func(context.Context) ([]string, error) {
- return []string{os.Getenv("ARVADOS_API_TOKEN")}, nil
- })
+ return []string{ac.AuthToken}, nil
+ }), nil
}
func resolveToContainerUUID(rpcconn *rpc.Conn, targetUUID string) (string, error) {
check "gopkg.in/check.v1"
)
-func (s *ClientSuite) TestShellGatewayNotAvailable(c *check.C) {
- var stdout, stderr bytes.Buffer
- cmd := exec.Command("go", "run", ".", "shell", arvadostest.QueuedContainerUUID, "-o", "controlpath=none", "echo", "ok")
- cmd.Env = append(cmd.Env, os.Environ()...)
- cmd.Env = append(cmd.Env, "ARVADOS_API_TOKEN="+arvadostest.ActiveTokenV2)
- cmd.Stdout = &stdout
- cmd.Stderr = &stderr
- c.Check(cmd.Run(), check.NotNil)
- c.Log(stderr.String())
- c.Check(stderr.String(), check.Matches, `(?ms).*container is not running yet \(state is "Queued"\).*`)
+var _ = check.Suite(&shellSuite{})
+
+type shellSuite struct {
+ gobindir string
+ homedir string
+ runningUUID string
}
-func (s *ClientSuite) TestShellGateway(c *check.C) {
- defer func() {
- c.Check(arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil), check.IsNil)
- }()
- uuid := arvadostest.QueuedContainerUUID
+func (s *shellSuite) SetUpSuite(c *check.C) {
+ tmpdir := c.MkDir()
+ s.gobindir = tmpdir + "/bin"
+ c.Check(os.Mkdir(s.gobindir, 0777), check.IsNil)
+ s.homedir = tmpdir + "/home"
+ c.Check(os.Mkdir(s.homedir, 0777), check.IsNil)
+
+ // We explicitly build a client binary in our tempdir here,
+ // instead of using "go run .", because (a) we're going to
+ // invoke the same binary several times, and (b) we're going
+ // to change $HOME to a temp dir in some of the tests, which
+ // would force "go run ." to recompile the world instead of
+ // using the cached object files in the real $HOME.
+ c.Logf("building arvados-client binary in %s", s.gobindir)
+ cmd := exec.Command("go", "install", ".")
+ cmd.Env = append(os.Environ(), "GOBIN="+s.gobindir)
+ cmd.Stdout = os.Stdout
+ cmd.Stderr = os.Stderr
+ c.Assert(cmd.Run(), check.IsNil)
+
+ s.runningUUID = arvadostest.RunningContainerUUID
h := hmac.New(sha256.New, []byte(arvadostest.SystemRootToken))
- fmt.Fprint(h, uuid)
+ fmt.Fprint(h, s.runningUUID)
authSecret := fmt.Sprintf("%x", h.Sum(nil))
gw := crunchrun.Gateway{
- ContainerUUID: uuid,
+ ContainerUUID: s.runningUUID,
Address: "0.0.0.0:0",
AuthSecret: authSecret,
Log: ctxlog.TestLogger(c),
func(context.Context) ([]string, error) {
return []string{arvadostest.SystemRootToken}, nil
})
- _, err = rpcconn.ContainerUpdate(context.TODO(), arvados.UpdateOptions{UUID: uuid, Attrs: map[string]interface{}{
- "state": arvados.ContainerStateLocked,
- }})
- c.Assert(err, check.IsNil)
- _, err = rpcconn.ContainerUpdate(context.TODO(), arvados.UpdateOptions{UUID: uuid, Attrs: map[string]interface{}{
- "state": arvados.ContainerStateRunning,
+ _, err = rpcconn.ContainerUpdate(context.TODO(), arvados.UpdateOptions{UUID: s.runningUUID, Attrs: map[string]interface{}{
"gateway_address": gw.Address,
}})
c.Assert(err, check.IsNil)
+}
+func (s *shellSuite) TearDownSuite(c *check.C) {
+ c.Check(arvados.NewClientFromEnv().RequestAndDecode(nil, "POST", "database/reset", nil, nil), check.IsNil)
+}
+
+func (s *shellSuite) TestShellGatewayNotAvailable(c *check.C) {
var stdout, stderr bytes.Buffer
- cmd := exec.Command("go", "run", ".", "shell", uuid, "-o", "controlpath=none", "-o", "userknownhostsfile="+c.MkDir()+"/known_hosts", "echo", "ok")
+ cmd := exec.Command(s.gobindir+"/arvados-client", "shell", arvadostest.QueuedContainerUUID, "-o", "controlpath=none", "echo", "ok")
cmd.Env = append(cmd.Env, os.Environ()...)
cmd.Env = append(cmd.Env, "ARVADOS_API_TOKEN="+arvadostest.ActiveTokenV2)
cmd.Stdout = &stdout
cmd.Stderr = &stderr
+ c.Check(cmd.Run(), check.NotNil)
+ c.Log(stderr.String())
+ c.Check(stderr.String(), check.Matches, `(?ms).*container is not running yet \(state is "Queued"\).*`)
+}
+
+func (s *shellSuite) TestShellGatewayUsingEnvVars(c *check.C) {
+ s.testShellGateway(c, false)
+}
+func (s *shellSuite) TestShellGatewayUsingSettingsConf(c *check.C) {
+ s.testShellGateway(c, true)
+}
+func (s *shellSuite) testShellGateway(c *check.C, useSettingsConf bool) {
+ var stdout, stderr bytes.Buffer
+ cmd := exec.Command(
+ s.gobindir+"/arvados-client", "shell", s.runningUUID,
+ "-o", "controlpath=none",
+ "-o", "userknownhostsfile="+s.homedir+"/known_hosts",
+ "echo", "ok")
+ if useSettingsConf {
+ settings := "ARVADOS_API_HOST=" + os.Getenv("ARVADOS_API_HOST") + "\nARVADOS_API_TOKEN=" + arvadostest.ActiveTokenV2 + "\nARVADOS_API_HOST_INSECURE=true\n"
+ err := os.MkdirAll(s.homedir+"/.config/arvados", 0777)
+ c.Assert(err, check.IsNil)
+ err = os.WriteFile(s.homedir+"/.config/arvados/settings.conf", []byte(settings), 0777)
+ c.Assert(err, check.IsNil)
+ for _, kv := range os.Environ() {
+ if !strings.HasPrefix(kv, "ARVADOS_") && !strings.HasPrefix(kv, "HOME=") {
+ cmd.Env = append(cmd.Env, kv)
+ }
+ }
+ cmd.Env = append(cmd.Env, "HOME="+s.homedir)
+ } else {
+ err := os.Remove(s.homedir + "/.config/arvados/settings.conf")
+ if !os.IsNotExist(err) {
+ c.Assert(err, check.IsNil)
+ }
+ cmd.Env = append(cmd.Env, os.Environ()...)
+ cmd.Env = append(cmd.Env, "ARVADOS_API_TOKEN="+arvadostest.ActiveTokenV2)
+ }
+ cmd.Stdout = &stdout
+ cmd.Stderr = &stderr
stdin, err := cmd.StdinPipe()
c.Assert(err, check.IsNil)
go fmt.Fprintln(stdin, "data appears on stdin, but stdin does not close; cmd should exit anyway, not hang")
- time.AfterFunc(5*time.Second, func() {
+ timeout := time.AfterFunc(5*time.Second, func() {
c.Errorf("timed out -- remote end is probably hung waiting for us to close stdin")
stdin.Close()
})
+ c.Logf("cmd.Args: %s", cmd.Args)
c.Check(cmd.Run(), check.IsNil)
+ timeout.Stop()
c.Check(stdout.String(), check.Equals, "ok\n")
+}
+func (s *shellSuite) TestShellGatewayPortForwarding(c *check.C) {
+ c.Log("setting up an http server")
// Set up an http server, and try using "arvados-client shell"
// to forward traffic to it.
httpTarget := &httpserver.Server{}
w.WriteHeader(http.StatusNotFound)
}
})
- err = httpTarget.Start()
+ err := httpTarget.Start()
c.Assert(err, check.IsNil)
ln, err := net.Listen("tcp", ":0")
_, forwardedPort, _ := net.SplitHostPort(ln.Addr().String())
ln.Close()
- stdout.Reset()
- stderr.Reset()
+ c.Log("connecting")
+ var stdout, stderr bytes.Buffer
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(10*time.Second))
defer cancel()
- cmd = exec.CommandContext(ctx,
- "go", "run", ".", "shell", uuid,
+ cmd := exec.CommandContext(ctx,
+ s.gobindir+"/arvados-client", "shell", s.runningUUID,
"-L", forwardedPort+":"+httpTarget.Addr,
"-o", "controlpath=none",
- "-o", "userknownhostsfile="+c.MkDir()+"/known_hosts",
+ "-o", "userknownhostsfile="+s.homedir+"/known_hosts",
"-N",
)
- c.Logf("cmd.Args: %s", cmd.Args)
cmd.Env = append(cmd.Env, os.Environ()...)
cmd.Env = append(cmd.Env, "ARVADOS_API_TOKEN="+arvadostest.ActiveTokenV2)
cmd.Stdout = &stdout
cmd.Stderr = &stderr
+ c.Logf("cmd.Args: %s", cmd.Args)
cmd.Start()
forwardedURL := fmt.Sprintf("http://localhost:%s/foo", forwardedPort)
wg.Wait()
}
-func (s *ClientSuite) TestContainerRequestLog(c *check.C) {
+var _ = check.Suite(&logsSuite{})
+
+type logsSuite struct{}
+
+func (s *logsSuite) TestContainerRequestLog(c *check.C) {
arvadostest.StartKeep(2, true)
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second))
defer cancel()
if insecure {
client = h.insecureClient
}
+ // Clearing the Host field here causes the Go http client to
+ // use the host part of urlOut as the Host header in the
+ // outgoing request, instead of the Host value from the
+ // original request we received.
+ req.Host = ""
return h.proxy.Do(req, urlOut, client)
}
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
defer cancel()
- // 0.0.0.0:0 is just a placeholder here -- do(), which is
+ // "http://localhost" is just a placeholder here -- we'll fill
+ // in req.URL.Path below, and then do(), which is
// localClusterRequest(), will replace the scheme and host
// parts with the real proxy destination.
- req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://0.0.0.0:0/"+path, nil)
+ req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://localhost", nil)
if err != nil {
return nil, nil, err
}
+ req.URL.Path = path
resp, err := do(req)
if err != nil {
return nil, nil, err
import copy
import io
+import itertools
import functools
import hashlib
import json
api.return_value = mock.MagicMock()
arvrunner.api = api.return_value
arvrunner.runtimeContext.match_local_docker = False
- arvrunner.api.links().list().execute.side_effect = ({"items": [{"created_at": "",
- "head_uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzb",
- "link_class": "docker_image_repo+tag",
- "name": "arvados/jobs:"+arvados_cwl.__version__,
- "owner_uuid": "",
- "properties": {"image_timestamp": ""}}], "items_available": 1, "offset": 0},
- {"items": [{"created_at": "",
- "head_uuid": "",
- "link_class": "docker_image_hash",
- "name": "123456",
- "owner_uuid": "",
- "properties": {"image_timestamp": ""}}], "items_available": 1, "offset": 0},
- {"items": [{"created_at": "",
- "head_uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzb",
- "link_class": "docker_image_repo+tag",
- "name": "arvados/jobs:"+arvados_cwl.__version__,
- "owner_uuid": "",
- "properties": {"image_timestamp": ""}}], "items_available": 1, "offset": 0},
- {"items": [{"created_at": "",
- "head_uuid": "",
- "link_class": "docker_image_hash",
- "name": "123456",
- "owner_uuid": "",
- "properties": {"image_timestamp": ""}}], "items_available": 1, "offset": 0}
- )
+ arvrunner.api.links().list().execute.side_effect = itertools.cycle([
+ {"items": [{"created_at": "2023-08-25T12:34:56.123456Z",
+ "head_uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzb",
+ "link_class": "docker_image_repo+tag",
+ "name": "arvados/jobs:"+arvados_cwl.__version__,
+ "owner_uuid": "",
+ "uuid": "zzzzz-o0j2j-arvadosjobsrepo",
+ "properties": {"image_timestamp": ""}}]},
+ {"items": []},
+ {"items": []},
+ {"items": [{"created_at": "2023-08-25T12:34:57.234567Z",
+ "head_uuid": "",
+ "link_class": "docker_image_hash",
+ "name": "123456",
+ "owner_uuid": "",
+ "uuid": "zzzzz-o0j2j-arvadosjobshash",
+ "properties": {"image_timestamp": ""}}]},
+ {"items": []},
+ {"items": []},
+ ])
find_one_image_hash.return_value = "123456"
- arvrunner.api.collections().list().execute.side_effect = ({"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzb",
- "owner_uuid": "",
- "manifest_text": "",
- "properties": ""
- }], "items_available": 1, "offset": 0},
- {"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzb",
- "owner_uuid": "",
- "manifest_text": "",
- "properties": ""
- }], "items_available": 1, "offset": 0})
+ arvrunner.api.collections().list().execute.side_effect = itertools.cycle([
+ {"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzb",
+ "owner_uuid": "",
+ "manifest_text": "",
+ "created_at": "2023-08-25T12:34:55.012345Z",
+ "properties": {}}]},
+ {"items": []},
+ {"items": []},
+ ])
arvrunner.api.collections().create().execute.return_value = {"uuid": ""}
arvrunner.api.collections().get().execute.return_value = {"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzb",
"portable_data_hash": "9999999999999999999999999999999b+99"}
logger.setLevel(stdliblog.DEBUG if config.get('ARVADOS_DEBUG')
else stdliblog.WARNING)
+@util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
def task_set_output(self, s, num_retries=5):
for tries_left in RetryLoop(num_retries=num_retries, backoff_start=0):
try:
raise
_current_task = None
+@util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
def current_task(num_retries=5):
global _current_task
if _current_task:
raise
_current_job = None
+@util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
def current_job(num_retries=5):
global _current_job
if _current_job:
else:
raise
+@util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
def getjobparam(*args):
return current_job()['script_parameters'].get(*args)
+@util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
def get_job_param_mount(*args):
return os.path.join(os.environ['TASK_KEEPMOUNT'], current_job()['script_parameters'].get(*args))
+@util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
def get_task_param_mount(*args):
return os.path.join(os.environ['TASK_KEEPMOUNT'], current_task()['parameters'].get(*args))
class JobTask(object):
+ @util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
def __init__(self, parameters=dict(), runtime_constraints=dict()):
print("init jobtask %s %s" % (parameters, runtime_constraints))
class job_setup(object):
@staticmethod
+ @util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
def one_task_per_input_file(if_sequence=0, and_end_task=True, input_as_path=False, api_client=None):
if if_sequence != current_task()['sequence']:
return
exit(0)
@staticmethod
+ @util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
def one_task_per_input_stream(if_sequence=0, and_end_task=True):
if if_sequence != current_task()['sequence']:
return
#
# SPDX-License-Identifier: Apache-2.0
+import collections
import socket
import pycurl
import math
import json
import logging
import os
+import pathlib
import re
import socket
import ssl
apiclient_errors.HttpError.__new__ = staticmethod(_new_http_error)
def http_cache(data_type):
- homedir = os.environ.get('HOME')
- if not homedir or len(homedir) == 0:
+ try:
+ homedir = pathlib.Path.home()
+ except RuntimeError:
return None
- path = homedir + '/.cache/arvados/' + data_type
+ path = pathlib.Path(homedir, '.cache', 'arvados', data_type)
try:
- util.mkdir_dash_p(path)
+ path.mkdir(parents=True, exist_ok=True)
except OSError:
return None
- return cache.SafeHTTPCache(path, max_age=60*60*24*2)
+ return cache.SafeHTTPCache(str(path), max_age=60*60*24*2)
def api_client(
version,
_logger = logging.getLogger('arvados.collection')
-
-if sys.version_info >= (3, 0):
- TextIOWrapper = io.TextIOWrapper
-else:
- class TextIOWrapper(io.TextIOWrapper):
- """To maintain backward compatibility, cast str to unicode in
- write('foo').
-
- """
- def write(self, data):
- if isinstance(data, basestring):
- data = unicode(data)
- return super(TextIOWrapper, self).write(data)
-
-
class CollectionBase(object):
"""Abstract base class for Collection classes."""
class CollectionWriter(CollectionBase):
"""Deprecated, use Collection instead."""
+ @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
def __init__(self, api_client=None, num_retries=0, replication=None):
"""Instantiate a CollectionWriter.
'_data_buffer', '_dependencies', '_finished_streams',
'_queued_dirents', '_queued_trees']
+ @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
def __init__(self, api_client=None, **kwargs):
self._dependencies = {}
super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
if 'b' not in mode:
bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
- f = TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
+ f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
return f
def modified(self):
self._streams = [normalize_stream(s, streams[s])
for s in sorted(streams)]
+
+ @arvados.util._deprecated('3.0', 'Collection iteration')
@_populate_streams
def all_streams(self):
return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
for s in self._streams]
+ @arvados.util._deprecated('3.0', 'Collection iteration')
@_populate_streams
def all_files(self):
for s in self.all_streams():
import os
import re
import shutil
+import subprocess
import sys
import logging
import tempfile
# Check if git is available
def check_git_availability():
try:
- arvados.util.run_command(['git', '--help'])
- except Exception:
+ subprocess.run(
+ ['git', '--version'],
+ check=True,
+ stdout=subprocess.DEVNULL,
+ )
+ except FileNotFoundError:
abort('git command is not available. Please ensure git is installed.')
priority = https_url + other_url + http_url
- git_config = []
- git_url = None
for url in priority:
if url.startswith("http"):
u = urllib.parse.urlsplit(url)
try:
logger.debug("trying %s", url)
- arvados.util.run_command(["git"] + git_config + ["ls-remote", url],
- env={"HOME": os.environ["HOME"],
- "ARVADOS_API_TOKEN": api.api_token,
- "GIT_ASKPASS": "/bin/false"})
- except arvados.errors.CommandFailedError:
+ subprocess.run(
+ ['git', *git_config, 'ls-remote', url],
+ check=True,
+ env={
+ 'ARVADOS_API_TOKEN': api.api_token,
+ 'GIT_ASKPASS': '/bin/false',
+ 'HOME': os.environ['HOME'],
+ },
+ stdout=subprocess.DEVNULL,
+ )
+ except subprocess.CalledProcessError:
pass
else:
git_url = url
break
-
- if not git_url:
+ else:
raise Exception('Cannot access git repository, tried {}'
.format(priority))
# Copy collections
try:
- copy_collections([col["uuid"] for col in arvados.util.list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
+ copy_collections([col["uuid"] for col in arvados.util.keyset_list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
src, dst, args)
except Exception as e:
partial_error += "\n" + str(e)
# Copy workflows
- for w in arvados.util.list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
+ for w in arvados.util.keyset_list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
try:
copy_workflow(w["uuid"], src, dst, args)
except Exception as e:
partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)
if args.recursive:
- for g in arvados.util.list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
+ for g in arvados.util.keyset_list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
try:
copy_project(g["uuid"], src, dst, project_record["uuid"], args)
except Exception as e:
# repository)
#
def git_rev_parse(rev, repo):
- gitout, giterr = arvados.util.run_command(
- ['git', 'rev-parse', rev], cwd=repo)
- return gitout.strip()
+ proc = subprocess.run(
+ ['git', 'rev-parse', rev],
+ check=True,
+ cwd=repo,
+ stdout=subprocess.PIPE,
+ text=True,
+ )
+    return proc.stdout.strip()
# uuid_type(api, object_uuid)
#
by_email = {}
by_username = {}
- users = []
- for c, arv in clusters.items():
- print("Getting user list from %s" % c)
- ul = arvados.util.list_all(arv.users().list, bypass_federation=True)
- for l in ul:
- if l["uuid"].startswith(c):
- users.append(l)
+ users = [
+ user
+ for prefix, arv in clusters.items()
+ for user in arvados.util.keyset_list_all(arv.users().list, bypass_federation=True)
+ if user['uuid'].startswith(prefix)
+ ]
# Users list is sorted by email
# Go through users and collect users with same email
# call add_accum_rows() to generate the report rows with
# the "home cluster" set, and also fill in the by_email table.
- users = sorted(users, key=lambda u: u["email"]+"::"+(u["username"] or "")+"::"+u["uuid"])
+ users.sort(key=lambda u: (u["email"], u["username"] or "", u["uuid"]))
accum = []
lastemail = None
import argparse
import hashlib
import os
+import pathlib
import re
import string
import sys
logger.error('Local file %s already exists.' % (outfilename,))
return 1
if args.r:
- arvados.util.mkdir_dash_p(os.path.dirname(outfilename))
+ pathlib.Path(outfilename).parent.mkdir(parents=True, exist_ok=True)
try:
outfile = open(outfilename, 'wb')
except Exception as error:
return (image_timestamp, created_timestamp)
def _get_docker_links(api_client, num_retries, **kwargs):
- links = arvados.util.list_all(api_client.links().list,
- num_retries, **kwargs)
+ links = list(arvados.util.keyset_list_all(
+ api_client.links().list, num_retries=num_retries, **kwargs,
+ ))
for link in links:
link['_sort_key'] = docker_link_sort_key(link)
links.sort(key=itemgetter('_sort_key'), reverse=True)
images.sort(key=itemgetter('_sort_key'), reverse=True)
# Remove any image listings that refer to unknown collections.
- existing_coll_uuids = {coll['uuid'] for coll in arvados.util.list_all(
- api_client.collections().list, num_retries,
- filters=[['uuid', 'in', [im['collection'] for im in images]]]+project_filter,
- select=['uuid'])}
+ existing_coll_uuids = {coll['uuid'] for coll in arvados.util.keyset_list_all(
+ api_client.collections().list,
+ num_retries=num_retries,
+ filters=[['uuid', 'in', [im['collection'] for im in images]]]+project_filter,
+ select=['uuid'],
+ )}
return [(image['collection'], image) for image in images
if image['collection'] in existing_coll_uuids]
import arvados.commands.keepdocker
from arvados._version import __version__
from arvados.collection import CollectionReader
+from .. import util
logger = logging.getLogger('arvados.migrate-docker19')
logger.setLevel(logging.DEBUG if arvados.config.get('ARVADOS_DEBUG')
class MigrationFailed(Exception):
pass
+@util._deprecated('3.0')
def main(arguments=None):
"""Docker image format migration tool for Arvados.
from builtins import object
import json
import os
+from . import util
class TaskOutputDir(object):
"""Keep-backed directory for staging outputs of Crunch tasks.
f.write('42')
arvados.current_task().set_output(out.manifest_text())
"""
+ @util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
def __init__(self):
self.path = os.environ['TASK_KEEPMOUNT_TMP']
from __future__ import absolute_import
from builtins import object
+import sys
import threading
-from . import api
from . import config
from . import keep
from . import util
+api = sys.modules['arvados.api']
+
class ThreadSafeApiCache(object):
"""Thread-safe wrapper for an Arvados API client
from arvados.keep import *
from . import config
from . import errors
+from . import util
from ._normalize_stream import normalize_stream
class StreamReader(object):
+    @util._deprecated('3.0', 'arvados.collection.Collection')
def __init__(self, tokens, keep=None, debug=False, _empty=False,
num_retries=10):
self._stream_name = None
from builtins import range
import fcntl
+import functools
import hashlib
import httplib2
import os
import subprocess
import errno
import sys
+import warnings
-import arvados
-from arvados.collection import CollectionReader
+import arvados.errors
HEX_RE = re.compile(r'^[0-9a-fA-F]+$')
CR_UNCOMMITTED = 'Uncommitted'
container_uuid_pattern = re.compile(r'[a-z0-9]{5}-dz642-[a-z0-9]{15}')
manifest_pattern = re.compile(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', flags=re.MULTILINE)
+def _deprecated(version=None, preferred=None):
+ """Mark a callable as deprecated in the SDK
+
+    This will wrap the callable to emit a DeprecationWarning when called
+    and add a deprecation notice to its docstring.
+
+ If the following arguments are given, they'll be included in the
+ notices:
+
+ preferred: str | None
+ : The name of an alternative that users should use instead.
+
+ version: str | None
+ : The version of Arvados when the callable is scheduled to be
+ removed.
+ """
+ if version is None:
+ version = ''
+ else:
+ version = f' and scheduled to be removed in Arvados {version}'
+ if preferred is None:
+ preferred = ''
+ else:
+ preferred = f' Prefer {preferred} instead.'
+ def deprecated_decorator(func):
+ fullname = f'{func.__module__}.{func.__qualname__}'
+ parent, _, name = fullname.rpartition('.')
+ if name == '__init__':
+ fullname = parent
+ warning_msg = f'{fullname} is deprecated{version}.{preferred}'
+ @functools.wraps(func)
+ def deprecated_wrapper(*args, **kwargs):
+ warnings.warn(warning_msg, DeprecationWarning, 2)
+ return func(*args, **kwargs)
+ # Get func's docstring without any trailing newline or empty lines.
+ func_doc = re.sub(r'\n\s*$', '', func.__doc__ or '')
+ match = re.search(r'\n([ \t]+)\S', func_doc)
+ indent = '' if match is None else match.group(1)
+ # Make the deprecation notice the second "paragraph" of the
+ # docstring if possible. Otherwise append it.
+ docstring, count = re.subn(
+ rf'\n[ \t]*\n{indent}',
+ f'\n\n{indent}DEPRECATED: {warning_msg}\n\n{indent}',
+ func_doc,
+ count=1,
+ )
+ if not count:
+ docstring = f'{func_doc}\n\n{indent}DEPRECATED: {warning_msg}'.lstrip()
+ deprecated_wrapper.__doc__ = docstring
+ return deprecated_wrapper
+ return deprecated_decorator
+
+@_deprecated('3.0')
def clear_tmpdir(path=None):
"""
Ensure the given directory (or TASK_TMPDIR if none given)
exists and is empty.
"""
+ from arvados import current_task
if path is None:
- path = arvados.current_task().tmpdir
+ path = current_task().tmpdir
if os.path.exists(path):
p = subprocess.Popen(['rm', '-rf', path])
stdout, stderr = p.communicate(None)
raise Exception('rm -rf %s: %s' % (path, stderr))
os.mkdir(path)
+@_deprecated('3.0', 'subprocess.run')
def run_command(execargs, **kwargs):
kwargs.setdefault('stdin', subprocess.PIPE)
kwargs.setdefault('stdout', subprocess.PIPE)
(execargs, p.returncode, stderrdata))
return stdoutdata, stderrdata
+@_deprecated('3.0')
def git_checkout(url, version, path):
+ from arvados import current_job
if not re.search('^/', path):
- path = os.path.join(arvados.current_job().tmpdir, path)
+ path = os.path.join(current_job().tmpdir, path)
if not os.path.exists(path):
run_command(["git", "clone", url, path],
cwd=os.path.dirname(path))
cwd=path)
return path
+@_deprecated('3.0')
def tar_extractor(path, decompress_flag):
return subprocess.Popen(["tar",
"-C", path,
stdin=subprocess.PIPE, stderr=sys.stderr,
shell=False, close_fds=True)
+@_deprecated('3.0', 'arvados.collection.Collection.open and the tarfile module')
def tarball_extract(tarball, path):
"""Retrieve a tarball from Keep and extract it to a local
directory. Return the absolute path where the tarball was
tarball -- collection locator
path -- where to extract the tarball: absolute, or relative to job tmp
"""
+ from arvados import current_job
+ from arvados.collection import CollectionReader
if not re.search('^/', path):
- path = os.path.join(arvados.current_job().tmpdir, path)
+ path = os.path.join(current_job().tmpdir, path)
lockfile = open(path + '.lock', 'w')
fcntl.flock(lockfile, fcntl.LOCK_EX)
try:
return os.path.join(path, tld_extracts[0])
return path
+@_deprecated('3.0', 'arvados.collection.Collection.open and the zipfile module')
def zipball_extract(zipball, path):
"""Retrieve a zip archive from Keep and extract it to a local
directory. Return the absolute path where the archive was
zipball -- collection locator
path -- where to extract the archive: absolute, or relative to job tmp
"""
+ from arvados import current_job
+ from arvados.collection import CollectionReader
if not re.search('^/', path):
- path = os.path.join(arvados.current_job().tmpdir, path)
+ path = os.path.join(current_job().tmpdir, path)
lockfile = open(path + '.lock', 'w')
fcntl.flock(lockfile, fcntl.LOCK_EX)
try:
return os.path.join(path, tld_extracts[0])
return path
+@_deprecated('3.0', 'arvados.collection.Collection')
def collection_extract(collection, path, files=[], decompress=True):
"""Retrieve a collection from Keep and extract it to a local
directory. Return the absolute path where the collection was
collection -- collection locator
path -- where to extract: absolute, or relative to job tmp
"""
+ from arvados import current_job
+ from arvados.collection import CollectionReader
matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
if matches:
collection_hash = matches.group(1)
else:
collection_hash = hashlib.md5(collection).hexdigest()
if not re.search('^/', path):
- path = os.path.join(arvados.current_job().tmpdir, path)
+ path = os.path.join(current_job().tmpdir, path)
lockfile = open(path + '.lock', 'w')
fcntl.flock(lockfile, fcntl.LOCK_EX)
try:
lockfile.close()
return path
+@_deprecated('3.0', 'pathlib.Path().mkdir(parents=True, exist_ok=True)')
def mkdir_dash_p(path):
if not os.path.isdir(path):
try:
else:
raise
+@_deprecated('3.0', 'arvados.collection.Collection')
def stream_extract(stream, path, files=[], decompress=True):
"""Retrieve a stream from Keep and extract it to a local
directory. Return the absolute path where the stream was
stream -- StreamReader object
path -- where to extract: absolute, or relative to job tmp
"""
+ from arvados import current_job
if not re.search('^/', path):
- path = os.path.join(arvados.current_job().tmpdir, path)
+ path = os.path.join(current_job().tmpdir, path)
lockfile = open(path + '.lock', 'w')
fcntl.flock(lockfile, fcntl.LOCK_EX)
try:
lockfile.close()
return path
+@_deprecated('3.0', 'os.walk')
def listdir_recursive(dirname, base=None, max_depth=None):
"""listdir_recursive(dirname, base, max_depth)
good_len = True
return bool(good_len and HEX_RE.match(s))
+@_deprecated('3.0', 'arvados.util.keyset_list_all')
def list_all(fn, num_retries=0, **kwargs):
# Default limit to (effectively) api server's MAX_LIMIT
kwargs.setdefault('limit', sys.maxsize)
kwargs["order"] = ["%s %s" % (order_key, asc), "uuid %s" % asc]
other_filters = kwargs.get("filters", [])
- if "select" in kwargs and "uuid" not in kwargs["select"]:
- kwargs["select"].append("uuid")
+ try:
+ select = set(kwargs['select'])
+ except KeyError:
+ pass
+ else:
+ select.add(order_key)
+ select.add('uuid')
+ kwargs['select'] = list(select)
nextpage = []
tot = 0
nextpage = [[order_key, ">=" if ascending else "<=", lastitem[order_key]], ["uuid", "!=", lastitem["uuid"]]]
prev_page_all_same_order_key = False
-
def ca_certs_path(fallback=httplib2.CA_CERTS):
"""Return the path of the best available CA certs source.
#
# SPDX-License-Identifier: Apache-2.0
+import itertools
import os
+import parameterized
import subprocess
import unittest
+from unittest import mock
+
import arvados
import arvados.util
self.n += 1
return self.expect[self.n-1][1]
+_SELECT_FAKE_ITEM = {
+ 'uuid': 'zzzzz-zyyyz-zzzzzyyyyywwwww',
+ 'name': 'KeysetListAllTestCase.test_select mock',
+ 'created_at': '2023-08-28T12:34:56.123456Z',
+}
+
class KeysetListAllTestCase(unittest.TestCase):
def test_empty(self):
ks = KeysetTestHelper([[
ls = list(arvados.util.keyset_list_all(ks.fn, filters=[["foo", ">", "bar"]]))
self.assertEqual(ls, [{"created_at": "1", "uuid": "1"}, {"created_at": "2", "uuid": "2"}])
-
def test_onepage_desc(self):
ks = KeysetTestHelper([[
{"limit": 1000, "count": "none", "order": ["created_at desc", "uuid desc"], "filters": []},
ls = list(arvados.util.keyset_list_all(ks.fn, ascending=False))
self.assertEqual(ls, [{"created_at": "2", "uuid": "2"}, {"created_at": "1", "uuid": "1"}])
+
+ # Parameterize over (order_key, select): order_key cycles through the fake
+ # item's field names while select ranges over every subset of those names.
+ @parameterized.parameterized.expand(zip(
+ itertools.cycle(_SELECT_FAKE_ITEM),
+ itertools.chain.from_iterable(
+ itertools.combinations(_SELECT_FAKE_ITEM, count)
+ for count in range(len(_SELECT_FAKE_ITEM) + 1)
+ ),
+ ))
+ def test_select(self, order_key, select):
+ # keyset_list_all must have both uuid and order_key to function.
+ # Test that it selects those fields along with user-specified ones.
+ expect_select = {'uuid', order_key, *select}
+ item = {
+ key: value
+ for key, value in _SELECT_FAKE_ITEM.items()
+ if key in expect_select
+ }
+ list_func = mock.Mock()
+ list_func().execute = mock.Mock(
+ side_effect=[
+ {'items': [item]},
+ {'items': []},
+ {'items': []},
+ ],
+ )
+ # Discard the priming call made just above while attaching the execute
+ # mock, so call_args_list records only keyset_list_all's own calls.
+ list_func.reset_mock()
+ actual = list(arvados.util.keyset_list_all(list_func, order_key, select=list(select)))
+ self.assertEqual(actual, [item])
+ calls = list_func.call_args_list
+ self.assertTrue(len(calls) >= 2, "list_func() not called enough to exhaust items")
+ for args, kwargs in calls:
+ self.assertEqual(set(kwargs.get('select', ())), expect_select)
joins('left outer join containers as requesting_container on container_requests.requesting_container_uuid = requesting_container.uuid').
where("container_requests.container_uuid = ? and "+
"container_requests.priority > 0 and "+
+ "container_requests.owner_uuid not in (select group_uuid from trashed_groups) and "+
"(requesting_container.priority is null or (requesting_container.state = 'Running' and requesting_container.priority > 0)) and "+
"container_requests.state = 'Committed' and "+
"container_requests.container_count < container_requests.container_count_max", uuid).
end
def update_collections(container:, collections: ['log', 'output'])
+
+ # Check if parent is frozen or trashed, in which case it isn't
+ # valid to create new collections in the project, so return
+ # without creating anything.
+ owner = Group.find_by_uuid(self.owner_uuid)
+ return if owner && !owner.admin_change_permitted
+
collections.each do |out_type|
pdh = container.send(out_type)
next if pdh.nil?
require 'can_be_an_owner'
require 'trashable'
+require 'update_priorities'
class Group < ArvadosModel
include HasUuid
t.add :can_manage
end
+ # check if admins are allowed to make changes to the project, e.g. it
+ # isn't trashed or frozen.
+ def admin_change_permitted
+ !(FrozenGroup.where(uuid: self.uuid).any? || TrashedGroup.where(group_uuid: self.uuid).any?)
+ end
+
protected
def self.attributes_required_columns
"select target_uuid as group_uuid, trash_at from #{temptable} where trash_at is not NULL " +
"on conflict (group_uuid) do update set trash_at=EXCLUDED.trash_at",
"Group.update_trash.insert")
+ ActiveRecord::Base.connection.exec_query(
+ "select container_uuid from container_requests where " +
+ "owner_uuid in (select target_uuid from #{temptable}) and " +
+ "requesting_container_uuid is NULL and state = 'Committed' and container_uuid is not NULL",
+ "Group.update_trash.update_priorities").each do |container_uuid|
+ update_priorities container_uuid["container_uuid"]
+ end
end
def update_frozen
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+# Replace the container_priority() SQL function so that committed container
+# requests owned by trashed projects no longer contribute to a container's
+# priority (the added "owner_uuid not in ... trashed_groups" filter below).
+class PriorityUpdateFix < ActiveRecord::Migration[5.2]
+ def up
+ ActiveRecord::Base.connection.execute %{
+CREATE OR REPLACE FUNCTION container_priority(for_container_uuid character varying, inherited bigint, inherited_from character varying) returns bigint
+ LANGUAGE sql
+ AS $$
+/* Determine the priority of an individual container.
+ The "inherited" priority comes from the path we followed from the root, the parent container
+ priority hasn't been updated in the table yet but we need to behave it like it has been.
+*/
+select coalesce(max(case when containers.uuid = inherited_from then inherited
+ when containers.priority is not NULL then containers.priority
+ else container_requests.priority * 1125899906842624::bigint - (extract(epoch from container_requests.created_at)*1000)::bigint
+ end), 0) from
+ container_requests left outer join containers on container_requests.requesting_container_uuid = containers.uuid
+ where container_requests.container_uuid = for_container_uuid and
+ container_requests.state = 'Committed' and
+ container_requests.priority > 0 and
+ container_requests.owner_uuid not in (select group_uuid from trashed_groups);
+$$;
+}
+ end
+
+ def down
+ # Intentionally empty: the function replacement is not rolled back.
+ end
+end
else container_requests.priority * 1125899906842624::bigint - (extract(epoch from container_requests.created_at)*1000)::bigint
end), 0) from
container_requests left outer join containers on container_requests.requesting_container_uuid = containers.uuid
- where container_requests.container_uuid = for_container_uuid and container_requests.state = 'Committed' and container_requests.priority > 0;
+ where container_requests.container_uuid = for_container_uuid and
+ container_requests.state = 'Committed' and
+ container_requests.priority > 0 and
+ container_requests.owner_uuid not in (select group_uuid from trashed_groups);
$$;
SET default_with_oids = false;
+--
+-- Name: groups; Type: TABLE; Schema: public; Owner: -
+--
+
+CREATE TABLE public.groups (
+ id bigint NOT NULL,
+ uuid character varying(255),
+ owner_uuid character varying(255),
+ created_at timestamp without time zone NOT NULL,
+ modified_by_client_uuid character varying(255),
+ modified_by_user_uuid character varying(255),
+ modified_at timestamp without time zone,
+ name character varying(255) NOT NULL,
+ description character varying(524288),
+ updated_at timestamp without time zone NOT NULL,
+ group_class character varying(255),
+ trash_at timestamp without time zone,
+ is_trashed boolean DEFAULT false NOT NULL,
+ delete_at timestamp without time zone,
+ properties jsonb DEFAULT '{}'::jsonb,
+ frozen_by_uuid character varying
+);
+
+
--
-- Name: api_client_authorizations; Type: TABLE; Schema: public; Owner: -
--
);
---
--- Name: groups; Type: TABLE; Schema: public; Owner: -
---
-
-CREATE TABLE public.groups (
- id bigint NOT NULL,
- uuid character varying(255),
- owner_uuid character varying(255),
- created_at timestamp without time zone NOT NULL,
- modified_by_client_uuid character varying(255),
- modified_by_user_uuid character varying(255),
- modified_at timestamp without time zone,
- name character varying(255) NOT NULL,
- description character varying(524288),
- updated_at timestamp without time zone NOT NULL,
- group_class character varying(255),
- trash_at timestamp without time zone,
- is_trashed boolean DEFAULT false NOT NULL,
- delete_at timestamp without time zone,
- properties jsonb DEFAULT '{}'::jsonb,
- frozen_by_uuid character varying
-);
-
-
--
-- Name: groups_id_seq; Type: SEQUENCE; Schema: public; Owner: -
--
('20221230155924'),
('20230421142716'),
('20230503224107'),
-('20230815160000');
+('20230815160000'),
+('20230821000000');
running_to_be_deleted:
uuid: zzzzz-xvhdp-cr5runningcntnr
- owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ owner_uuid: zzzzz-j7d0g-rew6elm53kancon
name: running to be deleted
state: Committed
priority: 1
cr.destroy
# the cr's container now has priority of 0
+ c.reload
+ assert_equal 0, c.priority
+ end
+ end
+
+ test "trash the project containing a container_request and check its container's priority" do
+ act_as_user users(:active) do
+ cr = ContainerRequest.find_by_uuid container_requests(:running_to_be_deleted).uuid
+
+ # initially the cr's container has priority > 0
c = Container.find_by_uuid(cr.container_uuid)
+ assert_equal 1, c.priority
+
+ prj = Group.find_by_uuid cr.owner_uuid
+ prj.update_attributes!(trash_at: db_current_time)
+
+ # the cr's container now has priority of 0
+ c.reload
assert_equal 0, c.priority
+
+ assert_equal c.state, 'Running'
+ assert_equal cr.state, 'Committed'
+
+ # mark the container as cancelled, this should cause the
+ # container request to go to final state and run the finalize
+ # function
+ act_as_system_user do
+ c.update_attributes!(state: 'Cancelled', log: 'fa7aeb5140e2848d39b416daeef4ffc5+45')
+ end
+ c.reload
+ cr.reload
+
+ assert_equal c.state, 'Cancelled'
+ assert_equal cr.state, 'Final'
+ assert_equal nil, cr.log_uuid
end
end