import cwltool.workflow
import cwltool.process
from schema_salad.sourceline import SourceLine
+import schema_salad.validate as validate
import arvados
import arvados.config
from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
-from cwltool.draft2tool import compute_checksums
+from cwltool.command_line_tool import compute_checksums
from arvados.api import OrderedJsonModel
logger = logging.getLogger('arvados.cwl-runner')
'%(asctime)s %(name)s %(levelname)s: %(message)s',
'%Y-%m-%d %H:%M:%S'))
+DEFAULT_PRIORITY = 500
+
class ArvCwlRunner(object):
"""Execute a CWL tool or workflow, submit work (using either jobs or
containers API), wait for them to complete, and report output.
if not obj.get("dockerOutputDirectory").startswith('/'):
raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
"Option 'dockerOutputDirectory' must be an absolute path.")
+ if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
+ raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
for v in obj.itervalues():
self.check_features(v)
elif isinstance(obj, list):
make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
collection_cache=self.collection_cache)
self.fs_access = make_fs_access(kwargs["basedir"])
-
+ self.secret_store = kwargs.get("secret_store")
self.trash_intermediate = kwargs["trash_intermediate"]
if self.trash_intermediate and self.work_api != "containers":
if self.work_api == "containers":
if self.ignore_docker_for_reuse:
- raise validate.ValidationException("--ignore-docker-for-reuse not supported with containers API.")
+ raise Exception("--ignore-docker-for-reuse not supported with containers API.")
kwargs["outdir"] = "/var/spool/cwl"
kwargs["docker_outdir"] = "/var/spool/cwl"
kwargs["tmpdir"] = "/tmp"
kwargs["docker_tmpdir"] = "/tmp"
elif self.work_api == "jobs":
+ if kwargs["priority"] != DEFAULT_PRIORITY:
+ raise Exception("--priority not implemented for jobs API.")
kwargs["outdir"] = "$(task.outdir)"
kwargs["docker_outdir"] = "$(task.outdir)"
kwargs["tmpdir"] = "$(task.tmpdir)"
+ if kwargs["priority"] < 1 or kwargs["priority"] > 1000:
+ raise Exception("--priority must be in the range 1..1000.")
+
runnerjob = None
if kwargs.get("submit"):
# Submit a runner job to run the workflow for us.
on_error=kwargs.get("on_error"),
submit_runner_image=kwargs.get("submit_runner_image"),
intermediate_output_ttl=kwargs.get("intermediate_output_ttl"),
- merged_map=merged_map)
+ merged_map=merged_map,
+ priority=kwargs.get("priority"),
+ secret_store=self.secret_store)
elif self.work_api == "jobs":
runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
self.output_name,
exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--print-dot", action="store_true",
help="Print workflow visualization in graphviz format and exit")
- exgroup.add_argument("--version", action="store_true", help="Print version and exit")
+ exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
exgroup = parser.add_mutually_exclusive_group()
help="If N > 0, intermediate output collections will be trashed N seconds after creation. Default is 0 (don't trash).",
default=0)
+ parser.add_argument("--priority", type=int,
+ help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)",
+ default=DEFAULT_PRIORITY)
+
exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--trash-intermediate", action="store_true",
default=False, dest="trash_intermediate",
default=False, dest="trash_intermediate",
help="Do not trash intermediate outputs (default).")
- parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
+ parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
return parser
def add_arv_hints():
- cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
- cwltool.draft2tool.ACCEPTLIST_RE = cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE
+ cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
+ cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
res.close()
job_order_object = None
arvargs = parser.parse_args(args)
- if arvargs.version:
- print versionstring()
- return
-
if arvargs.update_workflow:
if arvargs.update_workflow.find('-7fd4e-') == 5:
want_api = 'containers'