#
# SPDX-License-Identifier: AGPL-3.0
-LIBCLOUD_PIN=2.2.2.dev4
+LIBCLOUD_PIN=2.3.0
declare -a checkdirs=(vendor)
if grep -qr git.curoverse.com/arvados .; then
checkdirs+=(sdk/go)
+ if [[ "$prog" -eq "crunch-dispatch-slurm" ]]; then
+ checkdirs+=(lib/dispatchcloud)
+ fi
fi
for dir in ${checkdirs[@]}; do
cd "$GOPATH/src/git.curoverse.com/arvados.git/$dir"
echo -n 'graphviz: '
dot -V || fatal "No graphviz. Try: apt-get install graphviz"
- # R SDK stuff
- echo -n 'R: '
- which R || fatal "No R. Try: apt-get install r-base"
- echo -n 'testthat: '
- R -q -e "library('testthat')" || fatal "No testthat. Try: apt-get install r-cran-testthat"
- # needed for roxygen2, needed for devtools, needed for R sdk
- pkg-config --exists libxml-2.0 || fatal "No libxml2. Try: apt-get install libxml2-dev"
- # needed for pkgdown, builds R SDK doc pages
- which pandoc || fatal "No pandoc. Try: apt-get install pandoc"
+ if [[ "$NEED_SDK_R" = true ]]; then
+ # R SDK stuff
+ echo -n 'R: '
+ which R || fatal "No R. Try: apt-get install r-base"
+ echo -n 'testthat: '
+ R -q -e "library('testthat')" || fatal "No testthat. Try: apt-get install r-cran-testthat"
+ # needed for roxygen2, needed for devtools, needed for R sdk
+ pkg-config --exists libxml-2.0 || fatal "No libxml2. Try: apt-get install libxml2-dev"
+ # needed for pkgdown, builds R SDK doc pages
+ which pandoc || fatal "No pandoc. Try: apt-get install pandoc"
+ fi
}
rotate_logfile() {
esac
done
+# R SDK installation is very slow (~360s in a clean environment) and only
+# required when testing it. Skip that step if it is not needed.
+NEED_SDK_R=true
+
+if [[ ! -z "${only}" && "${only}" != "sdk/R" ]]; then
+ NEED_SDK_R=false
+fi
+
+if [[ ! -z "${skip}" && "${skip}" == "sdk/R" ]]; then
+ NEED_SDK_R=false
+fi
+
start_services() {
echo 'Starting API, keepproxy, keep-web, ws, arv-git-httpd, and nginx ssl proxy...'
if [[ ! -d "$WORKSPACE/services/api/log" ]]; then
# Needed for run_test_server.py which is used by certain (non-Python) tests.
pip freeze 2>/dev/null | egrep ^PyYAML= \
- || pip install PyYAML >/dev/null \
+ || pip install --no-cache-dir PyYAML >/dev/null \
|| fatal "pip install PyYAML failed"
-# Preinstall forked version of libcloud, because nodemanager "pip install"
+# Preinstall libcloud, because nodemanager "pip install"
# won't pick it up by default.
pip freeze 2>/dev/null | egrep ^apache-libcloud==$LIBCLOUD_PIN \
- || pip install --pre --ignore-installed https://github.com/curoverse/libcloud/archive/apache-libcloud-$LIBCLOUD_PIN.zip >/dev/null \
+ || pip install --pre --ignore-installed --no-cache-dir "apache-libcloud>=$LIBCLOUD_PIN" >/dev/null \
|| fatal "pip install apache-libcloud failed"
# We need an unreleased (as of 2017-08-17) llfuse bugfix, otherwise our fuse test suite deadlocks.
}
do_install() {
- if [[ -z "${only_install}" || "${only_install}" == "${1}" ]]; then
- retry do_install_once ${@}
- else
- title "Skipping $1 install"
- fi
+ skipit=false
+
+ if [[ -z "${only_install}" || "${only_install}" == "${1}" ]]; then
+ retry do_install_once ${@}
+ else
+ skipit=true
+ fi
+
+ if [[ "$skipit" = true ]]; then
+ title "Skipping $1 install"
+ fi
}
do_install_once() {
do_install sdk/ruby ruby_sdk
install_R_sdk() {
+ if [[ "$NEED_SDK_R" = true ]]; then
cd "$WORKSPACE/sdk/R" \
&& R --quiet --vanilla --file=install_deps.R
+ fi
}
do_install sdk/R R_sdk
do_test sdk/ruby ruby_sdk
test_R_sdk() {
+ if [[ "$NEED_SDK_R" = true ]]; then
cd "$WORKSPACE/sdk/R" \
&& R --quiet --file=run_test.R
+ fi
}
+
do_test sdk/R R_sdk
test_cli() {
1. Only mount points of kind @collection@ are supported.
-2. Mount points underneath output_path which have "writable":true are copied into output_path during container initialization and may be updated, renamed, or deleted by the running container. The original collection is not modified. On container completion, files remaining in the output are saved to the output collection. The mount at output_path must be big enough to accommodate copies of the inner writable mounts.
+2. Mount points underneath output_path which have @"writable":true@ are copied into output_path during container initialization and may be updated, renamed, or deleted by the running container. The original collection is not modified. On container completion, files remaining in the output are saved to the output collection. The mount at output_path must be big enough to accommodate copies of the inner writable mounts.
3. If any such mount points are configured as @"exclude_from_output":true@, they will be excluded from the output, as in the example below.
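For example, a writable mount underneath the output path that is also excluded from the output might look like this (a hypothetical excerpt; the mount path and portable data hash are placeholders):

<notextile>
<pre><code>"mounts": {
  "/var/spool/cwl/tmpdata": {
    "kind": "collection",
    "portable_data_hash": "99999999999999999999999999999999+99",
    "writable": true,
    "exclude_from_output": true
  }
}
</code></pre>
</notextile>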
<notextile>
<pre><code class="userinput">Client:
- APIHost: <b>zzzzz.arvadosapi.com</b>
- AuthToken: <b>zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz</b>
+ APIHost: zzzzz.arvadosapi.com
+ AuthToken: zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz
KeepServiceURIs:
- <b>http://127.0.0.1:25107</b>
</code></pre>
crunch-dispatch-slurm polls the API server periodically for new containers to run. The @PollPeriod@ option controls how often this poll happens. Set this to a string of numbers suffixed with one of the time units @ns@, @us@, @ms@, @s@, @m@, or @h@. For example:
<notextile>
-<pre><code class="userinput">Client:
- APIHost: <b>zzzzz.arvadosapi.com</b>
- AuthToken: <b>zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz</b>
-PollPeriod: <b>3m30s</b>
+<pre><code class="userinput">PollPeriod: <b>3m30s</b>
</code></pre>
</notextile>
+h3. PrioritySpread
+
+crunch-dispatch-slurm adjusts the "nice" values of its SLURM jobs to ensure containers are prioritized correctly relative to one another. This option tunes the adjustment mechanism.
+* If non-Arvados jobs run on your SLURM cluster, and your Arvados containers are waiting too long in the SLURM queue because their "nice" values are too high for them to compete with other SLURM jobs, you should use a smaller @PrioritySpread@ value.
+* If you have an older SLURM system that limits nice values to 10000, a smaller @PrioritySpread@ can help avoid reaching that limit.
+* In other cases, a larger value is beneficial because it reduces the total number of adjustments made by executing @scontrol@.
+
+The smallest usable value is @1@. The default value of @10@ is used if this option is zero or negative. Example:
+
+<notextile>
+<pre><code class="userinput">PrioritySpread: <b>1000</b>
+</code></pre>
+</notextile>
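+
+For illustration, with the default spread of @10@, four newly queued containers with equal Arvados priority are assigned nice values @0@, @10@, @20@, and @30@, preserving their relative order. (This is a rough sketch of the adjustment logic; the exact values depend on the jobs' current SLURM priorities.)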
+
h3. SbatchArguments
When crunch-dispatch-slurm invokes @sbatch@, you can add switches to the command by specifying @SbatchArguments@. You can use this to send the jobs to specific cluster partitions or add resource requests. Set @SbatchArguments@ to an array of strings. For example:
<notextile>
-<pre><code class="userinput">Client:
- APIHost: <b>zzzzz.arvadosapi.com</b>
- AuthToken: <b>zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz</b>
-SbatchArguments:
+<pre><code class="userinput">SbatchArguments:
- <b>"--partition=PartitionName"</b>
</code></pre>
</notextile>
If your SLURM cluster uses the @task/cgroup@ TaskPlugin, you can configure Crunch's Docker containers to be dispatched inside SLURM's cgroups. This provides consistent enforcement of resource constraints. To do this, use a crunch-dispatch-slurm configuration like the following:
<notextile>
-<pre><code class="userinput">Client:
- APIHost: <b>zzzzz.arvadosapi.com</b>
- AuthToken: <b>zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz</b>
-CrunchRunCommand:
+<pre><code class="userinput">CrunchRunCommand:
- <b>crunch-run</b>
- <b>"-cgroup-parent-subsystem=memory"</b>
</code></pre>
Older Linux kernels (prior to 3.18) have bugs in network namespace handling which can lead to compute node lockups. This is indicated by blocked kernel tasks in "Workqueue: netns cleanup_net". If you are experiencing this problem, as a workaround you can disable use of network namespaces by Docker across the cluster. Be aware this reduces container isolation, which may be a security risk.
<notextile>
-<pre><code class="userinput">Client:
- APIHost: <b>zzzzz.arvadosapi.com</b>
- AuthToken: <b>zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz</b>
-CrunchRunCommand:
+<pre><code class="userinput">CrunchRunCommand:
- <b>crunch-run</b>
- <b>"-container-enable-networking=always"</b>
- <b>"-container-network-mode=host"</b>
If SLURM is unable to run a container, the dispatcher will submit it again after the next PollPeriod. If PollPeriod is very short, this can be excessive. If MinRetryPeriod is set, the dispatcher will avoid submitting the same container to SLURM more than once in the given time span.
<notextile>
-<pre><code class="userinput">Client:
- APIHost: <b>zzzzz.arvadosapi.com</b>
- AuthToken: <b>zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz</b>
-MinRetryPeriod: <b>30s</b>
+<pre><code class="userinput">MinRetryPeriod: <b>30s</b>
</code></pre>
</notextile>
// it is no longer offered by any node. So, to make a feature name
// valid, we can add it to a dummy node ("compute0"), then remove it.
//
-// (2) when srun is given an invalid --gres argument and an invalid
-// --constraint argument, the error message mentions "Invalid feature
-// specification". So, to test whether a feature name is valid without
-// actually submitting a job, we can call srun with the feature name
-// and an invalid --gres argument.
+// (2) To test whether a set of feature names is valid without
+// actually submitting a job, we can call srun --test-only with the
+// desired features.
//
// SlurmNodeTypeFeatureKludge does a test-and-fix operation
// immediately, and then periodically, in case slurm restarts and
var (
slurmDummyNode = "compute0"
slurmErrBadFeature = "Invalid feature"
- slurmErrBadGres = "Invalid generic resource"
+ slurmErrNoNodes = "node configuration is not available"
)
func slurmKludge(features []string) {
- cmd := exec.Command("srun", "--gres=invalid-gres-specification", "--constraint="+strings.Join(features, "&"), "true")
+ cmd := exec.Command("srun", "--test-only", "--constraint="+strings.Join(features, "&"), "false")
out, err := cmd.CombinedOutput()
switch {
- case err == nil:
- log.Printf("warning: guaranteed-to-fail srun command did not fail: %q %q", cmd.Path, cmd.Args)
- log.Printf("output was: %q", out)
+ case err == nil || bytes.Contains(out, []byte(slurmErrNoNodes)):
+ // Evidently our node-type feature names are all valid.
case bytes.Contains(out, []byte(slurmErrBadFeature)):
log.Printf("temporarily configuring node %q with all node type features", slurmDummyNode)
}
}
- case bytes.Contains(out, []byte(slurmErrBadGres)):
- // Evidently our node-type feature names are all valid.
-
default:
- log.Printf("warning: expected srun error %q or %q, but output was %q", slurmErrBadFeature, slurmErrBadGres, out)
+ log.Printf("warning: expected srun error %q, %q, or success, but output was %q", slurmErrBadFeature, slurmErrNoNodes, out)
}
}
from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
-from cwltool.draft2tool import compute_checksums
+from cwltool.command_line_tool import compute_checksums
from arvados.api import OrderedJsonModel
logger = logging.getLogger('arvados.cwl-runner')
return parser
def add_arv_hints():
- cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
- cwltool.draft2tool.ACCEPTLIST_RE = cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE
+ cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
+ cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
res.close()
if not images:
# Fetch Docker image if necessary.
try:
- cwltool.docker.get_image(dockerRequirement, pull_image)
+ cwltool.docker.DockerCommandLineJob.get_image(dockerRequirement, pull_image)
except OSError as e:
raise WorkflowException("While trying to get Docker image '%s', failed to execute 'docker': %s" % (dockerRequirement["dockerImageId"], e))
from cwltool.process import get_feature, shortname, UnsupportedRequirement
from cwltool.errors import WorkflowException
-from cwltool.draft2tool import revmap_file, CommandLineTool
+from cwltool.command_line_tool import revmap_file, CommandLineTool
from cwltool.load_tool import fetch_document
from cwltool.builder import Builder
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
#
# SPDX-License-Identifier: Apache-2.0
-from cwltool.draft2tool import CommandLineTool
+from cwltool.command_line_tool import CommandLineTool
from .arvjob import ArvadosJob
from .arvcontainer import ArvadosContainer
from .pathmapper import ArvPathMapper
from cwltool.process import shortname
from cwltool.workflow import Workflow, WorkflowException
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
+from cwltool.builder import Builder
import ruamel.yaml as yaml
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
+max_res_pars = ("coresMin", "coresMax", "ramMin", "ramMax", "tmpdirMin", "tmpdirMax")
+sum_res_pars = ("outdirMin", "outdirMax")
+
def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
submit_runner_ram=0, name=None, merged_map=None):
dedup[r["class"]] = r
return [dedup[r] for r in sorted(dedup.keys())]
+def get_overall_res_req(res_reqs):
+ """Take the overall resource request from a list of ResourceRequirements:
+ the max of coresMin, coresMax, ramMin, ramMax, tmpdirMin, tmpdirMax
+ and the sum of outdirMin, outdirMax. For example, requirements of
+ {coresMin: 2, outdirMin: 1024} and {coresMin: 3, outdirMin: 2048}
+ combine to give coresMin 3 and outdirMin 3072."""
+
+ all_res_req = {}
+ exception_msgs = []
+ for a in max_res_pars + sum_res_pars:
+ all_res_req[a] = []
+ for res_req in res_reqs:
+ if a in res_req:
+ if isinstance(res_req[a], int): # integer check
+ all_res_req[a].append(res_req[a])
+ else:
+ msg = SourceLine(res_req).makeError(
+ "Non-top-level ResourceRequirement in single container cannot have expressions")
+ exception_msgs.append(msg)
+ if exception_msgs:
+ raise WorkflowException("\n".join(exception_msgs))
+ else:
+ overall_res_req = {}
+ for a in all_res_req:
+ if all_res_req[a]:
+ if a in max_res_pars:
+ overall_res_req[a] = max(all_res_req[a])
+ elif a in sum_res_pars:
+ overall_res_req[a] = sum(all_res_req[a])
+ if overall_res_req:
+ overall_res_req["class"] = "ResourceRequirement"
+ return cmap(overall_res_req)
+
class ArvadosWorkflow(Workflow):
"""Wrap cwltool Workflow to override selected methods."""
packed = pack(document_loader, workflowobj, uri, self.metadata)
+ builder = Builder()
+ builder.job = joborder
+ builder.requirements = workflowobj["requirements"]
+ builder.hints = workflowobj["hints"]
+ builder.resources = {}
+
+ res_reqs = {"requirements": [], "hints": []}
+ for t in ("requirements", "hints"):
+ for item in packed["$graph"]:
+ if t in item:
+ if item["id"] == "#main": # evaluate potential expressions in the top-level requirements/hints
+ for req in item[t]:
+ if req["class"] == "ResourceRequirement":
+ eval_req = {"class": "ResourceRequirement"}
+ for a in max_res_pars + sum_res_pars:
+ if a in req:
+ eval_req[a] = builder.do_eval(req[a])
+ res_reqs[t].append(eval_req)
+ else:
+ for req in item[t]:
+ if req["class"] == "ResourceRequirement":
+ res_reqs[t].append(req)
+ overall_res_req = {"requirements": get_overall_res_req(res_reqs["requirements"]),
+ "hints": get_overall_res_req(res_reqs["hints"])}
+
+ new_spec = {"requirements": self.requirements, "hints": self.hints}
+ for t in ("requirements", "hints"):
+ for req in new_spec[t]:
+ if req["class"] == "ResourceRequirement":
+ new_spec[t].remove(req)
+ if overall_res_req[t]:
+ new_spec[t].append(overall_res_req[t])
+
upload_dependencies(self.arvrunner,
kwargs.get("name", ""),
document_loader,
from schema_salad.sourceline import SourceLine
-import cwltool.draft2tool
-from cwltool.draft2tool import CommandLineTool
+from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
# Note that arvados/build/run-build-packages.sh looks at this
# file to determine what version of cwltool and schema-salad to build.
install_requires=[
- 'cwltool==1.0.20180130110340',
+ 'cwltool==1.0.20180225105849',
'schema-salad==2.6.20171201034858',
'typing==3.5.3.0',
'ruamel.yaml==0.13.7',
"outputs": [],
"baseCommand": "ls",
"arguments": [{"valueFrom": "$(runtime.outdir)"}],
- "id": "#"
+ "id": "#",
+ "class": "CommandLineTool"
})
make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
"enableReuse": False
}],
"baseCommand": "ls",
- "id": "#"
+ "id": "#",
+ "class": "CommandLineTool"
})
make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
} ]
}],
"baseCommand": "ls",
- "id": "#"
+ "id": "#",
+ "class": "CommandLineTool"
})
make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
"stderr": "stderr.txt",
"stdin": "/keep/99999999999999999999999999999996+99/file.txt",
"arguments": [{"valueFrom": "$(runtime.outdir)"}],
- "id": "#"
+ "id": "#",
+ "class": "CommandLineTool"
})
make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
"outputs": [],
"baseCommand": "ls",
"arguments": [{"valueFrom": "$(runtime.outdir)"}],
- "id": "#"
+ "id": "#",
+ "class": "CommandLineTool"
})
make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
"outputs": [],
"baseCommand": "ls",
"arguments": [{"valueFrom": "$(runtime.outdir)"}],
- "id": "#"
+ "id": "#",
+ "class": "CommandLineTool"
})
make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
"enableReuse": False
}],
"baseCommand": "ls",
- "id": "#"
+ "id": "#",
+ "class": "CommandLineTool"
}
make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
"sleeptime": 5
}''')])
+ # The test passes no builder.resources
+ # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
+ @mock.patch("arvados.collection.CollectionReader")
+ @mock.patch("arvados.collection.Collection")
+ @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
+ def test_overall_resource_singlecontainer(self, list_images_in_arv, mockcollection, mockcollectionreader):
+ arvados_cwl.add_arv_hints()
+
+ api = mock.MagicMock()
+ api._rootDesc = get_rootDesc()
+
+ runner = arvados_cwl.ArvCwlRunner(api)
+ self.assertEqual(runner.work_api, 'jobs')
+
+ list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
+ runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}
+ runner.api.collections().list().execute.return_value = {"items": [{"portable_data_hash": "99999999999999999999999999999993+99"}]}
+
+ runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
+ runner.ignore_docker_for_reuse = False
+ runner.num_retries = 0
+ document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+
+ make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+ collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
+ document_loader.fetcher_constructor = functools.partial(arvados_cwl.CollectionFetcher, api_client=api, fs_access=make_fs_access(""))
+ document_loader.fetcher = document_loader.fetcher_constructor(document_loader.cache, document_loader.session)
+ document_loader.fetch_text = document_loader.fetcher.fetch_text
+ document_loader.check_exists = document_loader.fetcher.check_exists
+
+ tool, metadata = document_loader.resolve_ref("tests/wf/echo-wf.cwl")
+ metadata["cwlVersion"] = tool["cwlVersion"]
+
+ mockcollection().portable_data_hash.return_value = "99999999999999999999999999999999+118"
+
+ arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, work_api="jobs", avsc_names=avsc_names,
+ basedir="", make_fs_access=make_fs_access, loader=document_loader,
+ makeTool=runner.arv_make_tool, metadata=metadata)
+ arvtool.formatgraph = None
+ it = arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access)
+ it.next().run()
+ it.next().run()
+
+ with open("tests/wf/echo-subwf.cwl") as f:
+ subwf = StripYAMLComments(f.read())
+
+ runner.api.jobs().create.assert_called_with(
+ body=JsonDiffMatcher({
+ 'minimum_script_version': 'a3f2cb186e437bfce0031b024b2157b73ed2717d',
+ 'repository': 'arvados',
+ 'script_version': 'master',
+ 'script': 'crunchrunner',
+ 'script_parameters': {
+ 'tasks': [{'task.env': {
+ 'HOME': '$(task.outdir)',
+ 'TMPDIR': '$(task.tmpdir)'},
+ 'task.vwd': {
+ 'workflow.cwl': '$(task.keep)/99999999999999999999999999999999+118/workflow.cwl',
+ 'cwl.input.yml': '$(task.keep)/99999999999999999999999999999999+118/cwl.input.yml'
+ },
+ 'command': [u'cwltool', u'--no-container', u'--move-outputs', u'--preserve-entire-environment', u'workflow.cwl#main', u'cwl.input.yml'],
+ 'task.stdout': 'cwl.output.json'}]},
+ 'runtime_constraints': {
+ 'min_scratch_mb_per_node': 4096,
+ 'min_cores_per_node': 3,
+ 'docker_image': 'arvados/jobs',
+ 'min_ram_mb_per_node': 1024
+ },
+ 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'}),
+ filters=[['repository', '=', 'arvados'],
+ ['script', '=', 'crunchrunner'],
+ ['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
+ ['docker_image_locator', 'in docker', 'arvados/jobs']],
+ find_or_create=True)
+
def test_default_work_api(self):
arvados_cwl.add_arv_hints()
@mock.patch("arvados.commands.keepdocker.find_one_image_hash")
- @mock.patch("cwltool.docker.get_image")
+ @mock.patch("cwltool.docker.DockerCommandLineJob.get_image")
@mock.patch("arvados.api")
def test_arvados_jobs_image(self, api, get_image, find_one_image_hash):
arvrunner = mock.MagicMock()
--- /dev/null
+cwlVersion: v1.0
+class: Workflow
+requirements:
+ ResourceRequirement:
+ coresMin: 1
+
+inputs: []
+
+outputs: []
+
+steps:
+ echo_a:
+ run: echo_a.cwl
+ in: []
+ out: []
+ echo_b:
+ run: echo_b.cwl
+ in: []
+ out: []
--- /dev/null
+cwlVersion: v1.0
+class: Workflow
+$namespaces:
+ arv: "http://arvados.org/cwl#"
+requirements:
+ SubworkflowFeatureRequirement: {}
+
+inputs: []
+
+outputs: []
+
+steps:
+ echo-subwf:
+ requirements:
+ arv:RunInSingleContainer: {}
+ run: echo-subwf.cwl
+ in: []
+ out: []
--- /dev/null
+cwlVersion: v1.0
+class: CommandLineTool
+requirements:
+ ResourceRequirement:
+ coresMin: 2
+ outdirMin: 1024
+inputs: []
+outputs: []
+baseCommand: echo
+arguments:
+ - "a"
--- /dev/null
+cwlVersion: v1.0
+class: CommandLineTool
+requirements:
+ ResourceRequirement:
+ coresMin: 3
+ outdirMin: 2048
+inputs: []
+outputs: []
+baseCommand: echo
+arguments:
+ - "b"
package arvados
+import "time"
+
// Container is an arvados#container resource.
type Container struct {
UUID string `json:"uuid"`
+ CreatedAt time.Time `json:"created_at"`
Command []string `json:"command"`
ContainerImage string `json:"container_image"`
Cwd string `json:"cwd"`
"lib/arvados/collection.rb", "lib/arvados/keep.rb",
"README", "LICENSE-2.0.txt"]
s.required_ruby_version = '>= 1.8.7'
- # activesupport <4.2.6 only because https://dev.arvados.org/issues/8222
- s.add_dependency('activesupport', '>= 3', '< 4.2.6')
+ s.add_dependency('activesupport', '>= 3')
s.add_dependency('andand', '~> 1.3', '>= 1.3.3')
# Our google-api-client dependency used to be < 0.9, but that could be
# satisfied by the buggy 0.9.pre*. https://dev.arvados.org/issues/9213
"github.com/coreos/go-systemd/daemon"
)
+const initialNiceValue int64 = 10000
+
var (
version = "dev"
defaultConfigPath = "/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml"
SbatchArguments []string
PollPeriod arvados.Duration
+ PrioritySpread int64
// crunch-run command to invoke. The container UUID will be
// appended. If nil, []string{"crunch-run"} will be used.
disp.slurm = &slurmCLI{}
disp.sqCheck = &SqueueChecker{
- Period: time.Duration(disp.PollPeriod),
- Slurm: disp.slurm,
+ Period: time.Duration(disp.PollPeriod),
+ PrioritySpread: disp.PrioritySpread,
+ Slurm: disp.slurm,
}
disp.Dispatcher = &dispatch.Dispatcher{
Arv: arv,
}
}
-func (disp *Dispatcher) niceness(priority int) int {
- if priority > 1000 {
- priority = 1000
- }
- if priority < 0 {
- priority = 0
- }
- // Niceness range 1-10000
- return (1000 - priority) * 10
-}
-
func (disp *Dispatcher) sbatchArgs(container arvados.Container) ([]string, error) {
mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM+disp.ReserveExtraRAM) / float64(1048576)))
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--mem=%d", mem))
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs))
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--tmp=%d", disk))
- sbatchArgs = append(sbatchArgs, fmt.Sprintf("--nice=%d", disp.niceness(container.Priority)))
+ sbatchArgs = append(sbatchArgs, fmt.Sprintf("--nice=%d", initialNiceValue))
if len(container.SchedulingParameters.Partitions) > 0 {
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.SchedulingParameters.Partitions, ",")))
}
log.Printf("container %s has state %q, priority %d: cancel slurm job", ctr.UUID, updated.State, updated.Priority)
disp.scancel(ctr)
} else {
- disp.renice(updated)
+ p := int64(updated.Priority)
+ if p <= 1000 {
+ // API is providing
+ // user-assigned priority. If
+ // ctrs have equal priority,
+ // run the older one first.
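+ // (The API priority is scaled into the high
+ // bits, and a scaled-down creation timestamp is
+ // subtracted as the tiebreaker.)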
+ p = int64(p)<<50 - (updated.CreatedAt.UnixNano() >> 14)
+ }
+ disp.sqCheck.SetPriority(ctr.UUID, p)
}
}
}
}
-
func (disp *Dispatcher) scancel(ctr arvados.Container) {
disp.sqCheck.L.Lock()
err := disp.slurm.Cancel(ctr.UUID)
}
}
-func (disp *Dispatcher) renice(ctr arvados.Container) {
- nice := disp.niceness(ctr.Priority)
- oldnice := disp.sqCheck.GetNiceness(ctr.UUID)
- if nice == oldnice || oldnice == -1 {
- return
- }
- log.Printf("updating slurm nice value to %d (was %d)", nice, oldnice)
- disp.sqCheck.L.Lock()
- err := disp.slurm.Renice(ctr.UUID, nice)
- disp.sqCheck.L.Unlock()
-
- if err != nil {
- log.Printf("renice: %s", err)
- time.Sleep(time.Second)
- return
- }
- if disp.sqCheck.HasUUID(ctr.UUID) {
- log.Printf("container %s has arvados priority %d, slurm nice %d",
- ctr.UUID, ctr.Priority, disp.sqCheck.GetNiceness(ctr.UUID))
- }
-}
-
func (disp *Dispatcher) readConfig(path string) error {
err := config.LoadFile(disp, path)
if err != nil && os.IsNotExist(err) && path == defaultConfigPath {
return exec.Command("echo", sf.queue)
}
-func (sf *slurmFake) Renice(name string, nice int) error {
+func (sf *slurmFake) Renice(name string, nice int64) error {
sf.didRenice = append(sf.didRenice, []string{name, fmt.Sprintf("%d", nice)})
return nil
}
s.disp.Dispatcher = &dispatch.Dispatcher{
Arv: arv,
- PollPeriod: time.Duration(1) * time.Second,
+ PollPeriod: time.Second,
RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
go func() {
runContainer(disp, ctr)
}
func (s *IntegrationSuite) TestNormal(c *C) {
- s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
+ s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 10000 100\n"}
container := s.integrationTest(c,
nil,
func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
}
func (s *IntegrationSuite) TestCancel(c *C) {
- s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
+ s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 10000 100\n"}
readyToCancel := make(chan bool)
s.slurm.onCancel = func() { <-readyToCancel }
container := s.integrationTest(c,
fmt.Sprintf("--mem=%d", 11445),
fmt.Sprintf("--cpus-per-task=%d", 4),
fmt.Sprintf("--tmp=%d", 45777),
- fmt.Sprintf("--nice=%d", 9990)}},
+ fmt.Sprintf("--nice=%d", 10000)}},
func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Running)
time.Sleep(3 * time.Second)
func (s *IntegrationSuite) TestSbatchFail(c *C) {
s.slurm = slurmFake{errBatch: errors.New("something terrible happened")}
container := s.integrationTest(c,
- [][]string{{"--job-name=zzzzz-dz642-queuedcontainer", "--mem=11445", "--cpus-per-task=4", "--tmp=45777", "--nice=9990"}},
+ [][]string{{"--job-name=zzzzz-dz642-queuedcontainer", "--mem=11445", "--cpus-per-task=4", "--tmp=45777", "--nice=10000"}},
func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Running)
dispatcher.UpdateState(container.UUID, dispatch.Complete)
c.Assert(len(ll.Items), Equals, 1)
}
-func (s *IntegrationSuite) TestChangePriority(c *C) {
- s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
- container := s.integrationTest(c, nil,
- func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
- dispatcher.UpdateState(container.UUID, dispatch.Running)
- time.Sleep(time.Second)
- dispatcher.Arv.Update("containers", container.UUID,
- arvadosclient.Dict{
- "container": arvadosclient.Dict{"priority": 600}},
- nil)
- time.Sleep(time.Second)
- dispatcher.UpdateState(container.UUID, dispatch.Complete)
- })
- c.Check(container.State, Equals, arvados.ContainerStateComplete)
- c.Assert(len(s.slurm.didRenice), Not(Equals), 0)
- c.Check(s.slurm.didRenice[len(s.slurm.didRenice)-1], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "4000"})
-}
-
type StubbedSuite struct {
disp Dispatcher
}
ctx, cancel := context.WithCancel(context.Background())
dispatcher := dispatch.Dispatcher{
Arv: arv,
- PollPeriod: time.Duration(1) * time.Second,
+ PollPeriod: time.Second,
RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
go func() {
- time.Sleep(1 * time.Second)
+ time.Sleep(time.Second)
disp.UpdateState(ctr.UUID, dispatch.Running)
disp.UpdateState(ctr.UUID, dispatch.Complete)
}()
s.disp.SbatchArguments = defaults
args, err := s.disp.sbatchArgs(container)
- c.Check(args, DeepEquals, append(defaults, "--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0", "--nice=9990"))
+ c.Check(args, DeepEquals, append(defaults, "--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0", "--nice=10000"))
c.Check(err, IsNil)
}
}
args, err := s.disp.sbatchArgs(container)
c.Check(err, Equals, trial.err)
if trial.err == nil {
- c.Check(args, DeepEquals, append([]string{"--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0", "--nice=9990"}, trial.sbatchArgs...))
+ c.Check(args, DeepEquals, append([]string{"--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0", "--nice=10000"}, trial.sbatchArgs...))
}
}
}
args, err := s.disp.sbatchArgs(container)
c.Check(args, DeepEquals, []string{
- "--job-name=123", "--mem=239", "--cpus-per-task=1", "--tmp=0", "--nice=9990",
+ "--job-name=123", "--mem=239", "--cpus-per-task=1", "--tmp=0", "--nice=10000",
"--partition=blurb,b2",
})
c.Check(err, IsNil)
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+const defaultSpread int64 = 10
+
+// wantNice calculates appropriate nice values for a set of SLURM
+// jobs. The returned slice will have len(jobs) elements.
+//
+// spread is a positive amount of space to leave between adjacent
+// priorities when making adjustments. Generally, increasing spread
+// reduces the total number of adjustments made. A smaller spread
+// produces lower nice values, which is useful for old SLURM versions
+// with a limited "nice" range and for sites where SLURM is also
+// running non-Arvados jobs with low nice values.
+//
+// If spread<1, a sensible default (10) is used.
+func wantNice(jobs []*slurmJob, spread int64) []int64 {
+ if len(jobs) == 0 {
+ return nil
+ }
+
+ if spread < 1 {
+ spread = defaultSpread
+ }
+ renice := make([]int64, len(jobs))
+
+ // highest usable priority (without going out of order)
+ var target int64
+ for i, job := range jobs {
+ if i == 0 {
+ // renice[0] is always zero, so our highest
+ // priority container gets the highest
+ // possible slurm priority.
+ target = job.priority + job.nice
+ } else if space := target - job.priority; space >= 0 && space < (spread-1)*10 {
+ // Ordering is correct, and the interval is within
+ // 10x the requested spread. Leave the existing
+ // nice value alone.
+ renice[i] = job.nice
+ target = job.priority
+ } else {
+ target -= (spread - 1)
+ if possible := job.priority + job.nice; target > possible {
+ // renice[i] is already 0, that's the
+ // best we can do
+ target = possible
+ } else {
+ renice[i] = possible - target
+ }
+ }
+ target--
+ }
+ return renice
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+ . "gopkg.in/check.v1"
+)
+
+var _ = Suite(&PrioritySuite{})
+
+type PrioritySuite struct{}
+
+func (s *PrioritySuite) TestReniceCorrect(c *C) {
+ for _, test := range []struct {
+ spread int64
+ in []*slurmJob
+ out []int64
+ }{
+ {
+ 0,
+ nil,
+ nil,
+ },
+ {
+ 0,
+ []*slurmJob{},
+ nil,
+ },
+ {
+ 10,
+ []*slurmJob{{priority: 4294000111, nice: 10000}},
+ []int64{0},
+ },
+ {
+ 10,
+ []*slurmJob{
+ {priority: 4294000111, nice: 10000},
+ {priority: 4294000111, nice: 10000},
+ {priority: 4294000111, nice: 10000},
+ {priority: 4294000111, nice: 10000},
+ },
+ []int64{0, 10, 20, 30},
+ },
+ { // smaller spread than necessary, but correctly ordered => leave nice alone
+ 10,
+ []*slurmJob{
+ {priority: 4294000113, nice: 0},
+ {priority: 4294000112, nice: 1},
+ {priority: 4294000111, nice: 99},
+ },
+ []int64{0, 1, 99},
+ },
+ { // larger spread than necessary, but less than 10x => leave nice alone
+ 10,
+ []*slurmJob{
+ {priority: 4294000144, nice: 0},
+ {priority: 4294000122, nice: 20},
+ {priority: 4294000111, nice: 30},
+ },
+ []int64{0, 20, 30},
+ },
+ { // > 10x spread => reduce nice to achieve spread=10
+ 10,
+ []*slurmJob{
+ {priority: 4000, nice: 0}, // max pri 4000
+ {priority: 3000, nice: 999}, // max pri 3999
+ {priority: 2000, nice: 1998}, // max pri 3998
+ },
+ []int64{0, 9, 18},
+ },
+ { // > 10x spread, but spread=10 is impossible without negative nice
+ 10,
+ []*slurmJob{
+ {priority: 4000, nice: 0}, // max pri 4000
+ {priority: 3000, nice: 500}, // max pri 3500
+ {priority: 2000, nice: 2000}, // max pri 4000
+ },
+ []int64{0, 0, 510},
+ },
+ { // default spread, needs reorder
+ 0,
+ []*slurmJob{
+ {priority: 4000, nice: 0}, // max pri 4000
+ {priority: 5000, nice: 0}, // max pri 5000
+ {priority: 6000, nice: 0}, // max pri 6000
+ },
+ []int64{0, 1000 + defaultSpread, 2000 + defaultSpread*2},
+ },
+ { // minimum spread
+ 1,
+ []*slurmJob{
+ {priority: 4000, nice: 0}, // max pri 4000
+ {priority: 5000, nice: 0}, // max pri 5000
+ {priority: 6000, nice: 0}, // max pri 6000
+ {priority: 3000, nice: 0}, // max pri 3000
+ },
+ []int64{0, 1001, 2002, 0},
+ },
+ } {
+ c.Logf("spread=%d %+v -> %+v", test.spread, test.in, test.out)
+ c.Check(wantNice(test.in, test.spread), DeepEquals, test.out)
+
+ if len(test.in) == 0 {
+ continue
+ }
+ // After making the adjustments, calling wantNice
+ // again should return the same recommendations.
+ updated := make([]*slurmJob, len(test.in))
+ for i, in := range test.in {
+ updated[i] = &slurmJob{
+ nice: test.out[i],
+ priority: in.priority + in.nice - test.out[i],
+ }
+ }
+ c.Check(wantNice(updated, test.spread), DeepEquals, test.out)
+ }
+}
+
+func (s *PrioritySuite) TestReniceChurn(c *C) {
+ const spread = 10
+ jobs := make([]*slurmJob, 1000)
+ for i := range jobs {
+ jobs[i] = &slurmJob{priority: 4294000000 - int64(i), nice: 10000}
+ }
+ adjustments := 0
+ queue := jobs
+ for len(queue) > 0 {
+ renice := wantNice(queue, spread)
+ for i := range queue {
+ if renice[i] == queue[i].nice {
+ continue
+ }
+ queue[i].priority += queue[i].nice - renice[i]
+ queue[i].nice = renice[i]
+ adjustments++
+ }
+ queue = queue[1:]
+ }
+ c.Logf("processed queue of %d with %d renice ops", len(jobs), adjustments)
+ c.Check(adjustments < len(jobs)*len(jobs)/10, Equals, true)
+}
type Slurm interface {
Cancel(name string) error
- Renice(name string, nice int) error
+ Renice(name string, nice int64) error
QueueCommand(args []string) *exec.Cmd
Batch(script io.Reader, args []string) error
}
return exec.Command("squeue", args...)
}
-func (scli *slurmCLI) Renice(name string, nice int) error {
+func (scli *slurmCLI) Renice(name string, nice int64) error {
return scli.run(nil, "scontrol", []string{"update", "JobName=" + name, fmt.Sprintf("Nice=%d", nice)})
}
"bytes"
"fmt"
"log"
+ "sort"
"strings"
"sync"
"time"
)
-type jobPriority struct {
- niceness int
- currentPriority int
+type slurmJob struct {
+ uuid string
+ wantPriority int64
+ priority int64 // current slurm priority (incorporates nice value)
+ nice int64 // current slurm nice value
}
// Squeue implements asynchronous polling monitor of the SLURM queue using the
// command 'squeue'.
type SqueueChecker struct {
- Period time.Duration
- Slurm Slurm
- uuids map[string]jobPriority
- startOnce sync.Once
- done chan struct{}
+ Period time.Duration
+ PrioritySpread int64
+ Slurm Slurm
+ queue map[string]*slurmJob
+ startOnce sync.Once
+ done chan struct{}
sync.Cond
}
// block until next squeue broadcast signaling an update.
sqc.Wait()
- _, exists := sqc.uuids[uuid]
+ _, exists := sqc.queue[uuid]
return exists
}
-// GetNiceness returns the niceness of a given uuid, or -1 if it doesn't exist.
-func (sqc *SqueueChecker) GetNiceness(uuid string) int {
+// SetPriority sets or updates the desired (Arvados) priority for a
+// container.
+func (sqc *SqueueChecker) SetPriority(uuid string, want int64) {
sqc.startOnce.Do(sqc.start)
+ sqc.L.Lock()
+ defer sqc.L.Unlock()
+ job, ok := sqc.queue[uuid]
+ if !ok {
+ // Wait in case the slurm job was just submitted and
+ // will appear in the next squeue update.
+ sqc.Wait()
+ if job, ok = sqc.queue[uuid]; !ok {
+ return
+ }
+ }
+ job.wantPriority = want
+}
+// adjust slurm job nice values as needed to ensure slurm priority
+// order matches Arvados priority order.
+func (sqc *SqueueChecker) reniceAll() {
sqc.L.Lock()
defer sqc.L.Unlock()
- n, exists := sqc.uuids[uuid]
- if exists {
- return n.niceness
- } else {
- return -1
+ jobs := make([]*slurmJob, 0, len(sqc.queue))
+ for _, j := range sqc.queue {
+ if j.wantPriority == 0 {
+ // SLURM job with unknown Arvados priority
+ // (perhaps it's not an Arvados job)
+ continue
+ }
+ jobs = append(jobs, j)
+ }
+
+ sort.Slice(jobs, func(i, j int) bool {
+ return jobs[i].wantPriority > jobs[j].wantPriority
+ })
+ renice := wantNice(jobs, sqc.PrioritySpread)
+ for i, job := range jobs {
+ if renice[i] == job.nice {
+ continue
+ }
+ log.Printf("updating slurm priority for %q: nice %d => %d", job.uuid, job.nice, renice[i])
+ sqc.Slurm.Renice(job.uuid, renice[i])
}
}
}
// check gets the names of jobs in the SLURM queue (running and
-// queued). If it succeeds, it updates squeue.uuids and wakes up any
+// queued). If it succeeds, it updates sqc.queue and wakes up any
// goroutines that are waiting in HasUUID() or All().
func (sqc *SqueueChecker) check() {
// Mutex between squeue sync and running sbatch or scancel. This
}
lines := strings.Split(stdout.String(), "\n")
- sqc.uuids = make(map[string]jobPriority, len(lines))
+ newq := make(map[string]*slurmJob, len(lines))
for _, line := range lines {
+ if line == "" {
+ continue
+ }
var uuid string
- var nice int
- var prio int
- fmt.Sscan(line, &uuid, &nice, &prio)
- if uuid != "" {
- sqc.uuids[uuid] = jobPriority{nice, prio}
+ var n, p int64
+ if _, err := fmt.Sscan(line, &uuid, &n, &p); err != nil {
+ log.Printf("warning: ignoring unparsed line in squeue output: %q", line)
+ continue
+ }
+ replacing, ok := sqc.queue[uuid]
+ if !ok {
+ replacing = &slurmJob{uuid: uuid}
}
+ replacing.priority = p
+ replacing.nice = n
+ newq[uuid] = replacing
}
+ sqc.queue = newq
sqc.Broadcast()
}
return
case <-ticker.C:
sqc.check()
+ sqc.reniceAll()
}
}
}()
defer sqc.L.Unlock()
sqc.Wait()
var uuids []string
- for uuid := range sqc.uuids {
- uuids = append(uuids, uuid)
+ for u := range sqc.queue {
+ uuids = append(uuids, u)
}
return uuids
}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+ "time"
+
+ . "gopkg.in/check.v1"
+)
+
+var _ = Suite(&SqueueSuite{})
+
+type SqueueSuite struct{}
+
+func (s *SqueueSuite) TestReniceAll(c *C) {
+ uuids := []string{"zzzzz-dz642-fake0fake0fake0", "zzzzz-dz642-fake1fake1fake1", "zzzzz-dz642-fake2fake2fake2"}
+ for _, test := range []struct {
+ spread int64
+ squeue string
+ want map[string]int64
+ expect [][]string
+ }{
+ {
+ spread: 1,
+ squeue: uuids[0] + " 10000 4294000000\n",
+ want: map[string]int64{uuids[0]: 1},
+ expect: [][]string{{uuids[0], "0"}},
+ },
+ { // fake0 priority is too high
+ spread: 1,
+ squeue: uuids[0] + " 10000 4294000777\n" + uuids[1] + " 10000 4294000444\n",
+ want: map[string]int64{uuids[0]: 1, uuids[1]: 999},
+ expect: [][]string{{uuids[1], "0"}, {uuids[0], "334"}},
+ },
+ { // specify spread
+ spread: 100,
+ squeue: uuids[0] + " 10000 4294000777\n" + uuids[1] + " 10000 4294000444\n",
+ want: map[string]int64{uuids[0]: 1, uuids[1]: 999},
+ expect: [][]string{{uuids[1], "0"}, {uuids[0], "433"}},
+ },
+ { // ignore fake2 because SetPriority() not called
+ spread: 1,
+ squeue: uuids[0] + " 10000 4294000000\n" + uuids[1] + " 10000 4294000111\n" + uuids[2] + " 10000 4294000222\n",
+ want: map[string]int64{uuids[0]: 999, uuids[1]: 1},
+ expect: [][]string{{uuids[0], "0"}, {uuids[1], "112"}},
+ },
+ } {
+ c.Logf("spread=%d squeue=%q want=%v -> expect=%v", test.spread, test.squeue, test.want, test.expect)
+ slurm := &slurmFake{
+ queue: test.squeue,
+ }
+ sqc := &SqueueChecker{
+ Slurm: slurm,
+ PrioritySpread: test.spread,
+ Period: time.Hour,
+ }
+ sqc.startOnce.Do(sqc.start)
+ sqc.check()
+ for uuid, pri := range test.want {
+ sqc.SetPriority(uuid, pri)
+ }
+ sqc.reniceAll()
+ c.Check(slurm.didRenice, DeepEquals, test.expect)
+ sqc.Stop()
+ }
+}
+
+// If the given UUID isn't in the slurm queue yet, SetPriority()
+// should wait for it to appear on the very next poll, then give up.
+func (s *SqueueSuite) TestSetPriorityBeforeQueued(c *C) {
+ uuidGood := "zzzzz-dz642-fake0fake0fake0"
+ uuidBad := "zzzzz-dz642-fake1fake1fake1"
+
+ slurm := &slurmFake{}
+ sqc := &SqueueChecker{
+ Slurm: slurm,
+ Period: time.Hour,
+ }
+ sqc.startOnce.Do(sqc.start)
+ sqc.Stop()
+ sqc.check()
+
+ done := make(chan struct{})
+ go func() {
+ sqc.SetPriority(uuidGood, 123)
+ sqc.SetPriority(uuidBad, 345)
+ close(done)
+ }()
+ c.Check(sqc.queue[uuidGood], IsNil)
+ c.Check(sqc.queue[uuidBad], IsNil)
+ timeout := time.NewTimer(time.Second)
+ defer timeout.Stop()
+ tick := time.NewTicker(time.Millisecond)
+ defer tick.Stop()
+ for {
+ select {
+ case <-tick.C:
+ slurm.queue = uuidGood + " 0 12345\n"
+ sqc.check()
+
+ // Avoid immediately selecting this case again
+ // on the next iteration if check() took
+ // longer than one tick.
+ select {
+ case <-tick.C:
+ default:
+ }
+ case <-timeout.C:
+ c.Fatal("timed out")
+ case <-done:
+ c.Assert(sqc.queue[uuidGood], NotNil)
+ c.Check(sqc.queue[uuidGood].wantPriority, Equals, int64(123))
+ c.Check(sqc.queue[uuidBad], IsNil)
+ return
+ }
+ }
+}
('share/doc/arvados-node-manager', ['agpl-3.0.txt', 'README.rst']),
],
install_requires=[
- 'apache-libcloud>=2.2',
+ 'apache-libcloud>=2.3',
'arvados-python-client>=0.1.20170731145219',
'future',
'pykka',
'python-daemon',
'setuptools'
],
- dependency_links=[
- "https://github.com/curoverse/libcloud/archive/apache-libcloud-2.2.2.dev4.zip"
- ],
test_suite='tests',
tests_require=[
'requests',
'pbr<1.7.0',
'mock>=1.0',
- 'apache-libcloud==2.2.2.dev4',
+ 'apache-libcloud>=2.3',
],
zip_safe=False,
cmdclass={'egg_info': tagger},