import cwltool.workflow
import cwltool.process
from schema_salad.sourceline import SourceLine
+import schema_salad.validate as validate
import arvados
import arvados.config
from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
-from cwltool.draft2tool import compute_checksums
+from cwltool.command_line_tool import compute_checksums
from arvados.api import OrderedJsonModel
logger = logging.getLogger('arvados.cwl-runner')
'%(asctime)s %(name)s %(levelname)s: %(message)s',
'%Y-%m-%d %H:%M:%S'))
+DEFAULT_PRIORITY = 500
+
class ArvCwlRunner(object):
"""Execute a CWL tool or workflow, submit work (using either jobs or
containers API), wait for them to complete, and report output.
def check_features(self, obj):
if isinstance(obj, dict):
- #if obj.get("writable"):
- # raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
+ if obj.get("writable") and self.work_api != "containers":
+ raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
if obj.get("class") == "DockerRequirement":
if obj.get("dockerOutputDirectory"):
- # TODO: can be supported by containers API, but not jobs API.
- raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
- "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
+ if self.work_api != "containers":
+ raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
+ "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
+ if not obj.get("dockerOutputDirectory").startswith('/'):
+ raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
+ "Option 'dockerOutputDirectory' must be an absolute path.")
for v in obj.itervalues():
self.check_features(v)
elif isinstance(obj, list):
def rewrite(fileobj):
    """Repoint a file object at its mapped target and trim stale entries.

    ``fileobj["location"]`` is replaced with the ``target`` that
    ``generatemapper.mapper`` reports for the current location.  The
    "listing", "contents", "nameext", "nameroot" and "dirname" entries
    are then removed when present; "basename" is not removed.

    The mapping is modified in place and nothing is returned.
    """
    mapped = generatemapper.mapper(fileobj["location"])
    fileobj["location"] = mapped.target
    for stale_key in ("listing", "contents", "nameext", "nameroot", "dirname"):
        # pop() with a default is a no-op when the key is absent.
        fileobj.pop(stale_key, None)
"success")
self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
+ self.eval_timeout = kwargs.get("eval_timeout")
kwargs["make_fs_access"] = make_fs_access
kwargs["enable_reuse"] = kwargs.get("enable_reuse")
kwargs["compute_checksum"] = kwargs.get("compute_checksum")
if self.work_api == "containers":
+ if self.ignore_docker_for_reuse:
+ raise Exception("--ignore-docker-for-reuse not supported with containers API.")
kwargs["outdir"] = "/var/spool/cwl"
kwargs["docker_outdir"] = "/var/spool/cwl"
kwargs["tmpdir"] = "/tmp"
kwargs["docker_tmpdir"] = "/tmp"
elif self.work_api == "jobs":
+ if kwargs["priority"] != DEFAULT_PRIORITY:
+ raise Exception("--priority not implemented for jobs API.")
kwargs["outdir"] = "$(task.outdir)"
kwargs["docker_outdir"] = "$(task.outdir)"
kwargs["tmpdir"] = "$(task.tmpdir)"
+ if kwargs["priority"] < 1 or kwargs["priority"] > 1000:
+ raise Exception("--priority must be in the range 1..1000.")
+
runnerjob = None
if kwargs.get("submit"):
# Submit a runner job to run the workflow for us.
on_error=kwargs.get("on_error"),
submit_runner_image=kwargs.get("submit_runner_image"),
intermediate_output_ttl=kwargs.get("intermediate_output_ttl"),
- merged_map=merged_map)
+ merged_map=merged_map,
+ priority=kwargs.get("priority"))
elif self.work_api == "jobs":
runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
self.output_name,
exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--print-dot", action="store_true",
help="Print workflow visualization in graphviz format and exit")
- exgroup.add_argument("--version", action="store_true", help="Print version and exit")
+ exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
exgroup = parser.add_mutually_exclusive_group()
help="If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).",
default=0)
+ parser.add_argument("--priority", type=int,
+ help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)",
+ default=DEFAULT_PRIORITY)
+
exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--trash-intermediate", action="store_true",
default=False, dest="trash_intermediate",
default=False, dest="trash_intermediate",
help="Do not trash intermediate outputs (default).")
- parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
+ parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
return parser
def add_arv_hints():
    """Relax cwltool's accept-list patterns and register the Arvados schema.

    Both ``ACCEPTLIST_EN_RELAXED_RE`` and ``ACCEPTLIST_RE`` on
    ``cwltool.command_line_tool`` are set to the same compiled ``.*``
    pattern.  The packaged ``arv-cwl-schema.yml`` resource is then read
    and passed to ``use_custom_schema`` for CWL "v1.0" under the
    ``http://arvados.org/cwl`` name.

    The resource stream is closed even when reading it or registering
    the schema raises, so a failure here does not leak the open stream.
    """
    relaxed = re.compile(r".*")
    cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = relaxed
    cwltool.command_line_tool.ACCEPTLIST_RE = relaxed

    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    try:
        use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
    finally:
        # Always release the stream, including on the error path.
        res.close()
job_order_object = None
arvargs = parser.parse_args(args)
- if arvargs.version:
- print versionstring()
- return
-
if arvargs.update_workflow:
if arvargs.update_workflow.find('-7fd4e-') == 5:
want_api = 'containers'