Merge branch 'master' of git.curoverse.com:arvados into 13076-r-autogen-api
author    Fuad Muhic <fmuhic@capeannenterprises.com>
          Tue, 6 Mar 2018 15:54:44 +0000 (16:54 +0100)
committer Fuad Muhic <fmuhic@capeannenterprises.com>
          Tue, 6 Mar 2018 15:54:44 +0000 (16:54 +0100)
Arvados-DCO-1.1-Signed-off-by: Fuad Muhic <fmuhic@capeannenterprises.com>

30 files changed:
build/libcloud-pin.sh
build/run-library.sh
build/run-tests.sh
doc/_includes/_mount_types.liquid
doc/install/crunch2-slurm/install-dispatch.html.textile.liquid
lib/dispatchcloud/node_size.go
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvdocker.py
sdk/cwl/arvados_cwl/arvjob.py
sdk/cwl/arvados_cwl/arvtool.py
sdk/cwl/arvados_cwl/arvworkflow.py
sdk/cwl/arvados_cwl/runner.py
sdk/cwl/setup.py
sdk/cwl/tests/test_container.py
sdk/cwl/tests/test_job.py
sdk/cwl/tests/test_submit.py
sdk/cwl/tests/wf/echo-subwf.cwl [new file with mode: 0644]
sdk/cwl/tests/wf/echo-wf.cwl [new file with mode: 0644]
sdk/cwl/tests/wf/echo_a.cwl [new file with mode: 0644]
sdk/cwl/tests/wf/echo_b.cwl [new file with mode: 0644]
sdk/go/arvados/container.go
sdk/ruby/arvados.gemspec
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
services/crunch-dispatch-slurm/priority.go [new file with mode: 0644]
services/crunch-dispatch-slurm/priority_test.go [new file with mode: 0644]
services/crunch-dispatch-slurm/slurm.go
services/crunch-dispatch-slurm/squeue.go
services/crunch-dispatch-slurm/squeue_test.go [new file with mode: 0644]
services/nodemanager/setup.py

index c795bb8a24b94c3aed4e551fa0143b1838945af4..63f65ada8b19382e3940199bc9ce7841fc2a14b2 100644 (file)
@@ -2,4 +2,4 @@
 #
 # SPDX-License-Identifier: AGPL-3.0
 
-LIBCLOUD_PIN=2.2.2.dev4
+LIBCLOUD_PIN=2.3.0
index c5312f8d8c519be54e3b4b06facde6049094f2b4..ba946882ee49c2a3beb840d841732e87f28bffe3 100755 (executable)
@@ -129,6 +129,9 @@ package_go_binary() {
     declare -a checkdirs=(vendor)
     if grep -qr git.curoverse.com/arvados .; then
         checkdirs+=(sdk/go)
+        if [[ "$prog" == "crunch-dispatch-slurm" ]]; then
+          checkdirs+=(lib/dispatchcloud)
+        fi
     fi
     for dir in ${checkdirs[@]}; do
         cd "$GOPATH/src/git.curoverse.com/arvados.git/$dir"
index 48b3eab38ac864ab0c66d4a40d17502f60d4cdb3..9081d99bb545cac5ff14272e63e8f842292585da 100755 (executable)
@@ -243,15 +243,17 @@ sanity_checks() {
     echo -n 'graphviz: '
     dot -V || fatal "No graphviz. Try: apt-get install graphviz"
 
-    # R SDK stuff
-    echo -n 'R: '
-    which R || fatal "No R. Try: apt-get install r-base"
-    echo -n 'testthat: '
-    R -q -e "library('testthat')" || fatal "No testthat. Try: apt-get install r-cran-testthat"
-    # needed for roxygen2, needed for devtools, needed for R sdk
-    pkg-config --exists libxml-2.0 || fatal "No libxml2. Try: apt-get install libxml2-dev"
-    # needed for pkgdown, builds R SDK doc pages
-    which pandoc || fatal "No pandoc. Try: apt-get install pandoc"
+    if [[ "$NEED_SDK_R" = true ]]; then
+      # R SDK stuff
+      echo -n 'R: '
+      which R || fatal "No R. Try: apt-get install r-base"
+      echo -n 'testthat: '
+      R -q -e "library('testthat')" || fatal "No testthat. Try: apt-get install r-cran-testthat"
+      # needed for roxygen2, needed for devtools, needed for R sdk
+      pkg-config --exists libxml-2.0 || fatal "No libxml2. Try: apt-get install libxml2-dev"
+      # needed for pkgdown, builds R SDK doc pages
+      which pandoc || fatal "No pandoc. Try: apt-get install pandoc"
+    fi
 }
 
 rotate_logfile() {
@@ -321,6 +323,18 @@ do
     esac
 done
 
+# R SDK installation is very slow (~360s in a clean environment) and only
+# required when testing it. Skip that step if it is not needed.
+NEED_SDK_R=true
+
+if [[ ! -z "${only}" && "${only}" != "sdk/R" ]]; then
+  NEED_SDK_R=false
+fi
+
+if [[ ! -z "${skip}" && "${skip}" == "sdk/R" ]]; then
+  NEED_SDK_R=false
+fi
+
 start_services() {
     echo 'Starting API, keepproxy, keep-web, ws, arv-git-httpd, and nginx ssl proxy...'
     if [[ ! -d "$WORKSPACE/services/api/log" ]]; then
@@ -526,13 +540,13 @@ setup_virtualenv "$VENVDIR" --python python2.7
 
 # Needed for run_test_server.py which is used by certain (non-Python) tests.
 pip freeze 2>/dev/null | egrep ^PyYAML= \
-    || pip install PyYAML >/dev/null \
+    || pip install --no-cache-dir PyYAML >/dev/null \
     || fatal "pip install PyYAML failed"
 
-# Preinstall forked version of libcloud, because nodemanager "pip install"
+# Preinstall libcloud, because nodemanager "pip install"
 # won't pick it up by default.
 pip freeze 2>/dev/null | egrep ^apache-libcloud==$LIBCLOUD_PIN \
-    || pip install --pre --ignore-installed https://github.com/curoverse/libcloud/archive/apache-libcloud-$LIBCLOUD_PIN.zip >/dev/null \
+    || pip install --pre --ignore-installed --no-cache-dir "apache-libcloud>=$LIBCLOUD_PIN" >/dev/null \
     || fatal "pip install apache-libcloud failed"
 
 # We need an unreleased (as of 2017-08-17) llfuse bugfix, otherwise our fuse test suite deadlocks.
@@ -701,11 +715,17 @@ do_test_once() {
 }
 
 do_install() {
-    if [[ -z "${only_install}" || "${only_install}" == "${1}" ]]; then
-        retry do_install_once ${@}
-    else
-        title "Skipping $1 install"
-    fi
+  skipit=false
+
+  if [[ -z "${only_install}" || "${only_install}" == "${1}" ]]; then
+      retry do_install_once ${@}
+  else
+      skipit=true
+  fi
+
+  if [[ "$skipit" = true ]]; then
+    title "Skipping $1 install"
+  fi
 }
 
 do_install_once() {
@@ -780,8 +800,10 @@ install_ruby_sdk() {
 do_install sdk/ruby ruby_sdk
 
 install_R_sdk() {
+  if [[ "$NEED_SDK_R" = true ]]; then
     cd "$WORKSPACE/sdk/R" \
        && R --quiet --vanilla --file=install_deps.R
+  fi
 }
 do_install sdk/R R_sdk
 
@@ -957,9 +979,12 @@ test_ruby_sdk() {
 do_test sdk/ruby ruby_sdk
 
 test_R_sdk() {
+  if [[ "$NEED_SDK_R" = true ]]; then
     cd "$WORKSPACE/sdk/R" \
         && R --quiet --file=run_test.R
+  fi
 }
+
 do_test sdk/R R_sdk
 
 test_cli() {
index fc8a7991b38c65616704d0d096d7500a1b9e1473..edf8edfd4ab278ee6b134f3ba8dbe773487f3442 100644 (file)
@@ -64,7 +64,7 @@ When a container's output_path is a tmp mount backed by local disk, this output
 
 1. Only mount points of kind @collection@ are supported.
 
-2. Mount points underneath output_path which have "writable":true are copied into output_path during container initialization and may be updated, renamed, or deleted by the running container.  The original collection is not modified.  On container completion, files remaining in the output are saved to the output collection.   The mount at output_path must be big enough to accommodate copies of the inner writable mounts.
+2. Mount points underneath output_path which have @"writable":true@ are copied into output_path during container initialization and may be updated, renamed, or deleted by the running container.  The original collection is not modified.  On container completion, files remaining in the output are saved to the output collection.   The mount at output_path must be big enough to accommodate copies of the inner writable mounts.
 
 3. If any such mount points are configured as @"exclude_from_output":true@, they will be excluded from the output.
 
index 27f15b103e10e12cdb7a83ec7f0bb3ee6e9e138f..9784266e68b4df509e0ce55f3db8118bbd77d99e 100644 (file)
@@ -69,8 +69,8 @@ Override Keep service discovery with a predefined list of Keep URIs. This can be
 
 <notextile>
 <pre><code class="userinput">Client:
-  APIHost: <b>zzzzz.arvadosapi.com</b>
-  AuthToken: <b>zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz</b>
+  APIHost: zzzzz.arvadosapi.com
+  AuthToken: zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz
   KeepServiceURIs:
   - <b>http://127.0.0.1:25107</b>
 </code></pre>
@@ -81,22 +81,32 @@ h3. PollPeriod
 crunch-dispatch-slurm polls the API server periodically for new containers to run.  The @PollPeriod@ option controls how often this poll happens.  Set this to a string of numbers suffixed with one of the time units @ns@, @us@, @ms@, @s@, @m@, or @h@.  For example:
 
 <notextile>
-<pre><code class="userinput">Client:
-  APIHost: <b>zzzzz.arvadosapi.com</b>
-  AuthToken: <b>zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz</b>
-PollPeriod: <b>3m30s</b>
+<pre><code class="userinput">PollPeriod: <b>3m30s</b>
 </code></pre>
 </notextile>
 
+h3. PrioritySpread
+
+crunch-dispatch-slurm adjusts the "nice" values of its SLURM jobs to ensure containers are prioritized correctly relative to one another. The @PrioritySpread@ option controls the spacing the dispatcher leaves between adjacent SLURM priorities when making those adjustments.
+* If non-Arvados jobs also run on your SLURM cluster, and your Arvados containers wait too long in the SLURM queue because their "nice" values are too high to compete with other SLURM jobs, use a smaller @PrioritySpread@ value.
+* If you have an older SLURM system that limits nice values to 10000, a smaller @PrioritySpread@ can help avoid reaching that limit.
+* Otherwise, a larger value is preferable because it reduces the number of priority adjustments the dispatcher has to make by running @scontrol@.
+
+The smallest usable value is @1@. The default value of @10@ is used if this option is zero or negative. Example:
+
+<notextile>
+<pre><code class="userinput">PrioritySpread: <b>1000</b>
+</code></pre>
+</notextile>
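+
+For example, with the default @PrioritySpread@ of @10@, queued containers that end up with equal SLURM priority are typically assigned nice values spaced 10 apart (0, 10, 20, and so on); a larger spread such as @1000@ leaves correspondingly more room between adjacent containers.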
+
 h3. SbatchArguments
 
 When crunch-dispatch-slurm invokes @sbatch@, you can add switches to the command by specifying @SbatchArguments@.  You can use this to send the jobs to specific cluster partitions or add resource requests.  Set @SbatchArguments@ to an array of strings.  For example:
 
 <notextile>
-<pre><code class="userinput">Client:
-  APIHost: <b>zzzzz.arvadosapi.com</b>
-  AuthToken: <b>zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz</b>
-SbatchArguments:
+<pre><code class="userinput">SbatchArguments:
 - <b>"--partition=PartitionName"</b>
 </code></pre>
 </notextile>
@@ -106,10 +116,7 @@ h3. CrunchRunCommand: Dispatch to SLURM cgroups
 If your SLURM cluster uses the @task/cgroup@ TaskPlugin, you can configure Crunch's Docker containers to be dispatched inside SLURM's cgroups.  This provides consistent enforcement of resource constraints.  To do this, use a crunch-dispatch-slurm configuration like the following:
 
 <notextile>
-<pre><code class="userinput">Client:
-  APIHost: <b>zzzzz.arvadosapi.com</b>
-  AuthToken: <b>zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz</b>
-CrunchRunCommand:
+<pre><code class="userinput">CrunchRunCommand:
 - <b>crunch-run</b>
 - <b>"-cgroup-parent-subsystem=memory"</b>
 </code></pre>
@@ -130,10 +137,7 @@ h3. CrunchRunCommand: Using host networking for containers
 Older Linux kernels (prior to 3.18) have bugs in network namespace handling which can lead to compute node lockups.  This is indicated by blocked kernel tasks in "Workqueue: netns cleanup_net".   If you are experiencing this problem, as a workaround you can disable Docker's use of network namespaces across the cluster.  Be aware this reduces container isolation, which may be a security risk.
 
 <notextile>
-<pre><code class="userinput">Client:
-  APIHost: <b>zzzzz.arvadosapi.com</b>
-  AuthToken: <b>zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz</b>
-CrunchRunCommand:
+<pre><code class="userinput">CrunchRunCommand:
 - <b>crunch-run</b>
 - <b>"-container-enable-networking=always"</b>
 - <b>"-container-network-mode=host"</b>
@@ -145,10 +149,7 @@ h3. MinRetryPeriod: Rate-limit repeated attempts to start containers
 If SLURM is unable to run a container, the dispatcher will submit it again after the next PollPeriod. If PollPeriod is very short, this can be excessive. If MinRetryPeriod is set, the dispatcher will avoid submitting the same container to SLURM more than once in the given time span.
 
 <notextile>
-<pre><code class="userinput">Client:
-  APIHost: <b>zzzzz.arvadosapi.com</b>
-  AuthToken: <b>zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz</b>
-MinRetryPeriod: <b>30s</b>
+<pre><code class="userinput">MinRetryPeriod: <b>30s</b>
 </code></pre>
 </notextile>
 
index 34f83a6efdeca89e485029025131baa1ff133ac9..41c6ff425121515101bdaf1d7aa5b4004acef57d 100644 (file)
@@ -69,11 +69,9 @@ func ChooseInstanceType(cc *arvados.Cluster, ctr *arvados.Container) (best arvad
 // it is no longer offered by any node. So, to make a feature name
 // valid, we can add it to a dummy node ("compute0"), then remove it.
 //
-// (2) when srun is given an invalid --gres argument and an invalid
-// --constraint argument, the error message mentions "Invalid feature
-// specification". So, to test whether a feature name is valid without
-// actually submitting a job, we can call srun with the feature name
-// and an invalid --gres argument.
+// (2) To test whether a set of feature names are valid without
+// actually submitting a job, we can call srun --test-only with the
+// desired features.
 //
 // SlurmNodeTypeFeatureKludge does a test-and-fix operation
 // immediately, and then periodically, in case slurm restarts and
@@ -97,16 +95,15 @@ func SlurmNodeTypeFeatureKludge(cc *arvados.Cluster) {
 var (
        slurmDummyNode     = "compute0"
        slurmErrBadFeature = "Invalid feature"
-       slurmErrBadGres    = "Invalid generic resource"
+       slurmErrNoNodes    = "node configuration is not available"
 )
 
 func slurmKludge(features []string) {
-       cmd := exec.Command("srun", "--gres=invalid-gres-specification", "--constraint="+strings.Join(features, "&"), "true")
+       cmd := exec.Command("srun", "--test-only", "--constraint="+strings.Join(features, "&"), "false")
        out, err := cmd.CombinedOutput()
        switch {
-       case err == nil:
-               log.Printf("warning: guaranteed-to-fail srun command did not fail: %q %q", cmd.Path, cmd.Args)
-               log.Printf("output was: %q", out)
+       case err == nil || bytes.Contains(out, []byte(slurmErrNoNodes)):
+               // Evidently our node-type feature names are all valid.
 
        case bytes.Contains(out, []byte(slurmErrBadFeature)):
                log.Printf("temporarily configuring node %q with all node type features", slurmDummyNode)
@@ -119,10 +116,7 @@ func slurmKludge(features []string) {
                        }
                }
 
-       case bytes.Contains(out, []byte(slurmErrBadGres)):
-               // Evidently our node-type feature names are all valid.
-
        default:
-               log.Printf("warning: expected srun error %q or %q, but output was %q", slurmErrBadFeature, slurmErrBadGres, out)
+               log.Printf("warning: expected srun error %q, %q, or success, but output was %q", slurmErrBadFeature, slurmErrNoNodes, out)
        }
 }
index 4701b4d8f13a29a2c1dc8f3bc5558de788a5a1fa..628b6aea69913da1944abea1d886a14d63db3d7a 100644 (file)
@@ -42,7 +42,7 @@ from ._version import __version__
 from cwltool.pack import pack
 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
-from cwltool.draft2tool import compute_checksums
+from cwltool.command_line_tool import compute_checksums
 from arvados.api import OrderedJsonModel
 
 logger = logging.getLogger('arvados.cwl-runner')
@@ -677,8 +677,8 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
     return parser
 
 def add_arv_hints():
-    cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
-    cwltool.draft2tool.ACCEPTLIST_RE = cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE
+    cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
+    cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
     res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
     use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
     res.close()
index e59903f2dc33b3d69406ee1fbc92751bd78e9623..5483ccbf52b59f2d2133bacd76ad7eaaa775a91f 100644 (file)
@@ -45,7 +45,7 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
         if not images:
             # Fetch Docker image if necessary.
             try:
-                cwltool.docker.get_image(dockerRequirement, pull_image)
+                cwltool.docker.DockerCommandLineJob.get_image(dockerRequirement, pull_image)
             except OSError as e:
                 raise WorkflowException("While trying to get Docker image '%s', failed to execute 'docker': %s" % (dockerRequirement["dockerImageId"], e))
 
index 2731b2694422fcf8a986057266efe23354830c46..88155b5b958df8e0b7b04a602becccd772718bc3 100644 (file)
@@ -10,7 +10,7 @@ import time
 
 from cwltool.process import get_feature, shortname, UnsupportedRequirement
 from cwltool.errors import WorkflowException
-from cwltool.draft2tool import revmap_file, CommandLineTool
+from cwltool.command_line_tool import revmap_file, CommandLineTool
 from cwltool.load_tool import fetch_document
 from cwltool.builder import Builder
 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
index 81faff44e6297476d28d84c7f78d7201dff29122..de329796e42384a18d4f4f669103c3fcb8a982a5 100644 (file)
@@ -2,7 +2,7 @@
 #
 # SPDX-License-Identifier: Apache-2.0
 
-from cwltool.draft2tool import CommandLineTool
+from cwltool.command_line_tool import CommandLineTool
 from .arvjob import ArvadosJob
 from .arvcontainer import ArvadosContainer
 from .pathmapper import ArvPathMapper
index f0f9c77f40fed6c5a27d8160abdfc1d710ddb9f1..5aed871a12bc58d1a747efd2035dbf9d2a23b5a4 100644 (file)
@@ -14,6 +14,7 @@ from cwltool.load_tool import fetch_document
 from cwltool.process import shortname
 from cwltool.workflow import Workflow, WorkflowException
 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
+from cwltool.builder import Builder
 
 import ruamel.yaml as yaml
 
@@ -26,6 +27,9 @@ from .perf import Perf
 logger = logging.getLogger('arvados.cwl-runner')
 metrics = logging.getLogger('arvados.cwl-runner.metrics')
 
+max_res_pars = ("coresMin", "coresMax", "ramMin", "ramMax", "tmpdirMin", "tmpdirMax")
+sum_res_pars = ("outdirMin", "outdirMax")
+
 def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
                     submit_runner_ram=0, name=None, merged_map=None):
 
@@ -71,6 +75,37 @@ def dedup_reqs(reqs):
             dedup[r["class"]] = r
     return [dedup[r] for r in sorted(dedup.keys())]
 
+def get_overall_res_req(res_reqs):
+    """Take the overall of a list of ResourceRequirement,
+    i.e., the max of coresMin, coresMax, ramMin, ramMax, tmpdirMin, tmpdirMax
+    and the sum of outdirMin, outdirMax."""
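+    # For example, ResourceRequirements of {"coresMin": 2, "outdirMin": 1024}
+    # and {"coresMin": 3, "outdirMin": 2048} (as in tests/wf/echo_a.cwl and
+    # echo_b.cwl below) combine to
+    # {"coresMin": 3, "outdirMin": 3072, "class": "ResourceRequirement"}.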
+
+    all_res_req = {}
+    exception_msgs = []
+    for a in max_res_pars + sum_res_pars:
+        all_res_req[a] = []
+        for res_req in res_reqs:
+            if a in res_req:
+                if isinstance(res_req[a], int): # integer check
+                    all_res_req[a].append(res_req[a])
+                else:
+                    msg = SourceLine(res_req).makeError(
+                    "Non-top-level ResourceRequirement in single container cannot have expressions")
+                    exception_msgs.append(msg)
+    if exception_msgs:
+        raise WorkflowException("\n".join(exception_msgs))
+    else:
+        overall_res_req = {}
+        for a in all_res_req:
+            if all_res_req[a]:
+                if a in max_res_pars:
+                    overall_res_req[a] = max(all_res_req[a])
+                elif a in sum_res_pars:
+                    overall_res_req[a] = sum(all_res_req[a])
+        if overall_res_req:
+            overall_res_req["class"] = "ResourceRequirement"
+        return cmap(overall_res_req)
+
 class ArvadosWorkflow(Workflow):
     """Wrap cwltool Workflow to override selected methods."""
 
@@ -105,6 +140,39 @@ class ArvadosWorkflow(Workflow):
 
                     packed = pack(document_loader, workflowobj, uri, self.metadata)
 
+                    builder = Builder()
+                    builder.job = joborder
+                    builder.requirements = workflowobj["requirements"]
+                    builder.hints = workflowobj["hints"]
+                    builder.resources = {}
+
+                    res_reqs = {"requirements": [], "hints": []}
+                    for t in ("requirements", "hints"):
+                        for item in packed["$graph"]:
+                            if t in item:
+                                if item["id"] == "#main": # evaluate potential expressions in the top-level requirements/hints
+                                    for req in item[t]:
+                                        if req["class"] == "ResourceRequirement":
+                                            eval_req = {"class": "ResourceRequirement"}
+                                            for a in max_res_pars + sum_res_pars:
+                                                if a in req:
+                                                    eval_req[a] = builder.do_eval(req[a])
+                                            res_reqs[t].append(eval_req)
+                                else:
+                                    for req in item[t]:
+                                        if req["class"] == "ResourceRequirement":
+                                            res_reqs[t].append(req)
+                    overall_res_req = {"requirements": get_overall_res_req(res_reqs["requirements"]),
+                                       "hints": get_overall_res_req(res_reqs["hints"])}
+
+                    new_spec = {"requirements": self.requirements, "hints": self.hints}
+                    for t in ("requirements", "hints"):
+                        for req in new_spec[t]:
+                            if req["class"] == "ResourceRequirement":
+                                new_spec[t].remove(req)
+                        if overall_res_req[t]:
+                            new_spec[t].append(overall_res_req[t])
+
                     upload_dependencies(self.arvrunner,
                                         kwargs.get("name", ""),
                                         document_loader,
index fb5d036e941969df71b6a3062d09bb87d4328739..9b79d458501dfd92e31584ae57aacbde6b513a7a 100644 (file)
@@ -13,8 +13,7 @@ from StringIO import StringIO
 
 from schema_salad.sourceline import SourceLine
 
-import cwltool.draft2tool
-from cwltool.draft2tool import CommandLineTool
+from cwltool.command_line_tool import CommandLineTool
 import cwltool.workflow
 from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
 from cwltool.load_tool import fetch_document
index e5484651edcd44d0f2ef67c3bf0dbd1244ffca60..5b1d7370e8a2afcb692f568b34de67b443fe5dfd 100644 (file)
@@ -41,7 +41,7 @@ setup(name='arvados-cwl-runner',
       # Note that arvados/build/run-build-packages.sh looks at this
       # file to determine what version of cwltool and schema-salad to build.
       install_requires=[
-          'cwltool==1.0.20180130110340',
+          'cwltool==1.0.20180225105849',
           'schema-salad==2.6.20171201034858',
           'typing==3.5.3.0',
           'ruamel.yaml==0.13.7',
index c20693469ad2e32b9a3f437b2f75592d861530ec..fea21e9e0fb1d47c36e4cb25c93faab84f325148 100644 (file)
@@ -45,7 +45,8 @@ class TestContainer(unittest.TestCase):
                 "outputs": [],
                 "baseCommand": "ls",
                 "arguments": [{"valueFrom": "$(runtime.outdir)"}],
-                "id": "#"
+                "id": "#",
+                "class": "CommandLineTool"
             })
             make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
                                          collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
@@ -126,7 +127,8 @@ class TestContainer(unittest.TestCase):
                 "enableReuse": False
             }],
             "baseCommand": "ls",
-            "id": "#"
+            "id": "#",
+            "class": "CommandLineTool"
         })
         make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
                                          collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
@@ -234,7 +236,8 @@ class TestContainer(unittest.TestCase):
                 }                        ]
             }],
             "baseCommand": "ls",
-            "id": "#"
+            "id": "#",
+            "class": "CommandLineTool"
         })
         make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
                                          collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
@@ -333,7 +336,8 @@ class TestContainer(unittest.TestCase):
             "stderr": "stderr.txt",
             "stdin": "/keep/99999999999999999999999999999996+99/file.txt",
             "arguments": [{"valueFrom": "$(runtime.outdir)"}],
-            "id": "#"
+            "id": "#",
+            "class": "CommandLineTool"
         })
         make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
                                          collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
@@ -454,7 +458,8 @@ class TestContainer(unittest.TestCase):
             "outputs": [],
             "baseCommand": "ls",
             "arguments": [{"valueFrom": "$(runtime.outdir)"}],
-            "id": "#"
+            "id": "#",
+            "class": "CommandLineTool"
         })
         make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
                                      collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
index 309ac0996ac54cb0cbda7c7940cad937cc8ee688..1dfd86b8c0f7cd6a5c51a414cbf8bc2335236e72 100644 (file)
@@ -53,7 +53,8 @@ class TestJob(unittest.TestCase):
                 "outputs": [],
                 "baseCommand": "ls",
                 "arguments": [{"valueFrom": "$(runtime.outdir)"}],
-                "id": "#"
+                "id": "#",
+                "class": "CommandLineTool"
             })
             make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
                                          collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
@@ -144,7 +145,8 @@ class TestJob(unittest.TestCase):
                 "enableReuse": False
             }],
             "baseCommand": "ls",
-            "id": "#"
+            "id": "#",
+            "class": "CommandLineTool"
         }
         make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
                                          collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
@@ -392,6 +394,81 @@ class TestWorkflow(unittest.TestCase):
   "sleeptime": 5
 }''')])
 
+    # The test passes no builder.resources
+    # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
+    @mock.patch("arvados.collection.CollectionReader")
+    @mock.patch("arvados.collection.Collection")
+    @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
+    def test_overall_resource_singlecontainer(self, list_images_in_arv, mockcollection, mockcollectionreader):
+        arvados_cwl.add_arv_hints()
+
+        api = mock.MagicMock()
+        api._rootDesc = get_rootDesc()
+
+        runner = arvados_cwl.ArvCwlRunner(api)
+        self.assertEqual(runner.work_api, 'jobs')
+
+        list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
+        runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}
+        runner.api.collections().list().execute.return_value = {"items": [{"portable_data_hash": "99999999999999999999999999999993+99"}]}
+
+        runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
+        runner.ignore_docker_for_reuse = False
+        runner.num_retries = 0
+        document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+
+        make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+                                         collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
+        document_loader.fetcher_constructor = functools.partial(arvados_cwl.CollectionFetcher, api_client=api, fs_access=make_fs_access(""))
+        document_loader.fetcher = document_loader.fetcher_constructor(document_loader.cache, document_loader.session)
+        document_loader.fetch_text = document_loader.fetcher.fetch_text
+        document_loader.check_exists = document_loader.fetcher.check_exists
+
+        tool, metadata = document_loader.resolve_ref("tests/wf/echo-wf.cwl")
+        metadata["cwlVersion"] = tool["cwlVersion"]
+
+        mockcollection().portable_data_hash.return_value = "99999999999999999999999999999999+118"
+
+        arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, work_api="jobs", avsc_names=avsc_names,
+                                              basedir="", make_fs_access=make_fs_access, loader=document_loader,
+                                              makeTool=runner.arv_make_tool, metadata=metadata)
+        arvtool.formatgraph = None
+        it = arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access)
+        it.next().run()
+        it.next().run()
+
+        with open("tests/wf/echo-subwf.cwl") as f:
+            subwf = StripYAMLComments(f.read())
+
+        runner.api.jobs().create.assert_called_with(
+            body=JsonDiffMatcher({
+                'minimum_script_version': 'a3f2cb186e437bfce0031b024b2157b73ed2717d',
+                'repository': 'arvados',
+                'script_version': 'master',
+                'script': 'crunchrunner',
+                'script_parameters': {
+                    'tasks': [{'task.env': {
+                        'HOME': '$(task.outdir)',
+                        'TMPDIR': '$(task.tmpdir)'},
+                               'task.vwd': {
+                                   'workflow.cwl': '$(task.keep)/99999999999999999999999999999999+118/workflow.cwl',
+                                   'cwl.input.yml': '$(task.keep)/99999999999999999999999999999999+118/cwl.input.yml'
+                               },
+                    'command': [u'cwltool', u'--no-container', u'--move-outputs', u'--preserve-entire-environment', u'workflow.cwl#main', u'cwl.input.yml'],
+                    'task.stdout': 'cwl.output.json'}]},
+                'runtime_constraints': {
+                    'min_scratch_mb_per_node': 4096,
+                    'min_cores_per_node': 3,
+                    'docker_image': 'arvados/jobs',
+                    'min_ram_mb_per_node': 1024
+                },
+                'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'}),
+            filters=[['repository', '=', 'arvados'],
+                     ['script', '=', 'crunchrunner'],
+                     ['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
+                     ['docker_image_locator', 'in docker', 'arvados/jobs']],
+            find_or_create=True)
+
     def test_default_work_api(self):
         arvados_cwl.add_arv_hints()
 
index 4ab5fb524c8427b21b64c8f1aa15dbbfe10b3cb8..c0b74fe4d3e1bd310afaa2b9bc906293c8c41498 100644 (file)
@@ -982,7 +982,7 @@ class TestSubmit(unittest.TestCase):
 
 
     @mock.patch("arvados.commands.keepdocker.find_one_image_hash")
-    @mock.patch("cwltool.docker.get_image")
+    @mock.patch("cwltool.docker.DockerCommandLineJob.get_image")
     @mock.patch("arvados.api")
     def test_arvados_jobs_image(self, api, get_image, find_one_image_hash):
         arvrunner = mock.MagicMock()
diff --git a/sdk/cwl/tests/wf/echo-subwf.cwl b/sdk/cwl/tests/wf/echo-subwf.cwl
new file mode 100644 (file)
index 0000000..29dc3d6
--- /dev/null
@@ -0,0 +1,19 @@
+cwlVersion: v1.0
+class: Workflow
+requirements:
+  ResourceRequirement:
+    coresMin: 1
+
+inputs: []
+
+outputs: []
+
+steps:
+  echo_a:
+    run: echo_a.cwl
+    in: []
+    out: []
+  echo_b:
+    run: echo_b.cwl
+    in: []
+    out: []
diff --git a/sdk/cwl/tests/wf/echo-wf.cwl b/sdk/cwl/tests/wf/echo-wf.cwl
new file mode 100644 (file)
index 0000000..63a5438
--- /dev/null
@@ -0,0 +1,18 @@
+cwlVersion: v1.0
+class: Workflow
+$namespaces:
+  arv: "http://arvados.org/cwl#"
+requirements:
+  SubworkflowFeatureRequirement: {}
+
+inputs: []
+
+outputs: []
+
+steps:
+  echo-subwf:
+    requirements:
+      arv:RunInSingleContainer: {}
+    run: echo-subwf.cwl
+    in: []
+    out: []
diff --git a/sdk/cwl/tests/wf/echo_a.cwl b/sdk/cwl/tests/wf/echo_a.cwl
new file mode 100644 (file)
index 0000000..b7893e2
--- /dev/null
@@ -0,0 +1,11 @@
+cwlVersion: v1.0
+class: CommandLineTool
+requirements:
+  ResourceRequirement:
+    coresMin: 2
+    outdirMin: 1024
+inputs: []
+outputs: []
+baseCommand: echo
+arguments:
+  - "a"
diff --git a/sdk/cwl/tests/wf/echo_b.cwl b/sdk/cwl/tests/wf/echo_b.cwl
new file mode 100644 (file)
index 0000000..4db11cc
--- /dev/null
@@ -0,0 +1,11 @@
+cwlVersion: v1.0
+class: CommandLineTool
+requirements:
+  ResourceRequirement:
+    coresMin: 3
+    outdirMin: 2048
+inputs: []
+outputs: []
+baseCommand: echo
+arguments:
+  - "b"
index 20d007c5c818daf3190b9852e840fd8a2f97d47e..daafc4995448524f7fe3794b9facd13e01480823 100644 (file)
@@ -4,9 +4,12 @@
 
 package arvados
 
+import "time"
+
 // Container is an arvados#container resource.
 type Container struct {
        UUID                 string               `json:"uuid"`
+       CreatedAt            time.Time            `json:"created_at"`
        Command              []string             `json:"command"`
        ContainerImage       string               `json:"container_image"`
        Cwd                  string               `json:"cwd"`
index a4ee65fa90b4bfc699f10b02736a7e5dfb8d9a61..c351189f81d5df59fbdf5a4dcfc6afd20a97a586 100644 (file)
@@ -23,8 +23,7 @@ Gem::Specification.new do |s|
                    "lib/arvados/collection.rb", "lib/arvados/keep.rb",
                    "README", "LICENSE-2.0.txt"]
   s.required_ruby_version = '>= 1.8.7'
-  # activesupport <4.2.6 only because https://dev.arvados.org/issues/8222
-  s.add_dependency('activesupport', '>= 3', '< 4.2.6')
+  s.add_dependency('activesupport', '>= 3')
   s.add_dependency('andand', '~> 1.3', '>= 1.3.3')
   # Our google-api-client dependency used to be < 0.9, but that could be
   # satisfied by the buggy 0.9.pre*.  https://dev.arvados.org/issues/9213
index f77023697e0f54ccaa12e2e7bc1bf3dd39f71509..23e4b3a8cb456aac06f644c0219148967586fe71 100644 (file)
@@ -25,6 +25,8 @@ import (
        "github.com/coreos/go-systemd/daemon"
 )
 
+const initialNiceValue int64 = 10000
+
 var (
        version           = "dev"
        defaultConfigPath = "/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml"
@@ -40,6 +42,7 @@ type Dispatcher struct {
 
        SbatchArguments []string
        PollPeriod      arvados.Duration
+       PrioritySpread  int64
 
        // crunch-run command to invoke. The container UUID will be
        // appended. If nil, []string{"crunch-run"} will be used.
@@ -154,8 +157,9 @@ func (disp *Dispatcher) setup() {
 
        disp.slurm = &slurmCLI{}
        disp.sqCheck = &SqueueChecker{
-               Period: time.Duration(disp.PollPeriod),
-               Slurm:  disp.slurm,
+               Period:         time.Duration(disp.PollPeriod),
+               PrioritySpread: disp.PrioritySpread,
+               Slurm:          disp.slurm,
        }
        disp.Dispatcher = &dispatch.Dispatcher{
                Arv:            arv,
@@ -198,17 +202,6 @@ func (disp *Dispatcher) checkSqueueForOrphans() {
        }
 }
 
-func (disp *Dispatcher) niceness(priority int) int {
-       if priority > 1000 {
-               priority = 1000
-       }
-       if priority < 0 {
-               priority = 0
-       }
-       // Niceness range 1-10000
-       return (1000 - priority) * 10
-}
-
 func (disp *Dispatcher) sbatchArgs(container arvados.Container) ([]string, error) {
        mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM+disp.ReserveExtraRAM) / float64(1048576)))
 
@@ -226,7 +219,7 @@ func (disp *Dispatcher) sbatchArgs(container arvados.Container) ([]string, error
        sbatchArgs = append(sbatchArgs, fmt.Sprintf("--mem=%d", mem))
        sbatchArgs = append(sbatchArgs, fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs))
        sbatchArgs = append(sbatchArgs, fmt.Sprintf("--tmp=%d", disk))
-       sbatchArgs = append(sbatchArgs, fmt.Sprintf("--nice=%d", disp.niceness(container.Priority)))
+       sbatchArgs = append(sbatchArgs, fmt.Sprintf("--nice=%d", initialNiceValue))
        if len(container.SchedulingParameters.Partitions) > 0 {
                sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.SchedulingParameters.Partitions, ",")))
        }
@@ -327,12 +320,19 @@ func (disp *Dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
                                log.Printf("container %s has state %q, priority %d: cancel slurm job", ctr.UUID, updated.State, updated.Priority)
                                disp.scancel(ctr)
                        } else {
-                               disp.renice(updated)
+                               p := int64(updated.Priority)
+                               if p <= 1000 {
+                                       // API is providing
+                                       // user-assigned priority. If
+                                       // ctrs have equal priority,
+                                       // run the older one first.
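+                               // (For p <= 1000, p<<50 still fits in
+                               // an int64, and UnixNano()>>14 stays
+                               // below 2^50, so the timestamp term
+                               // only breaks ties between equal
+                               // priorities.)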
+                                       p = int64(p)<<50 - (updated.CreatedAt.UnixNano() >> 14)
+                               }
+                               disp.sqCheck.SetPriority(ctr.UUID, p)
                        }
                }
        }
 }
-
 func (disp *Dispatcher) scancel(ctr arvados.Container) {
        disp.sqCheck.L.Lock()
        err := disp.slurm.Cancel(ctr.UUID)
@@ -347,28 +347,6 @@ func (disp *Dispatcher) scancel(ctr arvados.Container) {
        }
 }
 
-func (disp *Dispatcher) renice(ctr arvados.Container) {
-       nice := disp.niceness(ctr.Priority)
-       oldnice := disp.sqCheck.GetNiceness(ctr.UUID)
-       if nice == oldnice || oldnice == -1 {
-               return
-       }
-       log.Printf("updating slurm nice value to %d (was %d)", nice, oldnice)
-       disp.sqCheck.L.Lock()
-       err := disp.slurm.Renice(ctr.UUID, nice)
-       disp.sqCheck.L.Unlock()
-
-       if err != nil {
-               log.Printf("renice: %s", err)
-               time.Sleep(time.Second)
-               return
-       }
-       if disp.sqCheck.HasUUID(ctr.UUID) {
-               log.Printf("container %s has arvados priority %d, slurm nice %d",
-                       ctr.UUID, ctr.Priority, disp.sqCheck.GetNiceness(ctr.UUID))
-       }
-}
-
 func (disp *Dispatcher) readConfig(path string) error {
        err := config.LoadFile(disp, path)
        if err != nil && os.IsNotExist(err) && path == defaultConfigPath {
index 9fb5d6627eefa694181f4223d0e6dad17df45881..6852fc4be81978dd557870a2b629d5e56e16fc47 100644 (file)
@@ -74,7 +74,7 @@ func (sf *slurmFake) QueueCommand(args []string) *exec.Cmd {
        return exec.Command("echo", sf.queue)
 }
 
-func (sf *slurmFake) Renice(name string, nice int) error {
+func (sf *slurmFake) Renice(name string, nice int64) error {
        sf.didRenice = append(sf.didRenice, []string{name, fmt.Sprintf("%d", nice)})
        return nil
 }
@@ -115,7 +115,7 @@ func (s *IntegrationSuite) integrationTest(c *C,
 
        s.disp.Dispatcher = &dispatch.Dispatcher{
                Arv:        arv,
-               PollPeriod: time.Duration(1) * time.Second,
+               PollPeriod: time.Second,
                RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
                        go func() {
                                runContainer(disp, ctr)
@@ -151,7 +151,7 @@ func (s *IntegrationSuite) integrationTest(c *C,
 }
 
 func (s *IntegrationSuite) TestNormal(c *C) {
-       s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
+       s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 10000 100\n"}
        container := s.integrationTest(c,
                nil,
                func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
@@ -163,7 +163,7 @@ func (s *IntegrationSuite) TestNormal(c *C) {
 }
 
 func (s *IntegrationSuite) TestCancel(c *C) {
-       s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
+       s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 10000 100\n"}
        readyToCancel := make(chan bool)
        s.slurm.onCancel = func() { <-readyToCancel }
        container := s.integrationTest(c,
@@ -190,7 +190,7 @@ func (s *IntegrationSuite) TestMissingFromSqueue(c *C) {
                        fmt.Sprintf("--mem=%d", 11445),
                        fmt.Sprintf("--cpus-per-task=%d", 4),
                        fmt.Sprintf("--tmp=%d", 45777),
-                       fmt.Sprintf("--nice=%d", 9990)}},
+                       fmt.Sprintf("--nice=%d", 10000)}},
                func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
                        dispatcher.UpdateState(container.UUID, dispatch.Running)
                        time.Sleep(3 * time.Second)
@@ -202,7 +202,7 @@ func (s *IntegrationSuite) TestMissingFromSqueue(c *C) {
 func (s *IntegrationSuite) TestSbatchFail(c *C) {
        s.slurm = slurmFake{errBatch: errors.New("something terrible happened")}
        container := s.integrationTest(c,
-               [][]string{{"--job-name=zzzzz-dz642-queuedcontainer", "--mem=11445", "--cpus-per-task=4", "--tmp=45777", "--nice=9990"}},
+               [][]string{{"--job-name=zzzzz-dz642-queuedcontainer", "--mem=11445", "--cpus-per-task=4", "--tmp=45777", "--nice=10000"}},
                func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
                        dispatcher.UpdateState(container.UUID, dispatch.Running)
                        dispatcher.UpdateState(container.UUID, dispatch.Complete)
@@ -220,24 +220,6 @@ func (s *IntegrationSuite) TestSbatchFail(c *C) {
        c.Assert(len(ll.Items), Equals, 1)
 }
 
-func (s *IntegrationSuite) TestChangePriority(c *C) {
-       s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
-       container := s.integrationTest(c, nil,
-               func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
-                       dispatcher.UpdateState(container.UUID, dispatch.Running)
-                       time.Sleep(time.Second)
-                       dispatcher.Arv.Update("containers", container.UUID,
-                               arvadosclient.Dict{
-                                       "container": arvadosclient.Dict{"priority": 600}},
-                               nil)
-                       time.Sleep(time.Second)
-                       dispatcher.UpdateState(container.UUID, dispatch.Complete)
-               })
-       c.Check(container.State, Equals, arvados.ContainerStateComplete)
-       c.Assert(len(s.slurm.didRenice), Not(Equals), 0)
-       c.Check(s.slurm.didRenice[len(s.slurm.didRenice)-1], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "4000"})
-}
-
 type StubbedSuite struct {
        disp Dispatcher
 }
@@ -278,10 +260,10 @@ func (s *StubbedSuite) testWithServerStub(c *C, apiStubResponses map[string]arva
        ctx, cancel := context.WithCancel(context.Background())
        dispatcher := dispatch.Dispatcher{
                Arv:        arv,
-               PollPeriod: time.Duration(1) * time.Second,
+               PollPeriod: time.Second,
                RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
                        go func() {
-                               time.Sleep(1 * time.Second)
+                               time.Sleep(time.Second)
                                disp.UpdateState(ctr.UUID, dispatch.Running)
                                disp.UpdateState(ctr.UUID, dispatch.Complete)
                        }()
@@ -364,7 +346,7 @@ func (s *StubbedSuite) TestSbatchArgs(c *C) {
                s.disp.SbatchArguments = defaults
 
                args, err := s.disp.sbatchArgs(container)
-               c.Check(args, DeepEquals, append(defaults, "--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0", "--nice=9990"))
+               c.Check(args, DeepEquals, append(defaults, "--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0", "--nice=10000"))
                c.Check(err, IsNil)
        }
 }
@@ -410,7 +392,7 @@ func (s *StubbedSuite) TestSbatchInstanceTypeConstraint(c *C) {
                args, err := s.disp.sbatchArgs(container)
                c.Check(err, Equals, trial.err)
                if trial.err == nil {
-                       c.Check(args, DeepEquals, append([]string{"--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0", "--nice=9990"}, trial.sbatchArgs...))
+                       c.Check(args, DeepEquals, append([]string{"--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0", "--nice=10000"}, trial.sbatchArgs...))
                }
        }
 }
@@ -425,7 +407,7 @@ func (s *StubbedSuite) TestSbatchPartition(c *C) {
 
        args, err := s.disp.sbatchArgs(container)
        c.Check(args, DeepEquals, []string{
-               "--job-name=123", "--mem=239", "--cpus-per-task=1", "--tmp=0", "--nice=9990",
+               "--job-name=123", "--mem=239", "--cpus-per-task=1", "--tmp=0", "--nice=10000",
                "--partition=blurb,b2",
        })
        c.Check(err, IsNil)
diff --git a/services/crunch-dispatch-slurm/priority.go b/services/crunch-dispatch-slurm/priority.go
new file mode 100644 (file)
index 0000000..2312ce5
--- /dev/null
@@ -0,0 +1,56 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+const defaultSpread int64 = 10
+
+// wantNice calculates appropriate nice values for a set of SLURM
+// jobs. The returned slice will have len(jobs) elements.
+//
+// spread is a positive amount of space to leave between adjacent
+// priorities when making adjustments. Generally, increasing spread
+// reduces the total number of adjustments made. A smaller spread
+// produces lower nice values, which is useful for old SLURM versions
+// with a limited "nice" range and for sites where SLURM is also
+// running non-Arvados jobs with low nice values.
+//
+// If spread<1, a sensible default (10) is used.
+func wantNice(jobs []*slurmJob, spread int64) []int64 {
+       if len(jobs) == 0 {
+               return nil
+       }
+
+       if spread < 1 {
+               spread = defaultSpread
+       }
+       renice := make([]int64, len(jobs))
+
+       // highest usable priority (without going out of order)
+       var target int64
+       for i, job := range jobs {
+               if i == 0 {
+                       // renice[0] is always zero, so our highest
+                       // priority container gets the highest
+                       // possible slurm priority.
+                       target = job.priority + job.nice
+               } else if space := target - job.priority; space >= 0 && space < (spread-1)*10 {
+                       // Ordering is correct, and interval isn't too
+                       // large. Leave existing nice value alone.
+                       renice[i] = job.nice
+                       target = job.priority
+               } else {
+                       target -= (spread - 1)
+                       if possible := job.priority + job.nice; target > possible {
+                               // renice[i] is already 0, that's the
+                               // best we can do
+                               target = possible
+                       } else {
+                               renice[i] = possible - target
+                       }
+               }
+               target--
+       }
+       return renice
+}
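A minimal usage sketch for the helper above (not part of this commit; the exampleWantNice wrapper is hypothetical and assumes it is compiled into the same package as priority.go):

// Hypothetical illustration: with three equal-priority SLURM jobs and
// a spread below 1 (which falls back to defaultSpread, 10), wantNice
// recommends nice values stepped by the spread.
package main

import "fmt"

func exampleWantNice() {
	jobs := []*slurmJob{
		{uuid: "job-a", priority: 4294000111, nice: 10000},
		{uuid: "job-b", priority: 4294000111, nice: 10000},
		{uuid: "job-c", priority: 4294000111, nice: 10000},
	}
	fmt.Println(wantNice(jobs, 0)) // prints [0 10 20]
}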
diff --git a/services/crunch-dispatch-slurm/priority_test.go b/services/crunch-dispatch-slurm/priority_test.go
new file mode 100644 (file)
index 0000000..e80984c
--- /dev/null
@@ -0,0 +1,143 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+       . "gopkg.in/check.v1"
+)
+
+var _ = Suite(&PrioritySuite{})
+
+type PrioritySuite struct{}
+
+func (s *PrioritySuite) TestReniceCorrect(c *C) {
+       for _, test := range []struct {
+               spread int64
+               in     []*slurmJob
+               out    []int64
+       }{
+               {
+                       0,
+                       nil,
+                       nil,
+               },
+               {
+                       0,
+                       []*slurmJob{},
+                       nil,
+               },
+               {
+                       10,
+                       []*slurmJob{{priority: 4294000111, nice: 10000}},
+                       []int64{0},
+               },
+               {
+                       10,
+                       []*slurmJob{
+                               {priority: 4294000111, nice: 10000},
+                               {priority: 4294000111, nice: 10000},
+                               {priority: 4294000111, nice: 10000},
+                               {priority: 4294000111, nice: 10000},
+                       },
+                       []int64{0, 10, 20, 30},
+               },
+               { // smaller spread than necessary, but correctly ordered => leave nice alone
+                       10,
+                       []*slurmJob{
+                               {priority: 4294000113, nice: 0},
+                               {priority: 4294000112, nice: 1},
+                               {priority: 4294000111, nice: 99},
+                       },
+                       []int64{0, 1, 99},
+               },
+               { // larger spread than necessary, but less than 10x => leave nice alone
+                       10,
+                       []*slurmJob{
+                               {priority: 4294000144, nice: 0},
+                               {priority: 4294000122, nice: 20},
+                               {priority: 4294000111, nice: 30},
+                       },
+                       []int64{0, 20, 30},
+               },
+               { // > 10x spread => reduce nice to achieve spread=10
+                       10,
+                       []*slurmJob{
+                               {priority: 4000, nice: 0},    // max pri 4000
+                               {priority: 3000, nice: 999},  // max pri 3999
+                               {priority: 2000, nice: 1998}, // max pri 3998
+                       },
+                       []int64{0, 9, 18},
+               },
+               { // > 10x spread, but spread=10 is impossible without negative nice
+                       10,
+                       []*slurmJob{
+                               {priority: 4000, nice: 0},    // max pri 4000
+                               {priority: 3000, nice: 500},  // max pri 3500
+                               {priority: 2000, nice: 2000}, // max pri 4000
+                       },
+                       []int64{0, 0, 510},
+               },
+               { // default spread, needs reorder
+                       0,
+                       []*slurmJob{
+                               {priority: 4000, nice: 0}, // max pri 4000
+                               {priority: 5000, nice: 0}, // max pri 5000
+                               {priority: 6000, nice: 0}, // max pri 6000
+                       },
+                       []int64{0, 1000 + defaultSpread, 2000 + defaultSpread*2},
+               },
+               { // minimum spread
+                       1,
+                       []*slurmJob{
+                               {priority: 4000, nice: 0}, // max pri 4000
+                               {priority: 5000, nice: 0}, // max pri 5000
+                               {priority: 6000, nice: 0}, // max pri 6000
+                               {priority: 3000, nice: 0}, // max pri 3000
+                       },
+                       []int64{0, 1001, 2002, 0},
+               },
+       } {
+               c.Logf("spread=%d %+v -> %+v", test.spread, test.in, test.out)
+               c.Check(wantNice(test.in, test.spread), DeepEquals, test.out)
+
+               if len(test.in) == 0 {
+                       continue
+               }
+               // After making the adjustments, calling wantNice
+               // again should return the same recommendations.
+               updated := make([]*slurmJob, len(test.in))
+               for i, in := range test.in {
+                       updated[i] = &slurmJob{
+                               nice:     test.out[i],
+                               priority: in.priority + in.nice - test.out[i],
+                       }
+               }
+               c.Check(wantNice(updated, test.spread), DeepEquals, test.out)
+       }
+}
+
+func (s *PrioritySuite) TestReniceChurn(c *C) {
+       const spread = 10
+       jobs := make([]*slurmJob, 1000)
+       for i := range jobs {
+               jobs[i] = &slurmJob{priority: 4294000000 - int64(i), nice: 10000}
+       }
+       adjustments := 0
+       queue := jobs
+       for len(queue) > 0 {
+               renice := wantNice(queue, spread)
+               for i := range queue {
+                       if renice[i] == queue[i].nice {
+                               continue
+                       }
+                       queue[i].priority += queue[i].nice - renice[i]
+                       queue[i].nice = renice[i]
+                       adjustments++
+               }
+               queue = queue[1:]
+       }
+       c.Logf("processed queue of %d with %d renice ops", len(jobs), adjustments)
+       c.Check(adjustments < len(jobs)*len(jobs)/10, Equals, true)
+}
index bd193778b38c2172b13987945cfd2df1c58e22ce..735e057e2542c42cf52940fcd3279e3b82ae28fe 100644 (file)
@@ -14,7 +14,7 @@ import (
 
 type Slurm interface {
        Cancel(name string) error
-       Renice(name string, nice int) error
+       Renice(name string, nice int64) error
        QueueCommand(args []string) *exec.Cmd
        Batch(script io.Reader, args []string) error
 }
@@ -54,7 +54,7 @@ func (scli *slurmCLI) QueueCommand(args []string) *exec.Cmd {
        return exec.Command("squeue", args...)
 }
 
-func (scli *slurmCLI) Renice(name string, nice int) error {
+func (scli *slurmCLI) Renice(name string, nice int64) error {
        return scli.run(nil, "scontrol", []string{"update", "JobName=" + name, fmt.Sprintf("Nice=%d", nice)})
 }
 
index adb620ea8d34778f9d3c4d32edd42f518867b5a4..b8e3108c7c1a235e7c7e28ffc10974652a35cc6a 100644 (file)
@@ -8,24 +8,28 @@ import (
        "bytes"
        "fmt"
        "log"
+       "sort"
        "strings"
        "sync"
        "time"
 )
 
-type jobPriority struct {
-       niceness        int
-       currentPriority int
+type slurmJob struct {
+       uuid         string
+       wantPriority int64
+       priority     int64 // current slurm priority (incorporates nice value)
+       nice         int64 // current slurm nice value
 }
 
 // Squeue implements asynchronous polling monitor of the SLURM queue using the
 // command 'squeue'.
 type SqueueChecker struct {
-       Period    time.Duration
-       Slurm     Slurm
-       uuids     map[string]jobPriority
-       startOnce sync.Once
-       done      chan struct{}
+       Period         time.Duration
+       PrioritySpread int64
+       Slurm          Slurm
+       queue          map[string]*slurmJob
+       startOnce      sync.Once
+       done           chan struct{}
        sync.Cond
 }
 
@@ -40,22 +44,54 @@ func (sqc *SqueueChecker) HasUUID(uuid string) bool {
 
        // block until next squeue broadcast signaling an update.
        sqc.Wait()
-       _, exists := sqc.uuids[uuid]
+       _, exists := sqc.queue[uuid]
        return exists
 }
 
-// GetNiceness returns the niceness of a given uuid, or -1 if it doesn't exist.
-func (sqc *SqueueChecker) GetNiceness(uuid string) int {
+// SetPriority sets or updates the desired (Arvados) priority for a
+// container.
+func (sqc *SqueueChecker) SetPriority(uuid string, want int64) {
        sqc.startOnce.Do(sqc.start)
+       sqc.L.Lock()
+       defer sqc.L.Unlock()
+       job, ok := sqc.queue[uuid]
+       if !ok {
+               // Wait in case the slurm job was just submitted and
+               // will appear in the next squeue update.
+               sqc.Wait()
+               if job, ok = sqc.queue[uuid]; !ok {
+                       return
+               }
+       }
+       job.wantPriority = want
+}
 
+// reniceAll adjusts slurm job nice values as needed so that slurm's
+// priority order matches the Arvados priority order.
+func (sqc *SqueueChecker) reniceAll() {
        sqc.L.Lock()
        defer sqc.L.Unlock()
 
-       n, exists := sqc.uuids[uuid]
-       if exists {
-               return n.niceness
-       } else {
-               return -1
+       jobs := make([]*slurmJob, 0, len(sqc.queue))
+       for _, j := range sqc.queue {
+               if j.wantPriority == 0 {
+                       // SLURM job with unknown Arvados priority
+                       // (perhaps it's not an Arvados job)
+                       continue
+               }
+               jobs = append(jobs, j)
+       }
+
+       sort.Slice(jobs, func(i, j int) bool {
+               return jobs[i].wantPriority > jobs[j].wantPriority
+       })
+       renice := wantNice(jobs, sqc.PrioritySpread)
+       for i, job := range jobs {
+               if renice[i] == job.nice {
+                       continue
+               }
+               log.Printf("updating slurm priority for %q: nice %d => %d", job.uuid, job.nice, renice[i])
+               sqc.Slurm.Renice(job.uuid, renice[i])
        }
 }
 
@@ -68,7 +104,7 @@ func (sqc *SqueueChecker) Stop() {
 }
 
 // check gets the names of jobs in the SLURM queue (running and
-// queued). If it succeeds, it updates squeue.uuids and wakes up any
+// queued). If it succeeds, it updates sqc.queue and wakes up any
 // goroutines that are waiting in HasUUID() or All().
 func (sqc *SqueueChecker) check() {
        // Mutex between squeue sync and running sbatch or scancel.  This
@@ -87,16 +123,26 @@ func (sqc *SqueueChecker) check() {
        }
 
        lines := strings.Split(stdout.String(), "\n")
-       sqc.uuids = make(map[string]jobPriority, len(lines))
+       newq := make(map[string]*slurmJob, len(lines))
        for _, line := range lines {
+               if line == "" {
+                       continue
+               }
                var uuid string
-               var nice int
-               var prio int
-               fmt.Sscan(line, &uuid, &nice, &prio)
-               if uuid != "" {
-                       sqc.uuids[uuid] = jobPriority{nice, prio}
+               var n, p int64
+               if _, err := fmt.Sscan(line, &uuid, &n, &p); err != nil {
+                       log.Printf("warning: ignoring unparsed line in squeue output: %q", line)
+                       continue
+               }
+               replacing, ok := sqc.queue[uuid]
+               if !ok {
+                       replacing = &slurmJob{uuid: uuid}
                }
+               replacing.priority = p
+               replacing.nice = n
+               newq[uuid] = replacing
        }
+       sqc.queue = newq
        sqc.Broadcast()
 }
 
@@ -114,6 +160,7 @@ func (sqc *SqueueChecker) start() {
                                return
                        case <-ticker.C:
                                sqc.check()
+                               sqc.reniceAll()
                        }
                }
        }()
@@ -127,8 +174,8 @@ func (sqc *SqueueChecker) All() []string {
        defer sqc.L.Unlock()
        sqc.Wait()
        var uuids []string
-       for uuid := range sqc.uuids {
-               uuids = append(uuids, uuid)
+       for u := range sqc.queue {
+               uuids = append(uuids, u)
        }
        return uuids
 }
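
Taken together, the squeue.go changes above form a small feedback loop: check() parses one squeue line per job as "name nice priority", SetPriority() records the priority Arvados wants for a container (waiting one extra poll if the slurm job has not appeared yet), and reniceAll() sorts the known jobs by wanted priority and renices them until slurm's ordering agrees. Below is a minimal sketch of driving that loop by hand, in the same style as the tests that follow; stubSlurm, exampleRenicePass and the sample squeue line are illustrative assumptions, and the sketch assumes it is dropped into the crunch-dispatch-slurm package next to squeue.go.

package main

import (
        "io"
        "log"
        "os/exec"
        "time"
)

// stubSlurm is a hypothetical stand-in for the real slurmCLI; it only
// satisfies the Slurm interface defined in slurm.go above.
type stubSlurm struct{ out string }

func (s stubSlurm) Cancel(name string) error                    { return nil }
func (s stubSlurm) Batch(script io.Reader, args []string) error { return nil }
func (s stubSlurm) Renice(name string, nice int64) error {
        log.Printf("would run: scontrol update JobName=%s Nice=%d", name, nice)
        return nil
}
func (s stubSlurm) QueueCommand(args []string) *exec.Cmd {
        // Pretend squeue printed a single "name nice priority" line.
        return exec.Command("echo", s.out)
}

func exampleRenicePass() {
        uuid := "zzzzz-dz642-queuedcontainer"
        sqc := &SqueueChecker{
                Slurm:          stubSlurm{out: uuid + " 10000 4294000000"},
                Period:         time.Hour, // keep the background poller quiet, as the tests do
                PrioritySpread: 1000,
        }
        sqc.startOnce.Do(sqc.start) // initialize, same pattern as the tests
        defer sqc.Stop()
        sqc.check()               // one round of (fake) squeue parsing
        sqc.SetPriority(uuid, 42) // the priority Arvados wants for this container
        sqc.reniceAll()           // issues the scontrol renice via stubSlurm
}
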
diff --git a/services/crunch-dispatch-slurm/squeue_test.go b/services/crunch-dispatch-slurm/squeue_test.go
new file mode 100644 (file)
index 0000000..694a4d6
--- /dev/null
@@ -0,0 +1,119 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+       "time"
+
+       . "gopkg.in/check.v1"
+)
+
+var _ = Suite(&SqueueSuite{})
+
+type SqueueSuite struct{}
+
+func (s *SqueueSuite) TestReniceAll(c *C) {
+       uuids := []string{"zzzzz-dz642-fake0fake0fake0", "zzzzz-dz642-fake1fake1fake1", "zzzzz-dz642-fake2fake2fake2"}
+       for _, test := range []struct {
+               spread int64
+               squeue string
+               want   map[string]int64
+               expect [][]string
+       }{
+               {
+                       spread: 1,
+                       squeue: uuids[0] + " 10000 4294000000\n",
+                       want:   map[string]int64{uuids[0]: 1},
+                       expect: [][]string{{uuids[0], "0"}},
+               },
+               { // fake0 priority is too high
+                       spread: 1,
+                       squeue: uuids[0] + " 10000 4294000777\n" + uuids[1] + " 10000 4294000444\n",
+                       want:   map[string]int64{uuids[0]: 1, uuids[1]: 999},
+                       expect: [][]string{{uuids[1], "0"}, {uuids[0], "334"}},
+               },
+               { // specify spread
+                       spread: 100,
+                       squeue: uuids[0] + " 10000 4294000777\n" + uuids[1] + " 10000 4294000444\n",
+                       want:   map[string]int64{uuids[0]: 1, uuids[1]: 999},
+                       expect: [][]string{{uuids[1], "0"}, {uuids[0], "433"}},
+               },
+               { // ignore fake2 because SetPriority() not called
+                       spread: 1,
+                       squeue: uuids[0] + " 10000 4294000000\n" + uuids[1] + " 10000 4294000111\n" + uuids[2] + " 10000 4294000222\n",
+                       want:   map[string]int64{uuids[0]: 999, uuids[1]: 1},
+                       expect: [][]string{{uuids[0], "0"}, {uuids[1], "112"}},
+               },
+       } {
+               c.Logf("spread=%d squeue=%q want=%v -> expect=%v", test.spread, test.squeue, test.want, test.expect)
+               slurm := &slurmFake{
+                       queue: test.squeue,
+               }
+               sqc := &SqueueChecker{
+                       Slurm:          slurm,
+                       PrioritySpread: test.spread,
+                       Period:         time.Hour,
+               }
+               sqc.startOnce.Do(sqc.start)
+               sqc.check()
+               for uuid, pri := range test.want {
+                       sqc.SetPriority(uuid, pri)
+               }
+               sqc.reniceAll()
+               c.Check(slurm.didRenice, DeepEquals, test.expect)
+               sqc.Stop()
+       }
+}
+
+// If the given UUID isn't in the slurm queue yet, SetPriority() should
+// wait for the next poll, then either record the priority or give up.
+func (s *SqueueSuite) TestSetPriorityBeforeQueued(c *C) {
+       uuidGood := "zzzzz-dz642-fake0fake0fake0"
+       uuidBad := "zzzzz-dz642-fake1fake1fake1"
+
+       slurm := &slurmFake{}
+       sqc := &SqueueChecker{
+               Slurm:  slurm,
+               Period: time.Hour,
+       }
+       sqc.startOnce.Do(sqc.start)
+       sqc.Stop()
+       sqc.check()
+
+       done := make(chan struct{})
+       go func() {
+               sqc.SetPriority(uuidGood, 123)
+               sqc.SetPriority(uuidBad, 345)
+               close(done)
+       }()
+       c.Check(sqc.queue[uuidGood], IsNil)
+       c.Check(sqc.queue[uuidBad], IsNil)
+       timeout := time.NewTimer(time.Second)
+       defer timeout.Stop()
+       tick := time.NewTicker(time.Millisecond)
+       defer tick.Stop()
+       for {
+               select {
+               case <-tick.C:
+                       slurm.queue = uuidGood + " 0 12345\n"
+                       sqc.check()
+
+                       // Avoid immediately selecting this case again
+                       // on the next iteration if check() took
+                       // longer than one tick.
+                       select {
+                       case <-tick.C:
+                       default:
+                       }
+               case <-timeout.C:
+                       c.Fatal("timed out")
+               case <-done:
+                       c.Assert(sqc.queue[uuidGood], NotNil)
+                       c.Check(sqc.queue[uuidGood].wantPriority, Equals, int64(123))
+                       c.Check(sqc.queue[uuidBad], IsNil)
+                       return
+               }
+       }
+}
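
As a cross-check on the expected nice values in TestReniceAll above: a job's nice-0 priority is its reported priority plus its current nice, so in the second table entry fake0 sits at 4294000777 + 10000 = 4294010777 and fake1 at 4294000444 + 10000 = 4294010444. Arvados wants fake1 (priority 999) ahead of fake0 (priority 1), so fake1 is reniced to 0 and fake0 has to land at least spread = 1 below 4294010444, i.e. at 4294010443, which takes a nice of 4294010777 - 4294010443 = 334. With spread = 100 in the third entry the target drops to 4294010344 and the nice rises to 433, matching the expected scontrol arguments.
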
index 7b023570b3d221593b8973eb4e640bda87a29dcb..454c24edbf115aa594b3e89cf810a89d2dbf7fdc 100644 (file)
@@ -36,22 +36,19 @@ setup(name='arvados-node-manager',
           ('share/doc/arvados-node-manager', ['agpl-3.0.txt', 'README.rst']),
       ],
       install_requires=[
-          'apache-libcloud>=2.2',
+          'apache-libcloud>=2.3',
           'arvados-python-client>=0.1.20170731145219',
           'future',
           'pykka',
           'python-daemon',
           'setuptools'
       ],
-      dependency_links=[
-          "https://github.com/curoverse/libcloud/archive/apache-libcloud-2.2.2.dev4.zip"
-      ],
       test_suite='tests',
       tests_require=[
           'requests',
           'pbr<1.7.0',
           'mock>=1.0',
-          'apache-libcloud==2.2.2.dev4',
+          'apache-libcloud>=2.3',
       ],
       zip_safe=False,
       cmdclass={'egg_info': tagger},