3 from functools import partial
7 from cStringIO import StringIO
9 import cwltool.draft2tool
10 from cwltool.draft2tool import CommandLineTool
11 import cwltool.workflow
12 from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs
13 from cwltool.load_tool import fetch_document
14 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
16 import arvados.collection
17 import ruamel.yaml as yaml
19 from .arvdocker import arv_docker_get_image
20 from .pathmapper import ArvPathMapper
21 from ._version import __version__
# Module-level logger shared by all runner-submission code in this file.
logger = logging.getLogger('arvados.cwl-runner')

# Relax cwltool's Docker image allowlist so any image name is accepted;
# image availability is checked against Arvados instead (see upload_docker).
cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r".*")
def trim_listing(obj):
    """Remove 'listing' field from Directory objects that are keep references.

    When Directory objects represent Keep references, it is redundant and
    potentially very expensive to pass fully enumerated Directory objects
    between instances of cwl-runner (e.g. submitting a job, or using the
    RunInSingleContainer feature), so delete the 'listing' field when it is
    safe to do so.
    """

    # A keep: reference can be re-enumerated on the other side; drop the
    # (possibly huge) cached listing.
    if obj.get("location", "").startswith("keep:") and "listing" in obj:
        del obj["listing"]
    # Anonymous ("_:") locations are not resolvable elsewhere; remove the
    # location so the receiver treats the literal listing as authoritative.
    if obj.get("location", "").startswith("_:"):
        del obj["location"]
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.

    :param arvrunner: runner object holding the Arvados API client
    :param name: display name for the path mapper / upload collection
    :param document_loader: schema-salad loader used to fetch raw documents
    :param workflowobj: parsed CWL document (mutated in place)
    :param uri: base URI of the document
    :param loadref_run: when True, also follow "run" references (workflows)
    """

    loaded = set()
    def loadref(b, u):
        # Resolve u against base b and strip the fragment so each document
        # is fetched at most once.
        joined = urlparse.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            # Already scanned; returning {} avoids re-walking the document.
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref)

    normalizeFilesDirs(sc)

    if "id" in workflowobj:
        # The top-level document itself must also be uploaded.
        sc.append({"class": "File", "location": workflowobj["id"]})

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name)

    def setloc(p):
        # Rewrite local references to their uploaded keep: equivalents;
        # leave anonymous ("_:") and already-keep references untouched.
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    return mapper
def upload_docker(arvrunner, tool):
    """Ensure the Docker image(s) used by the tool are available in Arvados.

    Recurses into Workflow steps; for CommandLineTools with a
    DockerRequirement, pulls/uploads the image via arv_docker_get_image.
    """
    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        if docker_req:
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        # Check every embedded step tool as well.
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)
def upload_instance(arvrunner, name, tool, job_order):
    """Upload a tool's Docker images, dependencies and job order to Keep.

    Returns the pathmapper for the workflow document itself; job_order is
    updated in place so its file references point into Keep.
    """
    upload_docker(arvrunner, tool)

    workflowmapper = upload_dependencies(arvrunner,
                                         name,
                                         tool.doc_loader,
                                         tool.tool,
                                         tool.tool["id"],
                                         True)
    jobmapper = upload_dependencies(arvrunner,
                                    os.path.basename(job_order.get("id", "#")),
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        # The job order's original id is meaningless after upload; drop it
        # so it is not submitted as an input parameter.
        del job_order["id"]

    return workflowmapper
def arvados_jobs_image(arvrunner):
    """Determine the arvados/jobs image to use, and ensure it is available.

    Returns the image name (pinned to this package's version).  Raises
    Exception when the image cannot be pulled/found.
    """
    img = "arvados/jobs:"+__version__
    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e) )
    # Without this return the caller would get None instead of the image name.
    return img
class Runner(object):
    """Base class for a submitted runner process (a job/container that in
    turn runs cwl-runner on the cluster).

    Subclasses provide arvados_job_spec details; this class tracks state and
    translates the finished record into a CWL output object.
    """

    def __init__(self, runner, tool, job_order, enable_reuse, output_name):
        self.arvrunner = runner          # ArvCwlRunner executor
        self.tool = tool                 # cwltool Process being run
        self.job_order = job_order       # CWL input object
        self.running = False             # set once the remote process starts
        self.enable_reuse = enable_reuse # allow job/container reuse
        self.uploaded = False            # dependencies uploaded to Keep yet?
        self.final_output = None         # output collection locator when done
        self.output_name = output_name   # name for the output collection

    def update_pipeline_component(self, record):
        # Base runner has no pipeline component to update; subclasses may.
        pass

    def arvados_job_spec(self, *args, **kwargs):
        """Upload this instance and return the workflow pathmapper."""
        self.name = os.path.basename(self.tool.tool["id"])
        workflowmapper = upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
        # Keep references carry their own listings; trim before submit.
        adjustDirObjs(self.job_order, trim_listing)
        return workflowmapper

    def done(self, record):
        """Completion callback: map the record to a CWL process status and
        deliver the output object to the executor."""
        if record["state"] == "Complete":
            if record.get("exit_code") is not None:
                if record["exit_code"] == 33:
                    # 33 is the conventional exit code for
                    # UnsupportedRequirement from the inner cwl-runner.
                    processStatus = "UnsupportedRequirement"
                elif record["exit_code"] == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"
            else:
                processStatus = "success"
        else:
            processStatus = "permanentFail"

        outputs = None
        try:
            try:
                self.final_output = record["output"]
                outc = arvados.collection.CollectionReader(self.final_output,
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                with outc.open("cwl.output.json") as f:
                    outputs = json.load(f)
                def keepify(fileobj):
                    # Make relative output paths absolute keep: references.
                    path = fileobj["location"]
                    if not path.startswith("keep:"):
                        fileobj["location"] = "keep:%s/%s" % (record["output"], path)
                adjustFileObjs(outputs, keepify)
                adjustDirObjs(outputs, keepify)
            except Exception as e:
                # Best-effort: still report status even if output retrieval
                # failed; outputs stays None.
                logger.error("While getting final output object: %s", e)
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            # Always unregister this process from the executor.
            del self.arvrunner.processes[record["uuid"]]