if err != nil {
return err
}
+ cmd = exec.CommandContext(ctx, "ls", "-l", opts.PackageDir+"/"+packageFilename)
+ cmd.Stdout = stdout
+ cmd.Stderr = stderr
+ err = cmd.Run()
+ if err != nil {
+ return err
+ }
return nil
}
}
}
- cmd = exec.Command("ls", "-l", pkgfile)
- cmd.Stdout = stdout
- cmd.Stderr = stderr
- _ = cmd.Run()
-
return nil
}
|runtime_auth_scopes|array of strings|The scopes associated with the auth token used to run this container.||
|output_storage_classes|array of strings|The storage classes that will be used for the log and output collections of this container request|default is ["default"]|
|output_properties|hash|User metadata properties to set on the output collection. The output collection will also have default properties "type" ("intermediate" or "output") and "container_request" (the UUID of the container request that produced the collection).||
+|cumulative_cost|number|Estimated cost of the cloud VMs used to satisfy the request, including retried attempts and completed subrequests, but not including reused containers.|0 if container was reused or VM price information was not available.|
h2(#priority). Priority
|interactive_session_started|boolean|Indicates whether @arvados-client shell@ has been used to run commands in the container, which may have altered the container's behavior and output.||
|output_storage_classes|array of strings|The storage classes that will be used for the log and output collections of this container||
|output_properties|hash|User metadata properties to set on the output collection.||
+|cost|number|Estimated cost of the cloud VM used to run the container.|0 if not available.|
+|subrequests_cost|number|Total estimated cumulative cost of container requests submitted by this container.|0 if not available.|
h2(#container_states). Container states
*can_manage* access to a user grants *can_manage* access to the user, _and everything owned by that user_ .
If a user A *can_read* role R, and role R *can_manage* user B, then user A *can_read* user B _and everything owned by that user_ .
+Modifying a role group requires *can_manage* permission (by contrast, *can_write* is sufficient to modify project groups and other object types).
+
h2(#system). System user and group
A privileged user account exists for use by internal Arvados components. This user manages system objects which should not be "owned" by any particular user. The system user uuid is @{siteprefix}-tpzed-000000000000000@.
package boot
import (
+ "bytes"
"context"
"fmt"
"io/ioutil"
"strings"
"git.arvados.org/arvados.git/sdk/go/arvados"
+ "github.com/sirupsen/logrus"
)
// Run an Nginx process that proxies the supervisor's configured
vars := map[string]string{
"LISTENHOST": extListenHost,
"UPSTREAMHOST": super.ListenHost,
+ "INTERNALSUBNETS": internalSubnets(super.logger),
"SSLCERT": filepath.Join(super.tempdir, "server.crt"),
"SSLKEY": filepath.Join(super.tempdir, "server.key"),
"ACCESSLOG": filepath.Join(super.tempdir, "nginx_access.log"),
}
return waitForConnect(ctx, testurl.Host)
}
+
+// Return 0 or more local subnets as "geo" fragments for Nginx config,
+// e.g., "1.2.3.0/24 0; 10.1.0.0/16 0;".
+//
+// Each returned entry maps a directly-attached subnet to the value 0,
+// i.e., "not an external client", for use inside an nginx "geo" block.
+// On any failure to run `ip route`, an empty string is returned so all
+// clients are conservatively treated as external.
+func internalSubnets(logger logrus.FieldLogger) string {
+ iproutes, err := exec.Command("ip", "route").CombinedOutput()
+ if err != nil {
+ logger.Warnf("treating all clients as external because `ip route` failed: %s (%q)", err, iproutes)
+ return ""
+ }
+ subnets := ""
+ // Directly-attached networks appear in `ip route` output as
+ // "<addr-or-CIDR> dev <iface> ..." — collect the first field of
+ // each such line.
+ for _, line := range bytes.Split(iproutes, []byte("\n")) {
+ fields := strings.Fields(string(line))
+ if len(fields) > 2 && fields[1] == "dev" {
+ // lan example:
+ // 192.168.86.0/24 dev ens3 proto kernel scope link src 192.168.86.196
+ // gcp example (private subnet):
+ // 10.47.0.0/24 dev eth0 proto kernel scope link src 10.47.0.5
+ // gcp example (no private subnet):
+ // 10.128.0.1 dev ens4 scope link
+ // "0" marks this subnet as internal in the geo map.
+ subnets += fields[0] + " 0; "
+ }
+ }
+ return subnets
+}
if err != nil {
return err
}
- conffile, err := os.OpenFile(filepath.Join(super.wwwtempdir, "config.yml"), os.O_CREATE|os.O_WRONLY, 0644)
+ conffile, err := os.OpenFile(filepath.Join(super.wwwtempdir, "config.yml"), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
if err != nil {
return err
}
if super.ClusterType != "production" {
super.prependEnv("PATH", super.tempdir+"/bin:")
}
+ super.setEnv("ARVADOS_SERVER_ADDRESS", super.ListenHost)
// Now that we have the config, replace the bootstrap logger
// with a new one according to the logging config.
"encoding/json"
"errors"
"io"
+ "os"
"os/exec"
"os/user"
"strings"
if len(is.instances) > 0 {
return nil, errQuota
}
+ // A crunch-run process running in a previous instance may
+ // have marked the node as broken. In the loopback scenario a
+ // destroy+create cycle doesn't fix whatever was broken -- but
+ // nothing else will either, so the best we can do is remove
+ // the "broken" flag and try again.
+ if err := os.Remove("/var/lock/crunch-run-broken"); err == nil {
+ is.logger.Info("removed /var/lock/crunch-run-broken")
+ } else if !errors.Is(err, os.ErrNotExist) {
+ return nil, err
+ }
u, err := user.Current()
if err != nil {
return nil, err
RAM: hostram,
Scratch: scratch,
IncludedScratch: scratch,
+ Price: 1.0,
}}
cfg.Clusters[id] = cc
}
time.Sleep(time.Second / 2)
}
}
+ c.Logf("cr.CumulativeCost == %f", cr.CumulativeCost)
+ c.Check(cr.CumulativeCost, check.Not(check.Equals), 0.0)
if expectExitCode >= 0 {
c.Check(ctr.State, check.Equals, arvados.ContainerStateComplete)
c.Check(ctr.ExitCode, check.Equals, expectExitCode)
MkArvClient func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error)
finalState string
parentTemp string
+ costStartTime time.Time
keepstoreLogger io.WriteCloser
keepstoreLogbuf *bufThenWrite
if runner.finalState == "Complete" && runner.OutputPDH != nil {
update["output"] = *runner.OutputPDH
}
+ var it arvados.InstanceType
+ if j := os.Getenv("InstanceType"); j != "" && json.Unmarshal([]byte(j), &it) == nil && it.Price > 0 {
+ update["cost"] = it.Price * time.Now().Sub(runner.costStartTime).Seconds() / time.Hour.Seconds()
+ }
return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
}
runner.CrunchLog.Printf("Using FUSE mount: %s", v)
runner.CrunchLog.Printf("Using container runtime: %s", runner.executor.Runtime())
runner.CrunchLog.Printf("Executing container: %s", runner.Container.UUID)
+ runner.costStartTime = time.Now()
hostname, hosterr := os.Hostname()
if hosterr != nil {
class RunnerContainer(Runner):
"""Submit and manage a container that runs arvados-cwl-runner."""
- def arvados_job_spec(self, runtimeContext):
+ def arvados_job_spec(self, runtimeContext, git_info):
"""Create an Arvados container request for this workflow.
The returned dict can be used to create a container passed as
"portable_data_hash": "%s" % workflowcollection
}
else:
- packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext)
+ packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext, git_info)
workflowpath = "/var/lib/cwl/workflow.json#main"
container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
"kind": "json",
if self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
+ container_req["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in git_info.items()})
+
properties_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ProcessProperties")
if properties_req:
builder = make_builder(self.job_order, self.embedded_tool.hints, self.embedded_tool.requirements, runtimeContext, self.embedded_tool.metadata)
def run(self, runtimeContext):
runtimeContext.keepprefix = "keep:"
- job_spec = self.arvados_job_spec(runtimeContext)
+ job_spec = self.arvados_job_spec(runtimeContext, self.git_info)
if runtimeContext.project_uuid:
job_spec["owner_uuid"] = runtimeContext.project_uuid
def upload_workflow(arvRunner, tool, job_order, project_uuid,
runtimeContext, uuid=None,
submit_runner_ram=0, name=None, merged_map=None,
- submit_runner_image=None):
+ submit_runner_image=None,
+ git_info=None):
- packed = packed_workflow(arvRunner, tool, merged_map, runtimeContext)
+ packed = packed_workflow(arvRunner, tool, merged_map, runtimeContext, git_info)
adjustDirObjs(job_order, trim_listing)
adjustFileObjs(job_order, trim_anonymous_location)
import json
import re
from functools import partial
+import subprocess
import time
import urllib
import cwltool.workflow
from schema_salad.sourceline import SourceLine
import schema_salad.validate as validate
+from schema_salad.ref_resolver import file_uri, uri_file_path
import arvados
import arvados.config
Called when there's a need to report errors, warnings or just
activity statuses, for example in the RuntimeStatusLoggingHandler.
"""
+
+ if kind not in ('error', 'warning'):
+ # Ignore any other status kind
+ return
+
with self.workflow_eval_lock:
current = None
try:
if current is None:
return
runtime_status = current.get('runtime_status', {})
- if kind in ('error', 'warning'):
- updatemessage = runtime_status.get(kind, "")
- if not updatemessage:
- updatemessage = message
-
- # Subsequent messages tacked on in detail
- updatedetail = runtime_status.get(kind+'Detail', "")
- maxlines = 40
- if updatedetail.count("\n") < maxlines:
- if updatedetail:
- updatedetail += "\n"
- updatedetail += message + "\n"
-
- if detail:
- updatedetail += detail + "\n"
-
- if updatedetail.count("\n") >= maxlines:
- updatedetail += "\nSome messages may have been omitted. Check the full log."
-
- runtime_status.update({
- kind: updatemessage,
- kind+'Detail': updatedetail,
- })
- else:
- # Ignore any other status kind
+
+ original_updatemessage = updatemessage = runtime_status.get(kind, "")
+ if not updatemessage:
+ updatemessage = message
+
+ # Subsequent messages tacked on in detail
+ original_updatedetail = updatedetail = runtime_status.get(kind+'Detail', "")
+ maxlines = 40
+ if updatedetail.count("\n") < maxlines:
+ if updatedetail:
+ updatedetail += "\n"
+ updatedetail += message + "\n"
+
+ if detail:
+ updatedetail += detail + "\n"
+
+ if updatedetail.count("\n") >= maxlines:
+ updatedetail += "\nSome messages may have been omitted. Check the full log."
+
+ if updatemessage == original_updatemessage and updatedetail == original_updatedetail:
+ # don't waste time doing an update if nothing changed
+ # (usually because we exceeded the max lines)
return
+
+ runtime_status.update({
+ kind: updatemessage,
+ kind+'Detail': updatedetail,
+ })
+
try:
self.api.containers().update(uuid=current['uuid'],
body={
for req in job_reqs:
tool.requirements.append(req)
+    @staticmethod
+    def get_git_info(tool):
+        """Collect git provenance metadata for a locally-stored workflow.
+
+        If the tool's id is a file:// URI inside a git work tree, run git
+        to gather commit, date, committer, branch, origin, status,
+        describe, and relative path, returned as a dict keyed by
+        "http://arvados.org/cwl#git..." property names.  Otherwise, fall
+        back to copying any of those same keys already present in
+        tool.metadata.  Returns an empty dict when neither applies.
+        """
+        in_a_git_repo = False
+        cwd = None
+        filepath = None
+
+        if tool.tool["id"].startswith("file://"):
+            # check if git is installed
+            try:
+                filepath = uri_file_path(tool.tool["id"])
+                cwd = os.path.dirname(filepath)
+                # Probe: succeeds only if git exists AND cwd is inside a
+                # repo with at least one commit.
+                subprocess.run(["git", "log", "--format=%H", "-n1", "HEAD"], cwd=cwd, check=True, capture_output=True, text=True)
+                in_a_git_repo = True
+            except Exception as e:
+                # Best effort: git missing or not a repo -- provenance is
+                # simply unavailable, not an error.
+                pass
+
+        gitproperties = {}
+
+        if in_a_git_repo:
+            # Errors are ignored here (no check=True); a failed command
+            # yields an empty stdout for that field.
+            git_commit = subprocess.run(["git", "log", "--format=%H", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
+            git_date = subprocess.run(["git", "log", "--format=%cD", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
+            git_committer = subprocess.run(["git", "log", "--format=%cn <%ce>", "-n1", "HEAD"], cwd=cwd, capture_output=True, text=True).stdout
+            git_branch = subprocess.run(["git", "branch", "--show-current"], cwd=cwd, capture_output=True, text=True).stdout
+            git_origin = subprocess.run(["git", "remote", "get-url", "origin"], cwd=cwd, capture_output=True, text=True).stdout
+            git_status = subprocess.run(["git", "status", "--untracked-files=no", "--porcelain"], cwd=cwd, capture_output=True, text=True).stdout
+            git_describe = subprocess.run(["git", "describe", "--always"], cwd=cwd, capture_output=True, text=True).stdout
+            git_toplevel = subprocess.run(["git", "rev-parse", "--show-toplevel"], cwd=cwd, capture_output=True, text=True).stdout
+            # NOTE(review): git_toplevel still includes its trailing
+            # newline here, so the slice skips one extra character (the
+            # "/" separator) -- confirm this is intentional rather than
+            # relying on .strip() first.
+            git_path = filepath[len(git_toplevel):]
+
+            gitproperties = {
+                "http://arvados.org/cwl#gitCommit": git_commit.strip(),
+                "http://arvados.org/cwl#gitDate": git_date.strip(),
+                "http://arvados.org/cwl#gitCommitter": git_committer.strip(),
+                "http://arvados.org/cwl#gitBranch": git_branch.strip(),
+                "http://arvados.org/cwl#gitOrigin": git_origin.strip(),
+                "http://arvados.org/cwl#gitStatus": git_status.strip(),
+                "http://arvados.org/cwl#gitDescribe": git_describe.strip(),
+                "http://arvados.org/cwl#gitPath": git_path.strip(),
+            }
+        else:
+            # Not a local git checkout: propagate provenance recorded
+            # earlier (e.g. when the workflow was registered).
+            for g in ("http://arvados.org/cwl#gitCommit",
+                      "http://arvados.org/cwl#gitDate",
+                      "http://arvados.org/cwl#gitCommitter",
+                      "http://arvados.org/cwl#gitBranch",
+                      "http://arvados.org/cwl#gitOrigin",
+                      "http://arvados.org/cwl#gitStatus",
+                      "http://arvados.org/cwl#gitDescribe",
+                      "http://arvados.org/cwl#gitPath"):
+                if g in tool.metadata:
+                    gitproperties[g] = tool.metadata[g]
+
+        return gitproperties
+
+    def set_container_request_properties(self, container, properties):
+        """Merge the given properties into every container request for
+        the given container, rewriting "http://arvados.org/cwl#" keys to
+        the short "arv:" prefix before saving.
+        """
+        resp = self.api.container_requests().list(filters=[["container_uuid", "=", container["uuid"]]], select=["uuid", "properties"]).execute(num_retries=self.num_retries)
+        for cr in resp["items"]:
+            cr["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in properties.items()})
+            self.api.container_requests().update(uuid=cr["uuid"], body={"container_request": {"properties": cr["properties"]}}).execute(num_retries=self.num_retries)
+
def arv_executor(self, updated_tool, job_order, runtimeContext, logger=None):
self.debug = runtimeContext.debug
+ git_info = self.get_git_info(updated_tool)
+ if git_info:
+ logger.info("Git provenance")
+ for g in git_info:
+ if git_info[g]:
+ logger.info(" %s: %s", g.split("#", 1)[1], git_info[g])
+
workbench1 = self.api.config()["Services"]["Workbench1"]["ExternalURL"]
workbench2 = self.api.config()["Services"]["Workbench2"]["ExternalURL"]
controller = self.api.config()["Services"]["Controller"]["ExternalURL"]
runtimeContext.intermediate_storage_classes = default_storage_classes
if not runtimeContext.name:
- runtimeContext.name = self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"])
+ self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"])
+ if git_info.get("http://arvados.org/cwl#gitDescribe"):
+ self.name = "%s (%s)" % (self.name, git_info.get("http://arvados.org/cwl#gitDescribe"))
+ runtimeContext.name = self.name
if runtimeContext.copy_deps is None and (runtimeContext.create_workflow or runtimeContext.update_workflow):
# When creating or updating workflow record, by default
submit_runner_ram=runtimeContext.submit_runner_ram,
name=runtimeContext.name,
merged_map=merged_map,
- submit_runner_image=runtimeContext.submit_runner_image)
+ submit_runner_image=runtimeContext.submit_runner_image,
+ git_info=git_info)
self.stdout.write(uuid + "\n")
return (None, "success")
priority=runtimeContext.priority,
secret_store=self.secret_store,
collection_cache_size=runtimeContext.collection_cache_size,
- collection_cache_is_default=self.should_estimate_cache_size)
+ collection_cache_is_default=self.should_estimate_cache_size,
+ git_info=git_info)
else:
runtimeContext.runnerjob = tool.tool["id"]
current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
if current_container:
logger.info("Running inside container %s", current_container.get("uuid"))
+ self.set_container_request_properties(current_container, git_info)
self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
self.polling_thread = threading.Thread(target=self.poll_states)
upload_docker(arvrunner, s.embedded_tool, runtimeContext)
-def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
+def packed_workflow(arvrunner, tool, merged_map, runtimeContext, git_info):
"""Create a packed workflow.
A "packed" workflow is one where all the components have been combined into a single document."""
for l in v:
visit(l, cur_id)
visit(packed, None)
+
+ if git_info:
+ for g in git_info:
+ packed[g] = git_info[g]
+
return packed
intermediate_output_ttl=0, merged_map=None,
priority=None, secret_store=None,
collection_cache_size=256,
- collection_cache_is_default=True):
+ collection_cache_is_default=True,
+ git_info=None):
loadingContext = loadingContext.copy()
loadingContext.metadata = updated_tool.metadata.copy()
self.priority = priority
self.secret_store = secret_store
self.enable_dev = loadingContext.enable_dev
+ self.git_info = git_info
self.submit_runner_cores = 1
self.submit_runner_ram = 1024 # defaut 1 GiB
# file to determine what version of cwltool and schema-salad to
# build.
install_requires=[
- 'cwltool==3.1.20220623174452',
- 'schema-salad==8.3.20220801194920',
+ 'cwltool==3.1.20220907141119',
+ 'schema-salad==8.3.20220825114525',
'arvados-python-client{}'.format(pysdk_dep),
'setuptools',
'ciso8601 >= 2.0.0',
import unittest
import cwltool.process
import re
+import os
from io import BytesIO
_rootDesc = None
-def stubs(wfname='submit_wf.cwl'):
+def stubs(wfdetails=('submit_wf.cwl', None)):
def outer_wrapper(func, *rest):
@functools.wraps(func)
@mock.patch("arvados_cwl.arvdocker.determine_image_id")
uuid4, determine_image_id, *args, **kwargs):
class Stubs(object):
pass
+
+ wfname = wfdetails[0]
+ wfpath = wfdetails[1]
+
stubs = Stubs()
stubs.events = events
stubs.keepdocker = keepdocker
stubs.api.pipeline_instances().create().execute.return_value = stubs.pipeline_create
stubs.api.pipeline_instances().get().execute.return_value = stubs.pipeline_with_job
- with open("tests/wf/submit_wf_packed.cwl") as f:
+ cwd = os.getcwd()
+ filepath = os.path.join(cwd, "tests/wf/submit_wf_packed.cwl")
+ with open(filepath) as f:
expect_packed_workflow = yaml.round_trip_load(f)
+ if wfpath is None:
+ wfpath = wfname
+
+ gitinfo_workflow = copy.deepcopy(expect_packed_workflow)
+ gitinfo_workflow["$graph"][0]["id"] = "file://%s/tests/wf/%s" % (cwd, wfpath)
+ mocktool = mock.NonCallableMock(tool=gitinfo_workflow["$graph"][0], metadata=gitinfo_workflow)
+
+ git_info = arvados_cwl.executor.ArvCwlExecutor.get_git_info(mocktool)
+ expect_packed_workflow.update(git_info)
+
+ git_props = {"arv:"+k.split("#", 1)[1]: v for k,v in git_info.items()}
+
+ if wfname == wfpath:
+ container_name = "%s (%s)" % (wfpath, git_props["arv:gitDescribe"])
+ else:
+ container_name = wfname
+
stubs.expect_container_spec = {
'priority': 500,
'mounts': {
'--no-log-timestamps', '--disable-validate', '--disable-color',
'--eval-timeout=20', '--thread-count=0',
'--enable-reuse', "--collection-cache-size=256",
- '--output-name=Output from workflow '+wfname,
+ '--output-name=Output from workflow '+container_name,
'--debug', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'],
- 'name': wfname,
+ 'name': container_name,
'container_image': '999999999999999999999999999999d3+99',
- 'output_name': 'Output from workflow '+wfname,
+ 'output_name': 'Output from workflow %s' % (container_name),
'output_path': '/var/spool/cwl',
'cwd': '/var/spool/cwl',
'runtime_constraints': {
'ram': (1024+256)*1024*1024
},
'use_existing': False,
- 'properties': {},
+ 'properties': git_props,
'secret_mounts': {}
}
stubs.expect_container_request_uuid + '\n')
self.assertEqual(exited, 0)
- @stubs('submit_wf_no_reuse.cwl')
+ @stubs(('submit_wf_no_reuse.cwl', None))
def test_submit_container_reuse_disabled_by_workflow(self, stubs):
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug",
self.assertEqual(exited, 0)
expect_container = copy.deepcopy(stubs.expect_container_spec)
- expect_container["command"] = [
- 'arvados-cwl-runner', '--local', '--api=containers',
- '--no-log-timestamps', '--disable-validate', '--disable-color',
- '--eval-timeout=20', '--thread-count=0',
- '--disable-reuse', "--collection-cache-size=256",
- '--output-name=Output from workflow submit_wf_no_reuse.cwl', '--debug', '--on-error=continue',
- '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
+ expect_container["command"] = ["--disable-reuse" if v == "--enable-reuse" else v for v in expect_container["command"]]
expect_container["use_existing"] = False
expect_container["mounts"]["/var/lib/cwl/workflow.json"]["content"]["$graph"][1]["hints"] = [
{
stubs.expect_container_request_uuid + '\n')
self.assertEqual(exited, 0)
- @stubs('hello container 123')
+ @stubs(('hello container 123', 'submit_wf.cwl'))
def test_submit_container_name(self, stubs):
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug", "--name=hello container 123",
stubs.expect_container_request_uuid + '\n')
self.assertEqual(exited, 0)
- @stubs('submit_wf_runner_resources.cwl')
+ @stubs(('submit_wf_runner_resources.cwl', None))
def test_submit_wf_runner_resources(self, stubs):
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug",
expect_container["mounts"]["/var/lib/cwl/workflow.json"]["content"]["$namespaces"] = {
"arv": "http://arvados.org/cwl#",
}
- expect_container['command'] = ['arvados-cwl-runner', '--local', '--api=containers',
- '--no-log-timestamps', '--disable-validate', '--disable-color',
- '--eval-timeout=20', '--thread-count=0',
- '--enable-reuse', "--collection-cache-size=512",
- '--output-name=Output from workflow submit_wf_runner_resources.cwl',
- '--debug', '--on-error=continue',
- '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
+ expect_container["command"] = ["--collection-cache-size=512" if v == "--collection-cache-size=256" else v for v in expect_container["command"]]
stubs.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher(expect_container))
finally:
cwltool_logger.removeHandler(stderr_logger)
- @stubs('submit_wf_process_properties.cwl')
+ @stubs(('submit_wf_process_properties.cwl', None))
def test_submit_set_process_properties(self, stubs):
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug",
"arv": "http://arvados.org/cwl#"
}
- expect_container["properties"] = {
+ expect_container["properties"].update({
"baz": "blorp.txt",
"foo": "bar",
"quux": {
"q1": 1,
"q2": 2
}
- }
+ })
stubs.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher(expect_container))
"io/fs"
"io/ioutil"
"log"
+ "net"
"net/http"
"net/url"
"os"
if ctrlURL.Host == "" {
return nil, fmt.Errorf("no host in config Services.Controller.ExternalURL: %v", ctrlURL)
}
+ var hc *http.Client
+ if srvaddr := os.Getenv("ARVADOS_SERVER_ADDRESS"); srvaddr != "" {
+ // When this client is used to make a request to
+ // https://{ctrlhost}:port/ (any port), it dials the
+ // indicated port on ARVADOS_SERVER_ADDRESS instead.
+ //
+ // This is invoked by arvados-server boot to ensure
+ // that server->server traffic (e.g.,
+ // keepproxy->controller) only hits local interfaces,
+ // even if the Controller.ExternalURL host is a load
+ // balancer / gateway and not a local interface
+ // address (e.g., when running on a cloud VM).
+ //
+ // This avoids unnecessary delay/cost of routing
+ // external traffic, and also allows controller to
+ // recognize other services as internal clients based
+ // on the connection source address.
+ divertedHost := (*url.URL)(&cluster.Services.Controller.ExternalURL).Hostname()
+ var dialer net.Dialer
+ hc = &http.Client{
+ Transport: &http.Transport{
+ TLSClientConfig: &tls.Config{InsecureSkipVerify: cluster.TLS.Insecure},
+ DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
+ host, port, err := net.SplitHostPort(addr)
+ if err == nil && network == "tcp" && host == divertedHost {
+ addr = net.JoinHostPort(srvaddr, port)
+ }
+ return dialer.DialContext(ctx, network, addr)
+ },
+ },
+ }
+ }
return &Client{
+ Client: hc,
Scheme: ctrlURL.Scheme,
APIHost: ctrlURL.Host,
Insecure: cluster.TLS.Insecure,
RuntimeToken string `json:"runtime_token"`
AuthUUID string `json:"auth_uuid"`
Log string `json:"log"`
+ Cost float64 `json:"cost"`
+ SubrequestsCost float64 `json:"subrequests_cost"`
}
// ContainerRequest is an arvados#container_request resource.
ContainerCount int `json:"container_count"`
OutputStorageClasses []string `json:"output_storage_classes"`
OutputProperties map[string]interface{} `json:"output_properties"`
+ CumulativeCost float64 `json:"cumulative_cost"`
}
// Mount is special behavior to attach to a filesystem path or device.
// Client object shared by client requests. Supports HTTP KeepAlive.
Client *http.Client
- // If true, sets the X-External-Client header to indicate
- // the client is outside the cluster.
- External bool
-
// Base URIs of Keep services, e.g., {"https://host1:8443",
// "https://host2:8443"}. If this is nil, Keep clients will
// use the arvados.v1.keep_services.accessible API to discover
// fields from configuration files but still need to use the
// arvadosclient.ArvadosClient package.
func New(c *arvados.Client) (*ArvadosClient, error) {
- ac := &ArvadosClient{
- Scheme: "https",
- ApiServer: c.APIHost,
- ApiToken: c.AuthToken,
- ApiInsecure: c.Insecure,
- Client: &http.Client{
+ hc := c.Client
+ if hc == nil {
+ hc = &http.Client{
Timeout: 5 * time.Minute,
Transport: &http.Transport{
TLSClientConfig: MakeTLSConfig(c.Insecure)},
- },
- External: false,
+ }
+ }
+ ac := &ArvadosClient{
+ Scheme: "https",
+ ApiServer: c.APIHost,
+ ApiToken: c.AuthToken,
+ ApiInsecure: c.Insecure,
+ Client: hc,
Retries: 2,
KeepServiceURIs: c.KeepServiceURIs,
lastClosedIdlesAt: time.Now(),
// MakeArvadosClient creates a new ArvadosClient using the standard
// environment variables ARVADOS_API_HOST, ARVADOS_API_TOKEN,
-// ARVADOS_API_HOST_INSECURE, ARVADOS_EXTERNAL_CLIENT, and
-// ARVADOS_KEEP_SERVICES.
-func MakeArvadosClient() (ac *ArvadosClient, err error) {
- ac, err = New(arvados.NewClientFromEnv())
- if err != nil {
- return
- }
- ac.External = StringBool(os.Getenv("ARVADOS_EXTERNAL_CLIENT"))
- return
+// ARVADOS_API_HOST_INSECURE, and ARVADOS_KEEP_SERVICES.
+func MakeArvadosClient() (*ArvadosClient, error) {
+ return New(arvados.NewClientFromEnv())
}
// CallRaw is the same as Call() but returns a Reader that reads the
if c.RequestID != "" {
req.Header.Add("X-Request-Id", c.RequestID)
}
- if c.External {
- req.Header.Add("X-External-Client", "1")
- }
resp, err = c.Client.Do(req)
if err != nil {
self.max_request_size < len(kwargs['body'])):
raise apiclient_errors.MediaUploadSizeError("Request size %i bytes exceeds published limit of %i bytes" % (len(kwargs['body']), self.max_request_size))
- if config.get("ARVADOS_EXTERNAL_CLIENT", "") == "true":
- headers['X-External-Client'] = '1'
-
headers['Authorization'] = 'OAuth2 %s' % self.arvados_api_token
retryable = method in [
config.get('ARVADOS_API_TOKEN'),
config.flag_is_true('ARVADOS_API_HOST_INSECURE'),
config.get('ARVADOS_KEEP_PROXY'),
- config.get('ARVADOS_EXTERNAL_CLIENT') == 'true',
os.environ.get('KEEP_LOCAL_STORE'))
if (global_client_object is None) or (cls._last_key != key):
global_client_object = KeepClient()
fastcgi_temp_path "{{TMPDIR}}";
uwsgi_temp_path "{{TMPDIR}}";
scgi_temp_path "{{TMPDIR}}";
+ geo $external_client {
+ default 1;
+ 127.0.0.0/8 0;
+ ::1 0;
+ fd00::/8 0;
+ {{INTERNALSUBNETS}}
+ }
upstream controller {
server {{UPSTREAMHOST}}:{{CONTROLLERPORT}};
}
client_max_body_size 0;
location / {
proxy_pass http://controller;
+ proxy_set_header Upgrade $http_upgrade;
+ proxy_set_header Connection "upgrade";
proxy_set_header Host $http_host;
+ proxy_set_header X-External-Client $external_client;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto https;
proxy_redirect off;
nginxconf['ACCESSLOG'] = _logfilename('nginx_access')
nginxconf['ERRORLOG'] = _logfilename('nginx_error')
nginxconf['TMPDIR'] = TEST_TMPDIR + '/nginx'
+ nginxconf['INTERNALSUBNETS'] = '169.254.0.0/16 0;'
conftemplatefile = os.path.join(MY_DIRNAME, 'nginx.conf')
conffile = os.path.join(TEST_TMPDIR, 'nginx.conf')
cls._orig_config = arvados.config.settings().copy()
cls._cleanup_funcs = []
os.environ.pop('ARVADOS_KEEP_SERVICES', None)
- os.environ.pop('ARVADOS_EXTERNAL_CLIENT', None)
for server_kwargs, start_func, stop_func in (
(cls.MAIN_SERVER, run, reset),
(cls.WS_SERVER, run_ws, stop_ws),
cls.api_client = arvados.api('v1')
def tearDown(self):
- arvados.config.settings().pop('ARVADOS_EXTERNAL_CLIENT', None)
super(KeepProxyTestCase, self).tearDown()
def test_KeepProxyTest1(self):
'wrong content from Keep.get(md5("baz"))')
self.assertTrue(keep_client.using_proxy)
- def test_KeepProxyTest2(self):
- # Don't instantiate the proxy directly, but set the X-External-Client
- # header. The API server should direct us to the proxy.
- arvados.config.settings()['ARVADOS_EXTERNAL_CLIENT'] = 'true'
- keep_client = arvados.KeepClient(api_client=self.api_client,
- proxy='', local_store='')
- baz_locator = keep_client.put('baz2')
- self.assertRegex(
- baz_locator,
- '^91f372a266fe2bf2823cb8ec7fda31ce\+4',
- 'wrong md5 hash from Keep.put("baz2"): ' + baz_locator)
- self.assertEqual(keep_client.get(baz_locator),
- b'baz2',
- 'wrong content from Keep.get(md5("baz2"))')
- self.assertTrue(keep_client.using_proxy)
-
def test_KeepProxyTestMultipleURIs(self):
# Test using ARVADOS_KEEP_SERVICES env var overriding any
# existing proxy setting and setting multiple proxies
self.assertEqual('100::1', service.hostname)
self.assertEqual(10, service.port)
+ def test_recognize_proxy_services_in_controller_response(self):
+ keep_client = arvados.KeepClient(api_client=self.mock_keep_services(
+ service_type='proxy', service_host='localhost', service_port=9, count=1))
+ try:
+ # this will fail, but it ensures we get the service
+ # discovery response
+ keep_client.put('baz2')
+ except:
+ pass
+ self.assertTrue(keep_client.using_proxy)
+
def test_insecure_disables_tls_verify(self):
api_client = self.mock_keep_services(count=1)
force_timeout = socket.timeout("timed out")
t.add :interactive_session_started
t.add :output_storage_classes
t.add :output_properties
+ t.add :cost
+ t.add :subrequests_cost
end
# Supported states for a container
def validate_change
permitted = [:state]
- progress_attrs = [:progress, :runtime_status, :log, :output, :output_properties, :exit_code]
final_attrs = [:finished_at]
+ progress_attrs = [:progress, :runtime_status, :subrequests_cost, :cost,
+ :log, :output, :output_properties, :exit_code]
if self.new_record?
permitted.push(:owner_uuid, :command, :container_image, :cwd,
when Running
permitted.push :finished_at, *progress_attrs
when Queued, Locked
- permitted.push :finished_at, :log, :runtime_status
+ permitted.push :finished_at, :log, :runtime_status, :cost
end
else
cr.with_lock do
leave_modified_by_user_alone do
# Use row locking because this increments container_count
+ cr.cumulative_cost += self.cost + self.subrequests_cost
cr.container_uuid = c.uuid
cr.save!
end
serialize :scheduling_parameters, Hash
after_find :fill_container_defaults_after_find
+ after_initialize { @state_was_when_initialized = self.state_was } # see finalize_if_needed
before_validation :fill_field_defaults, :if => :new_record?
before_validation :fill_container_defaults
validates :command, :container_image, :output_path, :cwd, :presence => true
t.add :use_existing
t.add :output_storage_classes
t.add :output_properties
+ t.add :cumulative_cost
end
# Supported states for a container request
def finalize!
container = Container.find_by_uuid(container_uuid)
if !container.nil?
+ # We don't want to add the container cost if the container was
+ # already finished when this CR was committed. But we are
+ # running in an after_save hook after a lock/reload, so
+ # state_was has already been updated to Committed regardless.
+ # Hence the need for @state_was_when_initialized.
+ if @state_was_when_initialized == Committed
+ # Add the final container cost to our cumulative cost (which
+ # may already be non-zero from previous attempts if
+ # container_count_max > 1).
+ self.cumulative_cost += container.cost + container.subrequests_cost
+ end
+
+ # Add our cumulative cost to the subrequests_cost of the
+ # requesting container, if any.
+ if self.requesting_container_uuid
+ Container.where(
+ uuid: self.requesting_container_uuid,
+ state: Container::Running,
+ ).each do |c|
+ c.subrequests_cost += self.cumulative_cost
+ c.save!
+ end
+ end
+
update_collections(container: container)
if container.state == Container::Complete
case self.state
when Committed
- permitted.push :priority, :container_count_max, :container_uuid
+ permitted.push :priority, :container_count_max, :container_uuid, :cumulative_cost
if self.priority.nil?
self.errors.add :priority, "cannot be nil"
when Final
if self.state_was == Committed
# "Cancel" means setting priority=0, state=Committed
- permitted.push :priority
+ permitted.push :priority, :cumulative_cost
if current_user.andand.is_admin
permitted.push :output_uuid, :log_uuid
if self.owner_uuid != system_user_uuid
raise "Owner uuid for role must be system user"
end
- raise PermissionDeniedError unless current_user.can?(manage: uuid)
+ raise PermissionDeniedError.new("role group cannot be modified without can_manage permission") unless current_user.can?(manage: uuid)
true
else
super
# note: these permission links are obsolete, they have no effect
# on anything and they are not created for new users.
Link.where(tail_uuid: self.email,
- link_class: 'permission',
- name: 'can_login').destroy_all
+ link_class: 'permission',
+ name: 'can_login').destroy_all
# delete repo_perms for this user
Link.where(tail_uuid: self.uuid,
- link_class: 'permission',
- name: 'can_manage').destroy_all
+ link_class: 'permission',
+ name: 'can_manage').destroy_all
# delete vm_login_perms for this user
Link.where(tail_uuid: self.uuid,
- link_class: 'permission',
- name: 'can_login').destroy_all
+ link_class: 'permission',
+ name: 'can_login').destroy_all
# delete "All users" group read permissions for this user
Link.where(tail_uuid: self.uuid,
- head_uuid: all_users_group_uuid,
- link_class: 'permission',
- name: 'can_read').destroy_all
+ head_uuid: all_users_group_uuid,
+ link_class: 'permission').destroy_all
# delete any signatures by this user
Link.where(link_class: 'signature',
- tail_uuid: self.uuid).destroy_all
+ tail_uuid: self.uuid).destroy_all
# delete tokens for this user
ApiClientAuthorization.where(user_id: self.id).destroy_all
#
if Link.where(tail_uuid: self.uuid,
head_uuid: all_users_group_uuid,
- link_class: 'permission',
- name: 'can_read').any?
+ link_class: 'permission').any?
errors.add :is_active, "cannot be set to false directly, use the 'Deactivate' button on Workbench, or the 'unsetup' API call"
end
end
resp = [Link.where(tail_uuid: self.uuid,
head_uuid: all_users_group_uuid,
link_class: 'permission',
- name: 'can_read').first ||
+ name: 'can_write').first ||
Link.create(tail_uuid: self.uuid,
head_uuid: all_users_group_uuid,
link_class: 'permission',
- name: 'can_read')]
+ name: 'can_write')]
if Rails.configuration.Users.ActivatedUsersAreVisibleToOthers
resp += [Link.where(tail_uuid: all_users_group_uuid,
head_uuid: self.uuid,
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+# Upgrade user permissions on the "All users" group from can_read to
+# can_write, so activated users can share/write via the group; the
+# down migration reverses the rename.
+class WriteViaAllUsers < ActiveRecord::Migration[5.2]
+ include CurrentApiClient
+ def up
+ changelinks(from: "can_read", to: "can_write")
+ end
+ def down
+ changelinks(from: "can_write", to: "can_read")
+ end
+ # Bulk-rename permission links in raw SQL: only links whose tail is a
+ # user uuid (5-"tpzed"-15 pattern) and whose head is the "All users"
+ # group are touched. Each bind is an old-style [column, value] pair.
+ def changelinks(from:, to:)
+ ActiveRecord::Base.connection.exec_query(
+ "update links set name=$1 where link_class=$2 and name=$3 and tail_uuid like $4 and head_uuid = $5",
+ "migrate", [
+ [nil, to],
+ [nil, "permission"],
+ [nil, from],
+ [nil, "_____-tpzed-_______________"],
+ [nil, all_users_group_uuid],
+ ])
+ end
+end
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+# Add cost-accounting columns: per-container cost and subrequests_cost,
+# plus the request-level cumulative_cost rollup. All default to 0 and
+# are NOT NULL so arithmetic on them never has to nil-check.
+class AddCostToContainers < ActiveRecord::Migration[5.2]
+ def change
+ add_column :containers, :cost, :float, null: false, default: 0
+ add_column :containers, :subrequests_cost, :float, null: false, default: 0
+ add_column :container_requests, :cumulative_cost, :float, null: false, default: 0
+ end
+end
SET default_tablespace = '';
-SET default_with_oids = false;
-
--
-- Name: api_client_authorizations; Type: TABLE; Schema: public; Owner: -
--
secret_mounts jsonb DEFAULT '{}'::jsonb,
runtime_token text,
output_storage_classes jsonb DEFAULT '["default"]'::jsonb,
- output_properties jsonb DEFAULT '{}'::jsonb
+ output_properties jsonb DEFAULT '{}'::jsonb,
+ cumulative_cost double precision DEFAULT 0.0 NOT NULL
);
gateway_address character varying,
interactive_session_started boolean DEFAULT false NOT NULL,
output_storage_classes jsonb DEFAULT '["default"]'::jsonb,
- output_properties jsonb DEFAULT '{}'::jsonb
+ output_properties jsonb DEFAULT '{}'::jsonb,
+ cost double precision DEFAULT 0.0 NOT NULL,
+ subrequests_cost double precision DEFAULT 0.0 NOT NULL
);
('20220301155729'),
('20220303204419'),
('20220401153101'),
-('20220505112900');
-
+('20220505112900'),
+('20220726034131'),
+('20220804133317');
updated_at: 2014-01-24 20:42:26 -0800
tail_uuid: zzzzz-tpzed-xurymjxw79nv3jz
link_class: permission
- name: can_read
+ name: can_write
head_uuid: zzzzz-j7d0g-fffffffffffffff
properties: {}
updated_at: 2014-01-24 20:42:26 -0800
tail_uuid: zzzzz-tpzed-l1s2piq4t4mps8r
link_class: permission
- name: can_read
+ name: can_write
head_uuid: zzzzz-j7d0g-fffffffffffffff
properties: {}
updated_at: 2013-12-26T20:52:21Z
tail_uuid: zzzzz-tpzed-x9kqpd79egh49c7
link_class: permission
- name: can_read
+ name: can_write
head_uuid: zzzzz-j7d0g-fffffffffffffff
properties: {}
updated_at: 2013-12-26T20:52:21Z
tail_uuid: zzzzz-tpzed-7sg468ezxwnodxs
link_class: permission
- name: can_read
+ name: can_write
head_uuid: zzzzz-j7d0g-fffffffffffffff
properties: {}
updated_at: 2015-07-28T21:34:41.361747000Z
tail_uuid: zzzzz-tpzed-projectviewer1a
link_class: permission
- name: can_read
+ name: can_write
head_uuid: zzzzz-j7d0g-fffffffffffffff
properties: {}
updated_at: 2014-01-24 20:42:26 -0800
tail_uuid: zzzzz-tpzed-user1withloadab
link_class: permission
- name: can_read
+ name: can_write
head_uuid: zzzzz-j7d0g-fffffffffffffff
properties: {}
verify_link response_items, 'arvados#repository', true, 'permission', 'can_manage',
"foo/#{repo_name}", created['uuid'], 'arvados#repository', true, 'Repository'
- verify_link response_items, 'arvados#group', true, 'permission', 'can_read',
+ verify_link response_items, 'arvados#group', true, 'permission', 'can_write',
'All users', created['uuid'], 'arvados#group', true, 'Group'
verify_link response_items, 'arvados#virtualMachine', false, 'permission', 'can_login',
# two extra links; system_group, and group
verify_links_added 2
- verify_link response_items, 'arvados#group', true, 'permission', 'can_read',
+ verify_link response_items, 'arvados#group', true, 'permission', 'can_write',
'All users', response_object['uuid'], 'arvados#group', true, 'Group'
verify_link response_items, 'arvados#repository', false, 'permission', 'can_manage',
verify_link response_items, 'arvados#repository', true, 'permission', 'can_manage',
'foo/usertestrepo', created['uuid'], 'arvados#repository', true, 'Repository'
- verify_link response_items, 'arvados#group', true, 'permission', 'can_read',
+ verify_link response_items, 'arvados#group', true, 'permission', 'can_write',
'All users', created['uuid'], 'arvados#group', true, 'Group'
verify_link response_items, 'arvados#virtualMachine', false, 'permission', 'can_login',
verify_link response_items, 'arvados#repository', true, 'permission', 'can_manage',
'foo/usertestrepo', created['uuid'], 'arvados#repository', true, 'Repository'
- verify_link response_items, 'arvados#group', true, 'permission', 'can_read',
+ verify_link response_items, 'arvados#group', true, 'permission', 'can_write',
'All users', created['uuid'], 'arvados#group', true, 'Group'
verify_link response_items, 'arvados#virtualMachine', true, 'permission', 'can_login',
assert_equal active_user[:email], created['email'], 'expected input email'
# verify links
- verify_link response_items, 'arvados#group', true, 'permission', 'can_read',
+ verify_link response_items, 'arvados#group', true, 'permission', 'can_write',
'All users', created['uuid'], 'arvados#group', true, 'Group'
verify_link response_items, 'arvados#repository', true, 'permission', 'can_manage',
assert_equal active_user['email'], created['email'], 'expected original email'
# verify links
- verify_link response_items, 'arvados#group', true, 'permission', 'can_read',
+ verify_link response_items, 'arvados#group', true, 'permission', 'can_write',
'All users', created['uuid'], 'arvados#group', true, 'Group'
assert_equal(repos_count, repos_query.count)
assert_equal active_user['uuid'], json_response['uuid']
updated = User.where(uuid: active_user['uuid']).first
assert_equal(true, updated.is_active)
- assert_equal({read: true}, updated.group_permissions[all_users_group_uuid])
+ assert_equal({read: true, write: true}, updated.group_permissions[all_users_group_uuid])
end
test "non-admin user can get basic information about readable users" do
# SPDX-License-Identifier: AGPL-3.0
module UsersTestHelper
+ include CurrentApiClient
+
def verify_link(response_items, link_object_name, expect_link, link_class,
link_name, head_uuid, tail_uuid, head_kind, fetch_object, class_name)
link = find_obj_in_resp response_items, 'arvados#link', link_object_name
assert !vm_login_perms.any?, "expected all vm_login_perms deleted"
end
- group = Group.where(name: 'All users').select do |g|
- g[:uuid].match(/-f+$/)
- end.first
- group_read_perms = Link.where(tail_uuid: uuid,
- head_uuid: group[:uuid],
+ group_write_perms = Link.where(tail_uuid: uuid,
+ head_uuid: all_users_group_uuid,
link_class: 'permission',
- name: 'can_read')
+ name: 'can_write')
if expect_group_perms
- assert group_read_perms.any?, "expected all users group read perms"
+ assert group_write_perms.any?, "expected all users group write perms"
else
- assert !group_read_perms.any?, "expected all users group perm deleted"
+ assert !group_write_perms.any?, "expected all users group write perms deleted"
end
signed_uuids = Link.where(link_class: 'signature',
verify_link response_items, 'arvados#repository', true, 'permission', 'can_manage',
'foo/usertestrepo', created['uuid'], 'arvados#repository', true, 'Repository'
- verify_link response_items, 'arvados#group', true, 'permission', 'can_read',
+ verify_link response_items, 'arvados#group', true, 'permission', 'can_write',
'All users', created['uuid'], 'arvados#group', true, 'Group'
verify_link response_items, 'arvados#virtualMachine', false, 'permission', 'can_login',
verify_link response_items, 'arvados#repository', true, 'permission', 'can_manage',
'foo/usertestrepo', created['uuid'], 'arvados#repository', true, 'Repository'
- verify_link response_items, 'arvados#group', true, 'permission', 'can_read',
+ verify_link response_items, 'arvados#group', true, 'permission', 'can_write',
'All users', created['uuid'], 'arvados#group', true, 'Group'
verify_link response_items, 'arvados#virtualMachine', true, 'permission', 'can_login',
# two new links: system_group, and 'All users' group.
- verify_link response_items, 'arvados#group', true, 'permission', 'can_read',
+ verify_link response_items, 'arvados#group', true, 'permission', 'can_write',
'All users', created['uuid'], 'arvados#group', true, 'Group'
verify_link response_items, 'arvados#virtualMachine', false, 'permission', 'can_login',
assert_equal 'foo@example.com', created['email'], 'expected input email'
# verify links
- verify_link response_items, 'arvados#group', true, 'permission', 'can_read',
+ verify_link response_items, 'arvados#group', true, 'permission', 'can_write',
'All users', created['uuid'], 'arvados#group', true, 'Group'
verify_link response_items, 'arvados#repository', true, 'permission', 'can_manage',
assert_equal created['email'], 'foo@example.com', 'expected original email'
# verify links
- verify_link response_items, 'arvados#group', true, 'permission', 'can_read',
+ verify_link response_items, 'arvados#group', true, 'permission', 'can_write',
'All users', created['uuid'], 'arvados#group', true, 'Group'
verify_link response_items, 'arvados#virtualMachine', true, 'permission', 'can_login',
# four extra links: system_group, login, group, repo and vm
- verify_link response_items, 'arvados#group', true, 'permission', 'can_read',
+ verify_link response_items, 'arvados#group', true, 'permission', 'can_write',
'All users', created['uuid'], 'arvados#group', true, 'Group'
verify_link response_items, 'arvados#repository', true, 'permission', 'can_manage',
act_as_system_user do
Container.find_by_uuid(cr.container_uuid).
- update_attributes!(state: Container::Cancelled)
+ update_attributes!(state: Container::Cancelled, cost: 1.25)
end
cr.reload
assert_equal "Final", cr.state
+ assert_equal 1.25, cr.cumulative_cost
assert_equal users(:active).uuid, cr.modified_by_user_uuid
end
log_pdh = 'fa7aeb5140e2848d39b416daeef4ffc5+45'
act_as_system_user do
c.update_attributes!(state: Container::Complete,
+ cost: 1.25,
output: output_pdh,
log: log_pdh)
end
cr.reload
assert_equal "Final", cr.state
+ assert_equal 1.25, cr.cumulative_cost
assert_equal users(:active).uuid, cr.modified_by_user_uuid
assert_not_nil cr.output_uuid
prev_container_uuid = cr.container_uuid
act_as_system_user do
+ c.update_attributes!(cost: 0.5, subrequests_cost: 1.25)
c.update_attributes!(state: Container::Cancelled)
end
c = act_as_system_user do
c = Container.find_by_uuid(cr.container_uuid)
+ c.update_attributes!(state: Container::Locked)
+ c.update_attributes!(state: Container::Running)
+ c.update_attributes!(cost: 0.125)
c.update_attributes!(state: Container::Cancelled)
c
end
assert_equal "Final", cr.state
assert_equal prev_container_uuid, cr.container_uuid
assert_not_equal cr2.container_uuid, cr.container_uuid
+ assert_equal 1.875, cr.cumulative_cost
end
test "Retry on container cancelled with runtime_token" do
end
end
+ test "Cumulative cost includes retried attempts but not reused containers" do
+ set_user_from_auth :active
+ cr = create_minimal_req!(priority: 5, state: "Committed", container_count_max: 3)
+ c = Container.find_by_uuid cr.container_uuid
+ # First attempt is cancelled after accruing cost 3; because
+ # container_count_max > 1 the request gets a retry container.
+ act_as_system_user do
+ c.update_attributes!(state: Container::Locked)
+ c.update_attributes!(state: Container::Running)
+ c.update_attributes!(state: Container::Cancelled, cost: 3)
+ end
+ cr.reload
+ assert_equal 3, cr.cumulative_cost
+
+ c = Container.find_by_uuid cr.container_uuid
+ lock_and_run c
+ c.reload
+ assert_equal 0, c.subrequests_cost
+
+ # cr2 is a child/subrequest
+ cr2 = with_container_auth(c) do
+ create_minimal_req!(priority: 10, state: "Committed", container_count_max: 1, command: ["echo", "foo2"])
+ end
+ assert_equal c.uuid, cr2.requesting_container_uuid
+ c2 = Container.find_by_uuid cr2.container_uuid
+ act_as_system_user do
+ c2.update_attributes!(state: Container::Locked)
+ c2.update_attributes!(state: Container::Running)
+ logc = Collection.new(owner_uuid: system_user_uuid,
+ manifest_text: ". ef772b2f28e2c8ca84de45466ed19ee9+7815 0:0:arv-mount.txt\n")
+ logc.save!
+ c2.update_attributes!(state: Container::Complete,
+ exit_code: 0,
+ output: '1f4b0bc7583c2a7f9102c395f4ffc5e3+45',
+ log: logc.portable_data_hash,
+ cost: 7)
+ end
+ # Finished subrequest cost (7) propagates up to the requesting
+ # container's subrequests_cost.
+ c.reload
+ assert_equal 7, c.subrequests_cost
+
+ # cr3 is an identical child/subrequest, will reuse c2
+ cr3 = with_container_auth(c) do
+ create_minimal_req!(priority: 10, state: "Committed", container_count_max: 1, command: ["echo", "foo2"])
+ end
+ assert_equal c.uuid, cr3.requesting_container_uuid
+ c3 = Container.find_by_uuid cr3.container_uuid
+ assert_equal c2.uuid, c3.uuid
+ assert_equal Container::Complete, c3.state
+ # Reuse of an already-finished container must not add its cost again.
+ c.reload
+ assert_equal 7, c.subrequests_cost
+
+ act_as_system_user do
+ c.update_attributes!(state: Container::Complete, exit_code: 0, cost: 9)
+ end
+
+ c.reload
+ assert_equal 7, c.subrequests_cost
+ cr.reload
+ # Total = cancelled attempt (3) + subrequest (7) + final attempt (9).
+ assert_equal 3+7+9, cr.cumulative_cost
+ end
+
end
verify_user resp_user, email
group_perm = find_obj_in_resp response, 'Link', 'arvados#group'
- verify_link group_perm, 'permission', 'can_read', resp_user[:uuid], nil
+ verify_link group_perm, 'permission', 'can_write', resp_user[:uuid], groups(:all_users).uuid
group_perm2 = find_obj_in_resp response, 'Link', 'arvados#user'
if visible
verify_user resp_user, email
group_perm = find_obj_in_resp response, 'Link', 'arvados#group'
- verify_link group_perm, 'permission', 'can_read', resp_user[:uuid], nil
+ verify_link group_perm, 'permission', 'can_write', resp_user[:uuid], groups(:all_users).uuid
repo_perm = find_obj_in_resp response, 'Link', 'arvados#repository'
verify_link repo_perm, 'permission', 'can_manage', resp_user[:uuid], nil
verify_user resp_user, email
group_perm = find_obj_in_resp response, 'Link', 'arvados#group'
- verify_link group_perm, 'permission', 'can_read', resp_user[:uuid], nil
+ verify_link group_perm, 'permission', 'can_write', resp_user[:uuid], groups(:all_users).uuid
group_perm2 = find_obj_in_resp response, 'Link', 'arvados#user'
verify_link group_perm2, 'permission', 'can_read', groups(:all_users).uuid, nil
assert_equal user.uuid, resp_user[:uuid], 'expected uuid not found'
group_perm = find_obj_in_resp response, 'Link', 'arvados#group'
- verify_link group_perm, 'permission', 'can_read', resp_user[:uuid], nil
+ verify_link group_perm, 'permission', 'can_write', resp_user[:uuid], groups(:all_users).uuid
repo_perm = find_obj_in_resp response, 'Link', 'arvados#repository'
verify_link repo_perm, 'permission', 'can_manage', resp_user[:uuid], nil
assert_equal user.uuid, resp_user[:uuid], 'expected uuid not found'
group_perm = find_obj_in_resp response, 'Link', 'arvados#group'
- verify_link group_perm, 'permission', 'can_read', resp_user[:uuid], nil
+ verify_link group_perm, 'permission', 'can_write', resp_user[:uuid], groups(:all_users).uuid
repo_perm = find_obj_in_resp response, 'Link', 'arvados#repository'
verify_link repo_perm, 'permission', 'can_manage', resp_user[:uuid], nil
# check user setup
verify_link_exists(Rails.configuration.Users.AutoSetupNewUsers || active,
groups(:all_users).uuid, user.uuid,
- "permission", "can_read")
+ "permission", "can_write")
# Check for repository.
if named_repo = (prior_repo or
if client.Insecure {
os.Setenv("ARVADOS_API_HOST_INSECURE", "1")
}
- os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
} else {
logger.Warnf("Client credentials missing from config, so falling back on environment variables (deprecated).")
}
if disp.Client.Insecure {
os.Setenv("ARVADOS_API_HOST_INSECURE", "1")
}
- os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
for k, v := range disp.cluster.Containers.SLURM.SbatchEnvironmentVariables {
os.Setenv(k, v)
}
for _, c := range classes {
perclass[c] = 0
}
- for _, r := range bsm.get(blkid).Replicas {
+ bs, ok := bsm.entries[blkid]
+ if !ok {
+ return 0
+ }
+ for _, r := range bs.Replicas {
total += r.KeepMount.Replication
mntclasses := r.KeepMount.StorageClasses
if len(mntclasses) == 0 {
package keepbalance
import (
+ "sync"
"time"
"git.arvados.org/arvados.git/sdk/go/arvados"
n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(40), knownBlkid(41)}, nil)
c.Check(n, check.Equals, 4)
}
+
+// TestConcurrency hammers GetConfirmedReplication from many goroutines
+// at once -- half querying blocks not in the map (expect 0), half
+// querying known blocks 10 and 20 (expect 1 and 2) -- so the race
+// detector can flag any unsynchronized access to the block state map.
+func (s *confirmedReplicationSuite) TestConcurrency(c *check.C) {
+ var wg sync.WaitGroup
+ for i := 1000; i < 1256; i++ {
+ // Shadow the loop variable so each goroutine captures its own
+ // copy (required before Go 1.22 per-iteration loop variables).
+ i := i
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ n := s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(i), knownBlkid(i)}, []string{"default"})
+ c.Check(n, check.Equals, 0)
+ }()
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ n := s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(10)}, []string{"default"})
+ c.Check(n, check.Equals, 1)
+ n = s.blockStateMap.GetConfirmedReplication([]arvados.SizedDigest{knownBlkid(20)}, []string{"default"})
+ c.Check(n, check.Equals, 2)
+ }()
+ }
+ wg.Wait()
+}
// Use about 1 goroutine per 2 CPUs. Based on experiments with
// a 2-core host, using more concurrent database
// calls/transactions makes this process slower, not faster.
- for i := 0; i < runtime.NumCPU()+1/2; i++ {
+ for i := 0; i < (runtime.NumCPU()+1)/2; i++ {
wg.Add(1)
goSendErr(errs, func() error {
defer wg.Done()
collection, filepath = h.determineCollection(fs, filepath)
}
if collection != nil {
- log = log.WithField("collection_uuid", collection.UUID).
- WithField("collection_file_path", filepath)
- props["collection_uuid"] = collection.UUID
+ log = log.WithField("collection_file_path", filepath)
props["collection_file_path"] = filepath
- // h.determineCollection populates the collection_uuid prop with the PDH, if
- // this collection is being accessed via PDH. In that case, blank the
- // collection_uuid field so that consumers of the log entries can rely on it
- // being a UUID, or blank. The PDH remains available via the
- // portable_data_hash property.
- if props["collection_uuid"] == collection.PortableDataHash {
- props["collection_uuid"] = ""
+ // h.determineCollection populates the collection_uuid
+ // prop with the PDH, if this collection is being
+ // accessed via PDH. For logging, we use a different
+ // field depending on whether it's a UUID or PDH.
+ if len(collection.UUID) > 32 {
+ log = log.WithField("portable_data_hash", collection.UUID)
+ props["portable_data_hash"] = collection.UUID
+ } else {
+ log = log.WithField("collection_uuid", collection.UUID)
+ props["collection_uuid"] = collection.UUID
}
}
if r.Method == "PUT" || r.Method == "POST" {
}
func (h *handler) determineCollection(fs arvados.CustomFileSystem, path string) (*arvados.Collection, string) {
- segments := strings.Split(path, "/")
- var i int
- for i = 0; i < len(segments); i++ {
- dir := append([]string{}, segments[0:i]...)
- dir = append(dir, ".arvados#collection")
- f, err := fs.OpenFile(strings.Join(dir, "/"), os.O_RDONLY, 0)
- if f != nil {
- defer f.Close()
- }
+ target := strings.TrimSuffix(path, "/")
+ for {
+ fi, err := fs.Stat(target)
if err != nil {
- if !os.IsNotExist(err) {
+ return nil, ""
+ }
+ switch src := fi.Sys().(type) {
+ case *arvados.Collection:
+ return src, strings.TrimPrefix(path[len(target):], "/")
+ case *arvados.Group:
+ return nil, ""
+ default:
+ if _, ok := src.(error); ok {
return nil, ""
}
- continue
}
- // err is nil so we found it.
- decoder := json.NewDecoder(f)
- var collection arvados.Collection
- err = decoder.Decode(&collection)
- if err != nil {
+ // Try parent
+ cut := strings.LastIndexByte(target, '/')
+ if cut < 0 {
return nil, ""
}
- return &collection, strings.Join(segments[i:], "/")
+ target = target[:cut]
}
- return nil, ""
}
TestProxyUUID: "http://" + srv.Addr,
}
kc.SetServiceRoots(sr, sr, sr)
- kc.Arvados.External = true
return srv, kc, logbuf
}
"type": "file",
"source": "scripts/usr-local-bin-ensure-encrypted-partitions-aws-ebs-autoscale.sh",
"destination": "/tmp/usr-local-bin-ensure-encrypted-partitions-aws-ebs-autoscale.sh"
- },{
- "type": "file",
- "source": "scripts/create-ebs-volume-nvme.patch",
- "destination": "/tmp/create-ebs-volume-nvme.patch"
},{
"type": "file",
"source": "{{user `public_key_file`}}",
unzip -q /tmp/awscliv2.zip -d /tmp && $SUDO /tmp/aws/install
# Pinned to v2.4.5 because we apply a patch below
#export EBS_AUTOSCALE_VERSION=$(curl --silent "https://api.github.com/repos/awslabs/amazon-ebs-autoscale/releases/latest" | jq -r .tag_name)
- export EBS_AUTOSCALE_VERSION="v2.4.5"
- cd /opt && $SUDO git clone https://github.com/awslabs/amazon-ebs-autoscale.git
+ export EBS_AUTOSCALE_VERSION="5ca6e24e05787b8ae1184c2a10db80053ddd3038"
+ cd /opt && $SUDO git clone https://github.com/arvados/amazon-ebs-autoscale.git
cd /opt/amazon-ebs-autoscale && $SUDO git checkout $EBS_AUTOSCALE_VERSION
- $SUDO patch -p1 < /tmp/create-ebs-volume-nvme.patch
-
- # This script really requires bash and the shebang line is wrong
- $SUDO sed -i 's|^#!/bin/sh|#!/bin/bash|' /opt/amazon-ebs-autoscale/bin/ebs-autoscale
# Set up the cloud-init script that makes use of the AWS EBS autoscaler
$SUDO mv /tmp/usr-local-bin-ensure-encrypted-partitions-aws-ebs-autoscale.sh /usr/local/bin/ensure-encrypted-partitions.sh
+++ /dev/null
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: BSD-3-Clause
-
-Make the create-ebs-volume script work with nvme devices.
-
-diff --git a/bin/create-ebs-volume b/bin/create-ebs-volume
-index 6857564..e3122fa 100755
---- a/bin/create-ebs-volume
-+++ b/bin/create-ebs-volume
-@@ -149,10 +149,11 @@ function get_next_logical_device() {
- for letter in ${alphabet[@]}; do
- # use /dev/xvdb* device names to avoid contention for /dev/sd* and /dev/xvda names
- # only supported by HVM instances
-- if [ ! -b "/dev/xvdb${letter}" ]; then
-+ if [[ $created_volumes =~ .*/dev/xvdb${letter}.* ]]; then
-+ continue
-+ fi
- echo "/dev/xvdb${letter}"
- break
-- fi
- done
- }
-
-@@ -323,8 +324,13 @@ function create_and_attach_volume() {
-
- logthis "waiting for volume $volume_id on filesystem"
- while true; do
-- if [ -e "$device" ]; then
-- logthis "volume $volume_id on filesystem as $device"
-+ # AWS returns e.g. vol-00338247831716a7b4, the kernel changes that to vol00338247831716a7b
-+ valid_volume_id=`echo $volume_id |sed -e 's/[^a-zA-Z0-9]//'`
-+ # example lsblk output:
-+ # nvme4n1 259:7 0 150G 0 disk vol00338247831716a7b
-+ if LSBLK=`lsblk -o NAME,SERIAL |grep $valid_volume_id`; then
-+ nvme_device=/dev/`echo $LSBLK|cut -f1 -d' '`
-+ logthis "volume $volume_id on filesystem as $nvme_device (aws device $device)"
- break
- fi
- sleep 1
-@@ -338,7 +344,7 @@ function create_and_attach_volume() {
- > /dev/null
- logthis "volume $volume_id DeleteOnTermination ENABLED"
-
-- echo $device
-+ echo "$nvme_device"
- }
-
- create_and_attach_volume
then
sv stop docker.io || service stop docker.io || true
else
- service docker stop || true
+ systemctl disable --now docker.service docker.socket || true
fi
ensure_umount "$MOUNTPATH/docker/aufs"
## runit
sv up docker.io
else
- service docker start
+ systemctl enable --now docker.service docker.socket
fi
end=$((SECONDS+60))
then
sv stop docker.io || service stop docker.io || true
else
- service docker stop || true
+ systemctl disable --now docker.service docker.socket || true
fi
ensure_umount "$MOUNTPATH/docker/aufs"
## runit
sv up docker.io
else
- service docker start
+ systemctl enable --now docker.service docker.socket || true
fi
end=$((SECONDS+60))
APIToken string
APIHost string
APIHostInsecure bool
- ExternalClient bool
}
// Load config from given file
config.APIHost = value
case "ARVADOS_API_HOST_INSECURE":
config.APIHostInsecure = arvadosclient.StringBool(value)
- case "ARVADOS_EXTERNAL_CLIENT":
- config.ExternalClient = arvadosclient.StringBool(value)
case "ARVADOS_BLOB_SIGNING_KEY":
blobSigningKey = value
}
ApiInsecure: config.APIHostInsecure,
Client: &http.Client{Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: config.APIHostInsecure}}},
- External: config.ExternalClient,
}
// If keepServicesJSON is provided, use it instead of service discovery
fileContent += "ARVADOS_API_TOKEN=" + arvadostest.DataManagerToken + "\n"
fileContent += "\n"
fileContent += "ARVADOS_API_HOST_INSECURE=" + os.Getenv("ARVADOS_API_HOST_INSECURE") + "\n"
- fileContent += " ARVADOS_EXTERNAL_CLIENT = false \n"
fileContent += " NotANameValuePairAndShouldGetIgnored \n"
fileContent += "ARVADOS_BLOB_SIGNING_KEY=abcdefg\n"
c.Assert(config.APIHost, Equals, os.Getenv("ARVADOS_API_HOST"))
c.Assert(config.APIToken, Equals, arvadostest.DataManagerToken)
c.Assert(config.APIHostInsecure, Equals, arvadosclient.StringBool(os.Getenv("ARVADOS_API_HOST_INSECURE")))
- c.Assert(config.ExternalClient, Equals, false)
c.Assert(blobSigningKey, Equals, "abcdefg")
}
APIToken string
APIHost string
APIHostInsecure bool
- ExternalClient bool
}
// Load src and dst config from given files
config.APIHost = value
case "ARVADOS_API_HOST_INSECURE":
config.APIHostInsecure = arvadosclient.StringBool(value)
- case "ARVADOS_EXTERNAL_CLIENT":
- config.ExternalClient = arvadosclient.StringBool(value)
case "ARVADOS_BLOB_SIGNING_KEY":
blobSigningKey = value
}
ApiInsecure: config.APIHostInsecure,
Client: &http.Client{Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: config.APIHostInsecure}}},
- External: config.ExternalClient,
}
// If keepServicesJSON is provided, use it instead of service discovery
c.Assert(srcConfig.APIHost, Equals, os.Getenv("ARVADOS_API_HOST"))
c.Assert(srcConfig.APIToken, Equals, arvadostest.SystemRootToken)
c.Assert(srcConfig.APIHostInsecure, Equals, arvadosclient.StringBool(os.Getenv("ARVADOS_API_HOST_INSECURE")))
- c.Assert(srcConfig.ExternalClient, Equals, false)
dstConfig, _, err := loadConfig(dstConfigFile)
c.Check(err, IsNil)
c.Assert(dstConfig.APIHost, Equals, os.Getenv("ARVADOS_API_HOST"))
c.Assert(dstConfig.APIToken, Equals, arvadostest.SystemRootToken)
c.Assert(dstConfig.APIHostInsecure, Equals, arvadosclient.StringBool(os.Getenv("ARVADOS_API_HOST_INSECURE")))
- c.Assert(dstConfig.ExternalClient, Equals, false)
c.Assert(srcBlobSigningKey, Equals, "abcdefg")
}
fileContent := "ARVADOS_API_HOST=" + os.Getenv("ARVADOS_API_HOST") + "\n"
fileContent += "ARVADOS_API_TOKEN=" + arvadostest.SystemRootToken + "\n"
fileContent += "ARVADOS_API_HOST_INSECURE=" + os.Getenv("ARVADOS_API_HOST_INSECURE") + "\n"
- fileContent += "ARVADOS_EXTERNAL_CLIENT=false\n"
fileContent += "ARVADOS_BLOB_SIGNING_KEY=abcdefg"
_, err = file.Write([]byte(fileContent))
echo "Must be run from initialized setup dir, maybe you need to 'initialize' first?"
fi
source ${CONFIG_FILE}
- GITTARGET=arvados-deploy-config-${CLUSTER}
+ GITTARGET=.arvados-deploy-config-${CLUSTER}
}
subcmd="$1"