#!/usr/bin/env python
-# Implement cwl-runner interface for submitting and running jobs on Arvados.
+# Implement cwl-runner interface for submitting and running work on Arvados, using
+# either the Crunch jobs API or Crunch containers API.
import argparse
-import arvados
-import arvados.collection
-import arvados.commands.keepdocker
-import arvados.commands.run
-import arvados.events
-import arvados.util
-import copy
-import cwltool.docker
-from cwltool.draft2tool import revmap_file, remove_hostfs, CommandLineTool
-from cwltool.errors import WorkflowException
-import cwltool.main
-import cwltool.workflow
-import fnmatch
-from functools import partial
-import json
import logging
import os
-import pkg_resources # part of setuptools
-import re
import sys
import threading
-from cwltool.load_tool import fetch_document
-from cwltool.builder import Builder
-import urlparse
-from .arvcontainer import ArvadosContainer
-from .arvdocker import arv_docker_get_image
+import hashlib
+import copy
+import json
+from functools import partial
+import pkg_resources # part of setuptools
+
+from cwltool.errors import WorkflowException
+import cwltool.main
+import cwltool.workflow
+import cwltool.process
+import schema_salad
+from schema_salad.sourceline import SourceLine
-from cwltool.process import shortname, get_feature, adjustFiles, adjustFileObjs, scandeps
+import arvados
+import arvados.config
+from arvados.keep import KeepClient
+from arvados.errors import ApiError
+
+from .arvcontainer import ArvadosContainer, RunnerContainer
+from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
+from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
+from .arvtool import ArvadosCommandTool
+from .arvworkflow import ArvadosWorkflow, upload_workflow
+from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
+from .perf import Perf
+from .pathmapper import FinalOutputPathMapper
+from ._version import __version__
+
+from cwltool.pack import pack
+from cwltool.process import shortname, UnsupportedRequirement, getListing
+from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
+from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel
logger = logging.getLogger('arvados.cwl-runner')
+metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)
-tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
-outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
-keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
-
-
-
-
-class CollectionFsAccess(cwltool.process.StdFsAccess):
- """Implement the cwltool FsAccess interface for Arvados Collections."""
-
- def __init__(self, basedir):
- super(CollectionFsAccess, self).__init__(basedir)
- self.collections = {}
-
- def get_collection(self, path):
- p = path.split("/")
- if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]):
- pdh = p[0][5:]
- if pdh not in self.collections:
- self.collections[pdh] = arvados.collection.CollectionReader(pdh)
- return (self.collections[pdh], "/".join(p[1:]))
- else:
- return (None, path)
-
- def _match(self, collection, patternsegments, parent):
- if not patternsegments:
- return []
-
- if not isinstance(collection, arvados.collection.RichCollectionBase):
- return []
-
- ret = []
- # iterate over the files and subcollections in 'collection'
- for filename in collection:
- if patternsegments[0] == '.':
- # Pattern contains something like "./foo" so just shift
- # past the "./"
- ret.extend(self._match(collection, patternsegments[1:], parent))
- elif fnmatch.fnmatch(filename, patternsegments[0]):
- cur = os.path.join(parent, filename)
- if len(patternsegments) == 1:
- ret.append(cur)
- else:
- ret.extend(self._match(collection[filename], patternsegments[1:], cur))
- return ret
-
- def glob(self, pattern):
- collection, rest = self.get_collection(pattern)
- patternsegments = rest.split("/")
- return self._match(collection, patternsegments, "keep:" + collection.manifest_locator())
-
- def open(self, fn, mode):
- collection, rest = self.get_collection(fn)
- if collection:
- return collection.open(rest, mode)
- else:
- return open(self._abs(fn), mode)
-
- def exists(self, fn):
- collection, rest = self.get_collection(fn)
- if collection:
- return collection.exists(rest)
- else:
- return os.path.exists(self._abs(fn))
-
-class ArvadosJob(object):
- """Submit and manage a Crunch job for executing a CWL CommandLineTool."""
-
- def __init__(self, runner):
- self.arvrunner = runner
- self.running = False
-
- def run(self, dry_run=False, pull_image=True, **kwargs):
- script_parameters = {
- "command": self.command_line
- }
- runtime_constraints = {}
-
- if self.generatefiles:
- vwd = arvados.collection.Collection()
- script_parameters["task.vwd"] = {}
- for t in self.generatefiles:
- if isinstance(self.generatefiles[t], dict):
- src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
- vwd.copy(rest, t, source_collection=src)
- else:
- with vwd.open(t, "w") as f:
- f.write(self.generatefiles[t])
- vwd.save_new()
- for t in self.generatefiles:
- script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)
-
- script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
- if self.environment:
- script_parameters["task.env"].update(self.environment)
-
- if self.stdin:
- script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]
-
- if self.stdout:
- script_parameters["task.stdout"] = self.stdout
-
- (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
- if docker_req and kwargs.get("use_container") is not False:
- runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
- else:
- runtime_constraints["docker_image"] = "arvados/jobs"
-
- resources = self.builder.resources
- if resources is not None:
- runtime_constraints["min_cores_per_node"] = resources.get("cores", 1)
- runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
- runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)
-
- filters = [["repository", "=", "arvados"],
- ["script", "=", "crunchrunner"],
- ["script_version", "in git", "9e5b98e8f5f4727856b53447191f9c06e3da2ba6"]]
- if not self.arvrunner.ignore_docker_for_reuse:
- filters.append(["docker_image_locator", "in docker", runtime_constraints["docker_image"]])
-
- try:
- response = self.arvrunner.api.jobs().create(
- body={
- "owner_uuid": self.arvrunner.project_uuid,
- "script": "crunchrunner",
- "repository": "arvados",
- "script_version": "master",
- "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
- "script_parameters": {"tasks": [script_parameters]},
- "runtime_constraints": runtime_constraints
- },
- filters=filters,
- find_or_create=kwargs.get("enable_reuse", True)
- ).execute(num_retries=self.arvrunner.num_retries)
-
- self.arvrunner.jobs[response["uuid"]] = self
-
- self.update_pipeline_component(response)
-
- logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])
-
- if response["state"] in ("Complete", "Failed", "Cancelled"):
- self.done(response)
- except Exception as e:
- logger.error("Got error %s" % str(e))
- self.output_callback({}, "permanentFail")
-
- def update_pipeline_component(self, record):
- if self.arvrunner.pipeline:
- self.arvrunner.pipeline["components"][self.name] = {"job": record}
- self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
- body={
- "components": self.arvrunner.pipeline["components"]
- }).execute(num_retries=self.arvrunner.num_retries)
- if self.arvrunner.uuid:
- try:
- job = self.arvrunner.api.jobs().get(uuid=self.arvrunner.uuid).execute()
- if job:
- components = job["components"]
- components[self.name] = record["uuid"]
- self.arvrunner.api.jobs().update(uuid=self.arvrunner.uuid,
- body={
- "components": components
- }).execute(num_retries=self.arvrunner.num_retries)
- except Exception as e:
- logger.info("Error adding to components: %s", e)
-
- def done(self, record):
- try:
- self.update_pipeline_component(record)
- except:
- pass
-
- try:
- if record["state"] == "Complete":
- processStatus = "success"
- else:
- processStatus = "permanentFail"
-
- try:
- outputs = {}
- if record["output"]:
- logc = arvados.collection.Collection(record["log"])
- log = logc.open(logc.keys()[0])
- tmpdir = None
- outdir = None
- keepdir = None
- for l in log:
- # Determine the tmpdir, outdir and keepdir paths from
- # the job run. Unfortunately, we can't take the first
- # values we find (which are expected to be near the
- # top) and stop scanning because if the node fails and
- # the job restarts on a different node these values
- # will different runs, and we need to know about the
- # final run that actually produced output.
-
- g = tmpdirre.match(l)
- if g:
- tmpdir = g.group(1)
- g = outdirre.match(l)
- if g:
- outdir = g.group(1)
- g = keepre.match(l)
- if g:
- keepdir = g.group(1)
-
- colname = "Output %s of %s" % (record["output"][0:7], self.name)
-
- # check if collection already exists with same owner, name and content
- collection_exists = self.arvrunner.api.collections().list(
- filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
- ['portable_data_hash', '=', record["output"]],
- ["name", "=", colname]]
- ).execute(num_retries=self.arvrunner.num_retries)
-
- if not collection_exists["items"]:
- # Create a collection located in the same project as the
- # pipeline with the contents of the output.
- # First, get output record.
- collections = self.arvrunner.api.collections().list(
- limit=1,
- filters=[['portable_data_hash', '=', record["output"]]],
- select=["manifest_text"]
- ).execute(num_retries=self.arvrunner.num_retries)
-
- if not collections["items"]:
- raise WorkflowException(
- "Job output '%s' cannot be found on API server" % (
- record["output"]))
-
- # Create new collection in the parent project
- # with the output contents.
- self.arvrunner.api.collections().create(body={
- "owner_uuid": self.arvrunner.project_uuid,
- "name": colname,
- "portable_data_hash": record["output"],
- "manifest_text": collections["items"][0]["manifest_text"]
- }, ensure_unique_name=True).execute(
- num_retries=self.arvrunner.num_retries)
-
- self.builder.outdir = outdir
- self.builder.pathmapper.keepdir = keepdir
- outputs = self.collect_outputs("keep:" + record["output"])
- except WorkflowException as e:
- logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
- processStatus = "permanentFail"
- except Exception as e:
- logger.exception("Got unknown exception while collecting job outputs:")
- processStatus = "permanentFail"
-
- self.output_callback(outputs, processStatus)
- finally:
- del self.arvrunner.jobs[record["uuid"]]
-
-
-class RunnerJob(object):
- """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
-
- def __init__(self, runner, tool, job_order, enable_reuse):
- self.arvrunner = runner
- self.tool = tool
- self.job_order = job_order
- self.running = False
- self.enable_reuse = enable_reuse
-
- def update_pipeline_component(self, record):
- pass
-
- def upload_docker(self, tool):
- if isinstance(tool, CommandLineTool):
- (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
- if docker_req:
- arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid)
- elif isinstance(tool, cwltool.workflow.Workflow):
- for s in tool.steps:
- self.upload_docker(s.embedded_tool)
-
- def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
- """Create an Arvados job specification for this workflow.
-
- The returned dict can be used to create a job (i.e., passed as
- the +body+ argument to jobs().create()), or as a component in
- a pipeline template or pipeline instance.
- """
- self.upload_docker(self.tool)
-
- workflowfiles = set()
- jobfiles = set()
- workflowfiles.add(self.tool.tool["id"])
-
- self.name = os.path.basename(self.tool.tool["id"])
-
- def visitFiles(files, path):
- files.add(path)
- return path
-
- document_loader, workflowobj, uri = fetch_document(self.tool.tool["id"])
- def loadref(b, u):
- return document_loader.fetch(urlparse.urljoin(b, u))
-
- sc = scandeps(uri, workflowobj,
- set(("$import", "run")),
- set(("$include", "$schemas", "path")),
- loadref)
- adjustFiles(sc, partial(visitFiles, workflowfiles))
- adjustFiles(self.job_order, partial(visitFiles, jobfiles))
-
- workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
- "%s",
- "%s/%s",
- name=self.name,
- **kwargs)
-
- jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
- "%s",
- "%s/%s",
- name=os.path.basename(self.job_order.get("id", "#")),
- **kwargs)
-
- adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])
-
- if "id" in self.job_order:
- del self.job_order["id"]
-
- self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1]
- return {
- "script": "cwl-runner",
- "script_version": "master",
- "repository": "arvados",
- "script_parameters": self.job_order,
- "runtime_constraints": {
- "docker_image": "arvados/jobs"
- }
- }
-
- def run(self, *args, **kwargs):
- job_spec = self.arvados_job_spec(*args, **kwargs)
- job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
-
- response = self.arvrunner.api.jobs().create(
- body=job_spec,
- find_or_create=self.enable_reuse
- ).execute(num_retries=self.arvrunner.num_retries)
-
- self.uuid = response["uuid"]
- self.arvrunner.jobs[self.uuid] = self
-
- logger.info("Submitted job %s", response["uuid"])
-
- if kwargs.get("submit"):
- self.pipeline = self.arvrunner.api.pipeline_instances().create(
- body={
- "owner_uuid": self.arvrunner.project_uuid,
- "name": shortname(self.tool.tool["id"]),
- "components": {"cwl-runner": {"job": {"uuid": self.uuid, "state": response["state"]} } },
- "state": "RunningOnClient"}).execute(num_retries=self.arvrunner.num_retries)
-
- if response["state"] in ("Complete", "Failed", "Cancelled"):
- self.done(response)
-
- def done(self, record):
- if record["state"] == "Complete":
- processStatus = "success"
- else:
- processStatus = "permanentFail"
-
- outputs = None
- try:
- try:
- outc = arvados.collection.Collection(record["output"])
- with outc.open("cwl.output.json") as f:
- outputs = json.load(f)
- def keepify(path):
- if not path.startswith("keep:"):
- return "keep:%s/%s" % (record["output"], path)
- adjustFiles(outputs, keepify)
- except Exception as e:
- logger.error("While getting final output object: %s", e)
- self.arvrunner.output_callback(outputs, processStatus)
- finally:
- del self.arvrunner.jobs[record["uuid"]]
-
-
-class RunnerTemplate(object):
- """An Arvados pipeline template that invokes a CWL workflow."""
-
- type_to_dataclass = {
- 'boolean': 'boolean',
- 'File': 'File',
- 'float': 'number',
- 'int': 'number',
- 'string': 'text',
- }
-
- def __init__(self, runner, tool, job_order, enable_reuse):
- self.runner = runner
- self.tool = tool
- self.job = RunnerJob(
- runner=runner,
- tool=tool,
- job_order=job_order,
- enable_reuse=enable_reuse)
-
- def pipeline_component_spec(self):
- """Return a component that Workbench and a-r-p-i will understand.
-
- Specifically, translate CWL input specs to Arvados pipeline
- format, like {"dataclass":"File","value":"xyz"}.
- """
- spec = self.job.arvados_job_spec()
-
- # Most of the component spec is exactly the same as the job
- # spec (script, script_version, etc.).
- # spec['script_parameters'] isn't right, though. A component
- # spec's script_parameters hash is a translation of
- # self.tool.tool['inputs'] with defaults/overrides taken from
- # the job order. So we move the job parameters out of the way
- # and build a new spec['script_parameters'].
- job_params = spec['script_parameters']
- spec['script_parameters'] = {}
-
- for param in self.tool.tool['inputs']:
- param = copy.deepcopy(param)
-
- # Data type and "required" flag...
- types = param['type']
- if not isinstance(types, list):
- types = [types]
- param['required'] = 'null' not in types
- non_null_types = set(types) - set(['null'])
- if len(non_null_types) == 1:
- the_type = [c for c in non_null_types][0]
- dataclass = self.type_to_dataclass.get(the_type)
- if dataclass:
- param['dataclass'] = dataclass
- # Note: If we didn't figure out a single appropriate
- # dataclass, we just left that attribute out. We leave
- # the "type" attribute there in any case, which might help
- # downstream.
-
- # Title and description...
- title = param.pop('label', '')
- descr = param.pop('description', '').rstrip('\n')
- if title:
- param['title'] = title
- if descr:
- param['description'] = descr
-
- # Fill in the value from the current job order, if any.
- param_id = shortname(param.pop('id'))
- value = job_params.get(param_id)
- if value is None:
- pass
- elif not isinstance(value, dict):
- param['value'] = value
- elif param.get('dataclass') == 'File' and value.get('path'):
- param['value'] = value['path']
-
- spec['script_parameters'][param_id] = param
- spec['script_parameters']['cwl:tool'] = job_params['cwl:tool']
- return spec
-
- def save(self):
- job_spec = self.pipeline_component_spec()
- response = self.runner.api.pipeline_templates().create(body={
- "components": {
- self.job.name: job_spec,
- },
- "name": self.job.name,
- "owner_uuid": self.runner.project_uuid,
- }, ensure_unique_name=True).execute(num_retries=self.runner.num_retries)
- self.uuid = response["uuid"]
- logger.info("Created template %s", self.uuid)
-
-
-class ArvPathMapper(cwltool.pathmapper.PathMapper):
- """Convert container-local paths to and from Keep collection ids."""
-
- def __init__(self, arvrunner, referenced_files, input_basedir,
- collection_pattern, file_pattern, name=None, **kwargs):
- self._pathmap = arvrunner.get_uploaded()
- uploadfiles = set()
-
- pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')
-
- for src in referenced_files:
- if isinstance(src, basestring) and pdh_path.match(src):
- self._pathmap[src] = (src, collection_pattern % src[5:])
- if "#" in src:
- src = src[:src.index("#")]
- if src not in self._pathmap:
- ab = cwltool.pathmapper.abspath(src, input_basedir)
- st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern)
- if kwargs.get("conformance_test"):
- self._pathmap[src] = (src, ab)
- elif isinstance(st, arvados.commands.run.UploadFile):
- uploadfiles.add((src, ab, st))
- elif isinstance(st, arvados.commands.run.ArvFile):
- self._pathmap[src] = (ab, st.fn)
- else:
- raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)
-
- if uploadfiles:
- arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
- arvrunner.api,
- dry_run=kwargs.get("dry_run"),
- num_retries=3,
- fnPattern=file_pattern,
- name=name,
- project=arvrunner.project_uuid)
-
- for src, ab, st in uploadfiles:
- arvrunner.add_uploaded(src, (ab, st.fn))
- self._pathmap[src] = (ab, st.fn)
-
- self.keepdir = None
-
- def reversemap(self, target):
- if target.startswith("keep:"):
- return (target, target)
- elif self.keepdir and target.startswith(self.keepdir):
- return (target, "keep:" + target[len(self.keepdir)+1:])
- else:
- return super(ArvPathMapper, self).reversemap(target)
-
-
-class ArvadosCommandTool(CommandLineTool):
- """Wrap cwltool CommandLineTool to override selected methods."""
-
- def __init__(self, arvrunner, toolpath_object, crunch2, **kwargs):
- super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
- self.arvrunner = arvrunner
- self.crunch2 = crunch2
-
- def makeJobRunner(self):
- if self.crunch2:
- return ArvadosContainer(self.arvrunner)
- else:
- return ArvadosJob(self.arvrunner)
-
- def makePathMapper(self, reffiles, **kwargs):
- return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
- "$(task.keep)/%s",
- "$(task.keep)/%s/%s",
- **kwargs)
-
+arvados.log_handler.setFormatter(logging.Formatter(
+ '%(asctime)s %(name)s %(levelname)s: %(message)s',
+ '%Y-%m-%d %H:%M:%S'))
class ArvCwlRunner(object):
- """Execute a CWL tool or workflow, submit crunch jobs, wait for them to
- complete, and report output."""
+ """Execute a CWL tool or workflow, submit work (using either jobs or
+ containers API), wait for them to complete, and report output.
+
+ """
- def __init__(self, api_client, crunch2):
+ def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
self.api = api_client
- self.jobs = {}
+ self.processes = {}
self.lock = threading.Lock()
self.cond = threading.Condition(self.lock)
self.final_output = None
+ self.final_status = None
self.uploaded = {}
- self.num_retries = 4
+ self.num_retries = num_retries
self.uuid = None
- self.crunch2 = crunch2
+ self.stop_polling = threading.Event()
+ self.poll_api = None
+ self.pipeline = None
+ self.final_output_collection = None
+ self.output_name = output_name
+ self.output_tags = output_tags
+ self.project_uuid = None
- def arvMakeTool(self, toolpath_object, **kwargs):
+ if keep_client is not None:
+ self.keep_client = keep_client
+ else:
+ self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
+
+ self.work_api = None
+ expected_api = ["jobs", "containers"]
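+        # Use the first API in expected_api that the server's discovery
+        # document advertises, honoring an explicit work_api request if given.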
+ for api in expected_api:
+ try:
+ methods = self.api._rootDesc.get('resources')[api]['methods']
+ if ('httpMethod' in methods['create'] and
+ (work_api == api or work_api is None)):
+ self.work_api = api
+ break
+ except KeyError:
+ pass
+
+ if not self.work_api:
+ if work_api is None:
+ raise Exception("No supported APIs")
+ else:
+ raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
+
+ def arv_make_tool(self, toolpath_object, **kwargs):
+ kwargs["work_api"] = self.work_api
+ kwargs["fetcher_constructor"] = partial(CollectionFetcher,
+ api_client=self.api,
+ keep_client=self.keep_client)
if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
- return ArvadosCommandTool(self, toolpath_object, crunch2=self.crunch2, **kwargs)
+ return ArvadosCommandTool(self, toolpath_object, **kwargs)
+ elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
+ return ArvadosWorkflow(self, toolpath_object, **kwargs)
else:
return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
def output_callback(self, out, processStatus):
if processStatus == "success":
- logger.info("Overall job status is %s", processStatus)
+ logger.info("Overall process status is %s", processStatus)
if self.pipeline:
self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
body={"state": "Complete"}).execute(num_retries=self.num_retries)
-
else:
- logger.warn("Overall job status is %s", processStatus)
+ logger.warn("Overall process status is %s", processStatus)
if self.pipeline:
self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
body={"state": "Failed"}).execute(num_retries=self.num_retries)
+ self.final_status = processStatus
self.final_output = out
def on_message(self, event):
if "object_uuid" in event:
- if event["object_uuid"] in self.jobs and event["event_type"] == "update":
- if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
+ if event["object_uuid"] in self.processes and event["event_type"] == "update":
+ if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
uuid = event["object_uuid"]
with self.lock:
- j = self.jobs[uuid]
- logger.info("Job %s (%s) is Running", j.name, uuid)
+ j = self.processes[uuid]
+ logger.info("%s %s is Running", self.label(j), uuid)
j.running = True
j.update_pipeline_component(event["properties"]["new_attributes"])
- elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
+ elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
uuid = event["object_uuid"]
try:
self.cond.acquire()
- j = self.jobs[uuid]
- logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
- j.done(event["properties"]["new_attributes"])
+ j = self.processes[uuid]
+ logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
+ with Perf(metrics, "done %s" % j.name):
+ j.done(event["properties"]["new_attributes"])
self.cond.notify()
finally:
self.cond.release()
+ def label(self, obj):
+ return "[%s %s]" % (self.work_api[0:-1], obj.name)
+
+ def poll_states(self):
+ """Poll status of jobs or containers listed in the processes dict.
+
+ Runs in a separate thread.
+ """
+
+ try:
+ while True:
+ self.stop_polling.wait(15)
+ if self.stop_polling.is_set():
+ break
+ with self.lock:
+ keys = self.processes.keys()
+ if not keys:
+ continue
+
+ if self.work_api == "containers":
+ table = self.poll_api.container_requests()
+ elif self.work_api == "jobs":
+ table = self.poll_api.jobs()
+
+ try:
+ proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
+ except Exception as e:
+ logger.warn("Error checking states on API server: %s", e)
+ continue
+
+ for p in proc_states["items"]:
+ self.on_message({
+ "object_uuid": p["uuid"],
+ "event_type": "update",
+ "properties": {
+ "new_attributes": p
+ }
+ })
+ except:
+ logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
+ self.cond.acquire()
+ self.processes.clear()
+ self.cond.notify()
+ self.cond.release()
+ finally:
+ self.stop_polling.set()
+
def get_uploaded(self):
return self.uploaded.copy()
def add_uploaded(self, src, pair):
self.uploaded[src] = pair
- def arvExecutor(self, tool, job_order, **kwargs):
- self.debug = kwargs.get("debug")
+ def check_features(self, obj):
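+        """Raise UnsupportedRequirement for CWL features that this runner
+        cannot execute."""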
+ if isinstance(obj, dict):
+ if obj.get("class") == "InitialWorkDirRequirement":
+ if self.work_api == "containers":
+ raise UnsupportedRequirement("InitialWorkDirRequirement not supported with --api=containers")
+ if obj.get("writable"):
+ raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
+ if obj.get("class") == "CommandLineTool":
+ if self.work_api == "containers":
+ if obj.get("stdin"):
+ raise SourceLine(obj, "stdin", UnsupportedRequirement).makeError("Stdin redirection currently not suppported with --api=containers")
+ if obj.get("stderr"):
+ raise SourceLine(obj, "stderr", UnsupportedRequirement).makeError("Stderr redirection currently not suppported with --api=containers")
+ if obj.get("class") == "DockerRequirement":
+ if obj.get("dockerOutputDirectory"):
+ # TODO: can be supported by containers API, but not jobs API.
+ raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
+ "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
+ for v in obj.itervalues():
+ self.check_features(v)
+ elif isinstance(obj, list):
+ for i,v in enumerate(obj):
+ with SourceLine(obj, i, UnsupportedRequirement):
+ self.check_features(v)
+
+ def make_output_collection(self, name, tagsString, outputObj):
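+        """Copy the workflow outputs into a new collection owned by
+        self.project_uuid, tag it, and return the rewritten output object
+        along with the collection."""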
+ outputObj = copy.deepcopy(outputObj)
+
+ files = []
+ def capture(fileobj):
+ files.append(fileobj)
+
+ adjustDirObjs(outputObj, capture)
+ adjustFileObjs(outputObj, capture)
+
+ generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)
+
+ final = arvados.collection.Collection(api_client=self.api,
+ keep_client=self.keep_client,
+ num_retries=self.num_retries)
+
+ srccollections = {}
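+        # Copy each mapped output into the new collection: literal entries
+        # ("_:...") are written directly, while "keep:" references are copied
+        # from their source collections.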
+ for k,v in generatemapper.items():
+ if k.startswith("_:"):
+ if v.type == "Directory":
+ continue
+ if v.type == "CreateFile":
+ with final.open(v.target, "wb") as f:
+ f.write(v.resolved.encode("utf-8"))
+ continue
+
+ if not k.startswith("keep:"):
+ raise Exception("Output source is not in keep or a literal")
+ sp = k.split("/")
+ srccollection = sp[0][5:]
+ if srccollection not in srccollections:
+ try:
+ srccollections[srccollection] = arvados.collection.CollectionReader(
+ srccollection,
+ api_client=self.api,
+ keep_client=self.keep_client,
+ num_retries=self.num_retries)
+ except arvados.errors.ArgumentError as e:
+ logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
+ raise
+ reader = srccollections[srccollection]
+ try:
+ srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
+ final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
+ except IOError as e:
+ logger.warn("While preparing output collection: %s", e)
- if kwargs.get("quiet"):
- logger.setLevel(logging.WARN)
- logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
+ def rewrite(fileobj):
+ fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
+ for k in ("basename", "listing", "contents"):
+ if k in fileobj:
+ del fileobj[k]
- useruuid = self.api.users().current().execute()["uuid"]
- self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
- self.pipeline = None
+ adjustDirObjs(outputObj, rewrite)
+ adjustFileObjs(outputObj, rewrite)
- if kwargs.get("create_template"):
- tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
- tmpl.save()
- # cwltool.main will write our return value to stdout.
- return tmpl.uuid
+ with final.open("cwl.output.json", "w") as f:
+ json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
- if kwargs.get("submit"):
- runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))
+ final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
- if not kwargs.get("submit") and "cwl_runner_job" not in kwargs:
- # Create pipeline for local run
- self.pipeline = self.api.pipeline_instances().create(
- body={
- "owner_uuid": self.project_uuid,
- "name": shortname(tool.tool["id"]),
- "components": {},
- "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
- logger.info("Pipeline instance %s", self.pipeline["uuid"])
+ logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
+ final.api_response()["name"],
+ final.manifest_locator())
+
+ final_uuid = final.manifest_locator()
+ tags = tagsString.split(',')
+ for tag in tags:
+ self.api.links().create(body={
+ "head_uuid": final_uuid, "link_class": "tag", "name": tag
+ }).execute(num_retries=self.num_retries)
- if kwargs.get("submit") and not kwargs.get("wait"):
- runnerjob.run()
- return runnerjob.uuid
+ def finalcollection(fileobj):
+ fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
- events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
+ adjustDirObjs(outputObj, finalcollection)
+ adjustFileObjs(outputObj, finalcollection)
+ return (outputObj, final)
+
+ def set_crunch_output(self):
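+        """When running inside a container or crunch job task, record the
+        final output collection on that container or task record."""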
+ if self.work_api == "containers":
+ try:
+ current = self.api.containers().current().execute(num_retries=self.num_retries)
+ except ApiError as e:
+ # Status code 404 just means we're not running in a container.
+ if e.resp.status != 404:
+ logger.info("Getting current container: %s", e)
+ return
+ try:
+ self.api.containers().update(uuid=current['uuid'],
+ body={
+ 'output': self.final_output_collection.portable_data_hash(),
+ }).execute(num_retries=self.num_retries)
+ except Exception as e:
+ logger.info("Setting container output: %s", e)
+ elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
+ self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
+ body={
+ 'output': self.final_output_collection.portable_data_hash(),
+ 'success': self.final_status == "success",
+ 'progress':1.0
+ }).execute(num_retries=self.num_retries)
+
+ def arv_executor(self, tool, job_order, **kwargs):
self.debug = kwargs.get("debug")
+
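+        # Fail early if the workflow uses features this runner cannot execute.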
+ tool.visit(self.check_features)
+
+ self.project_uuid = kwargs.get("project_uuid")
+ self.pipeline = None
+ make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
+ api_client=self.api,
+ keep_client=self.keep_client)
+ self.fs_access = make_fs_access(kwargs["basedir"])
+
+ if not kwargs.get("name"):
+ kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
+
+ # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
+ # Also uploads docker images.
+ upload_workflow_deps(self, tool)
+
+ # Reload tool object which may have been updated by
+ # upload_workflow_deps
+ tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
+ makeTool=self.arv_make_tool,
+ loader=tool.doc_loader,
+ avsc_names=tool.doc_schema,
+ metadata=tool.metadata)
+
+ # Upload local file references in the job order.
+ job_order = upload_job_order(self, "%s input" % kwargs["name"],
+ tool, job_order)
+
+ existing_uuid = kwargs.get("update_workflow")
+ if existing_uuid or kwargs.get("create_workflow"):
+ # Create a pipeline template or workflow record and exit.
+ if self.work_api == "jobs":
+ tmpl = RunnerTemplate(self, tool, job_order,
+ kwargs.get("enable_reuse"),
+ uuid=existing_uuid,
+ submit_runner_ram=kwargs.get("submit_runner_ram"),
+ name=kwargs["name"])
+ tmpl.save()
+ # cwltool.main will write our return value to stdout.
+ return (tmpl.uuid, "success")
+ elif self.work_api == "containers":
+ return (upload_workflow(self, tool, job_order,
+ self.project_uuid,
+ uuid=existing_uuid,
+ submit_runner_ram=kwargs.get("submit_runner_ram"),
+ name=kwargs["name"]),
+ "success")
+
self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
- self.fs_access = CollectionFsAccess(kwargs["basedir"])
- kwargs["fs_access"] = self.fs_access
+ kwargs["make_fs_access"] = make_fs_access
kwargs["enable_reuse"] = kwargs.get("enable_reuse")
+ kwargs["use_container"] = True
+ kwargs["tmpdir_prefix"] = "tmp"
+ kwargs["compute_checksum"] = kwargs.get("compute_checksum")
- if self.crunch2:
+ if self.work_api == "containers":
kwargs["outdir"] = "/var/spool/cwl"
+ kwargs["docker_outdir"] = "/var/spool/cwl"
kwargs["tmpdir"] = "/tmp"
- else:
+ kwargs["docker_tmpdir"] = "/tmp"
+ elif self.work_api == "jobs":
kwargs["outdir"] = "$(task.outdir)"
+ kwargs["docker_outdir"] = "$(task.outdir)"
kwargs["tmpdir"] = "$(task.tmpdir)"
- if kwargs.get("conformance_test"):
- return cwltool.main.single_job_executor(tool, job_order, **kwargs)
+ runnerjob = None
+ if kwargs.get("submit"):
+ # Submit a runner job to run the workflow for us.
+ if self.work_api == "containers":
+ if tool.tool["class"] == "CommandLineTool":
+ kwargs["runnerjob"] = tool.tool["id"]
+ upload_dependencies(self,
+ kwargs["name"],
+ tool.doc_loader,
+ tool.tool,
+ tool.tool["id"],
+ False)
+ runnerjob = tool.job(job_order,
+ self.output_callback,
+ **kwargs).next()
+ else:
+ runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
+ self.output_name,
+ self.output_tags,
+ submit_runner_ram=kwargs.get("submit_runner_ram"),
+ name=kwargs.get("name"),
+ on_error=kwargs.get("on_error"),
+ submit_runner_image=kwargs.get("submit_runner_image"))
+ elif self.work_api == "jobs":
+ runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
+ self.output_name,
+ self.output_tags,
+ submit_runner_ram=kwargs.get("submit_runner_ram"),
+ name=kwargs.get("name"),
+ on_error=kwargs.get("on_error"),
+ submit_runner_image=kwargs.get("submit_runner_image"))
+
+ if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api == "jobs":
+ # Create pipeline for local run
+ self.pipeline = self.api.pipeline_instances().create(
+ body={
+ "owner_uuid": self.project_uuid,
+ "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
+ "components": {},
+ "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
+ logger.info("Pipeline instance %s", self.pipeline["uuid"])
+
+ if runnerjob and not kwargs.get("wait"):
+ runnerjob.run(wait=kwargs.get("wait"))
+ return (runnerjob.uuid, "success")
+
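+        # Track the state of submitted work by polling the API server from a
+        # background thread.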
+ self.poll_api = arvados.api('v1')
+ self.polling_thread = threading.Thread(target=self.poll_states)
+ self.polling_thread.start()
+
+ if runnerjob:
+ jobiter = iter((runnerjob,))
else:
- if kwargs.get("submit"):
- jobiter = iter((runnerjob,))
- else:
- if "cwl_runner_job" in kwargs:
- self.uuid = kwargs.get("cwl_runner_job").get('uuid')
- jobiter = tool.job(job_order,
- self.output_callback,
- docker_outdir="$(task.outdir)",
- **kwargs)
+ if "cwl_runner_job" in kwargs:
+ self.uuid = kwargs.get("cwl_runner_job").get('uuid')
+ jobiter = tool.job(job_order,
+ self.output_callback,
+ **kwargs)
- try:
- self.cond.acquire()
- # Will continue to hold the lock for the duration of this code
- # except when in cond.wait(), at which point on_message can update
- # job state and process output callbacks.
+ try:
+ self.cond.acquire()
+ # Will continue to hold the lock for the duration of this code
+ # except when in cond.wait(), at which point on_message can update
+ # job state and process output callbacks.
+
+ loopperf = Perf(metrics, "jobiter")
+ loopperf.__enter__()
+ for runnable in jobiter:
+ loopperf.__exit__()
+
+ if self.stop_polling.is_set():
+ break
- for runnable in jobiter:
- if runnable:
+ if runnable:
+ with Perf(metrics, "run"):
runnable.run(**kwargs)
- else:
- if self.jobs:
- self.cond.wait(1)
- else:
- logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
- break
-
- while self.jobs:
- self.cond.wait(1)
-
- events.close()
- except:
- if sys.exc_info()[0] is KeyboardInterrupt:
- logger.error("Interrupted, marking pipeline as failed")
else:
- logger.error("Caught unhandled exception, marking pipeline as failed. Error was: %s", sys.exc_info()[0], exc_info=(sys.exc_info()[1] if self.debug else False))
- if self.pipeline:
- self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
- body={"state": "Failed"}).execute(num_retries=self.num_retries)
- finally:
- self.cond.release()
+ if self.processes:
+ self.cond.wait(1)
+ else:
+ logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
+ break
+ loopperf.__enter__()
+ loopperf.__exit__()
+
+ while self.processes:
+ self.cond.wait(1)
+
+ except UnsupportedRequirement:
+ raise
+ except:
+ if sys.exc_info()[0] is KeyboardInterrupt:
+ logger.error("Interrupted, marking pipeline as failed")
+ else:
+ logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
+ if self.pipeline:
+ self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+ body={"state": "Failed"}).execute(num_retries=self.num_retries)
+ if runnerjob and runnerjob.uuid and self.work_api == "containers":
+ self.api.container_requests().update(uuid=runnerjob.uuid,
+ body={"priority": "0"}).execute(num_retries=self.num_retries)
+ finally:
+ self.cond.release()
+ self.stop_polling.set()
+ self.polling_thread.join()
- if self.final_output is None:
- raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
+ if self.final_status == "UnsupportedRequirement":
+ raise UnsupportedRequirement("Check log for details.")
+
+ if self.final_output is None:
+ raise WorkflowException("Workflow did not return a result.")
+
+ if kwargs.get("submit") and isinstance(runnerjob, Runner):
+ logger.info("Final output collection %s", runnerjob.final_output)
+ else:
+ if self.output_name is None:
+ self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
+ if self.output_tags is None:
+ self.output_tags = ""
+ self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
+ self.set_crunch_output()
+
+ if kwargs.get("compute_checksum"):
+ adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
+ adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
+
+ return (self.final_output, self.final_status)
- return self.final_output
def versionstring():
"""Print version string of key packages for provenance and debugging."""
arvpkg = pkg_resources.require("arvados-python-client")
cwlpkg = pkg_resources.require("cwltool")
- return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
+ return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
"arvados-python-client", arvpkg[0].version,
"cwltool", cwlpkg[0].version)
+
def arg_parser(): # type: () -> argparse.ArgumentParser
parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
- parser.add_argument("--conformance-test", action="store_true")
parser.add_argument("--basedir", type=str,
help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
+ parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
+
parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
exgroup = parser.add_mutually_exclusive_group()
default=True, dest="enable_reuse",
help="")
- parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs, if not provided, will go to home project.")
+ parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
+ parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
+ parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
parser.add_argument("--ignore-docker-for-reuse", action="store_true",
help="Ignore Docker image version when deciding whether to reuse past jobs.",
default=False)
default=True, dest="submit")
exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
default=True, dest="submit")
- exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
+ exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
+ dest="create_workflow")
+ exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
+ exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
default=True, dest="wait")
exgroup = parser.add_mutually_exclusive_group()
- exgroup.add_argument("--crunch1", action="store_false",
- default=False, dest="crunch2",
- help="Use Crunch v1 Jobs API")
+ exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
+ default=True, dest="log_timestamps")
+ exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
+ default=True, dest="log_timestamps")
+
+ parser.add_argument("--api", type=str,
+ default=None, dest="work_api",
+ help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")
+
+ parser.add_argument("--compute-checksum", action="store_true", default=False,
+ help="Compute checksum of contents while collecting outputs",
+ dest="compute_checksum")
+
+ parser.add_argument("--submit-runner-ram", type=int,
+ help="RAM (in MiB) required for the workflow runner job (default 1024)",
+ default=1024)
+
+ parser.add_argument("--submit-runner-image", type=str,
+ help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
+ default=None)
- exgroup.add_argument("--crunch2", action="store_true",
- default=False, dest="crunch2",
- help="Use Crunch v2 Containers API")
+ parser.add_argument("--name", type=str,
+ help="Name to use for workflow execution instance.",
+ default=None)
- parser.add_argument("workflow", type=str, nargs="?", default=None)
- parser.add_argument("job_order", nargs=argparse.REMAINDER)
+ parser.add_argument("--on-error", type=str,
+ help="Desired workflow behavior when a step fails. One of 'stop' or 'continue'. "
+ "Default is 'continue'.", default="continue", choices=("stop", "continue"))
+
+ parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
+ parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
return parser
-def main(args, stdout, stderr, api_client=None):
+def add_arv_hints():
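+    # Load the Arvados CWL extension schema (http://arvados.org/cwl) and
+    # register its names with the core CWL schema so that Arvados-specific
+    # hints validate.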
+ cache = {}
+ res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
+ cache["http://arvados.org/cwl"] = res.read()
+ res.close()
+ document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
+ _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
+ for n in extnames.names:
+ if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
+ cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
+ document_loader.idx["http://arvados.org/cwl#"+n] = {}
+
+def main(args, stdout, stderr, api_client=None, keep_client=None):
parser = arg_parser()
job_order_object = None
arvargs = parser.parse_args(args)
- if arvargs.create_template and not arvargs.job_order:
+
+ if arvargs.version:
+ print versionstring()
+ return
+
+ if arvargs.update_workflow:
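+        # The UUID type infix identifies the record to update: '7fd4e' is a
+        # workflow (containers API), 'p5p6p' is a pipeline template (jobs API).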
+ if arvargs.update_workflow.find('-7fd4e-') == 5:
+ want_api = 'containers'
+ elif arvargs.update_workflow.find('-p5p6p-') == 5:
+ want_api = 'jobs'
+ else:
+ want_api = None
+ if want_api and arvargs.work_api and want_api != arvargs.work_api:
+ logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
+ arvargs.update_workflow, want_api, arvargs.work_api))
+ return 1
+ arvargs.work_api = want_api
+
+ if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
job_order_object = ({}, "")
+ add_arv_hints()
+
try:
if api_client is None:
api_client=arvados.api('v1', model=OrderedJsonModel())
- runner = ArvCwlRunner(api_client, crunch2=arvargs.crunch2)
+ if keep_client is None:
+ keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
+ runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
+ num_retries=4, output_name=arvargs.output_name,
+ output_tags=arvargs.output_tags)
except Exception as e:
logger.error(e)
return 1
+ if arvargs.debug:
+ logger.setLevel(logging.DEBUG)
+ logging.getLogger('arvados').setLevel(logging.DEBUG)
+
+ if arvargs.quiet:
+ logger.setLevel(logging.WARN)
+ logging.getLogger('arvados').setLevel(logging.WARN)
+ logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
+
+ if arvargs.metrics:
+ metrics.setLevel(logging.DEBUG)
+ logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
+
+ if arvargs.log_timestamps:
+ arvados.log_handler.setFormatter(logging.Formatter(
+ '%(asctime)s %(name)s %(levelname)s: %(message)s',
+ '%Y-%m-%d %H:%M:%S'))
+ else:
+ arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
+
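+    # These cwltool options are not exposed on the arvados-cwl-runner command
+    # line, so set them to fixed values before handing off to cwltool.main.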
+ arvargs.conformance_test = None
+ arvargs.use_container = True
+ arvargs.relax_path_checks = True
+ arvargs.validate = None
+
return cwltool.main.main(args=arvargs,
stdout=stdout,
stderr=stderr,
- executor=runner.arvExecutor,
- makeTool=runner.arvMakeTool,
+ executor=runner.arv_executor,
+ makeTool=runner.arv_make_tool,
versionfunc=versionstring,
- job_order_object=job_order_object)
+ job_order_object=job_order_object,
+ make_fs_access=partial(CollectionFsAccess,
+ api_client=api_client,
+ keep_client=keep_client),
+ fetcher_constructor=partial(CollectionFetcher,
+ api_client=api_client,
+ keep_client=keep_client,
+ num_retries=runner.num_retries),
+ resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
+ logger_handler=arvados.log_handler)