import os
import urlparse
from functools import partial
import logging
import json
import subprocess

from StringIO import StringIO

from schema_salad.sourceline import SourceLine

import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack

import arvados.collection
import ruamel.yaml as yaml

from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper
from ._version import __version__
from . import done  # used by Runner.done() to print a log tail on failure

logger = logging.getLogger('arvados.cwl-runner')

def trim_listing(obj):
    """Remove 'listing' field from Directory objects that are keep references.

    When Directory objects represent Keep references, it is redundant and
    potentially very expensive to pass fully enumerated Directory objects
    between instances of cwl-runner (e.g. when submitting a job, or using the
    RunInSingleContainer feature), so delete the 'listing' field when it is
    safe to do so.

    """

    if obj.get("location", "").startswith("keep:") and "listing" in obj:
        del obj["listing"]
    if obj.get("location", "").startswith("_:"):
        del obj["location"]
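
# Illustrative sketch (not part of the original module): trim_listing() mutates
# its argument in place.  The Keep locator below is made up.
#
#     d = {"class": "Directory",
#          "location": "keep:99999999999999999999999999999999+99/dir",
#          "listing": [{"class": "File", "location": "dir/a.txt"}]}
#     trim_listing(d)
#     # "listing" is dropped because the location is already a keep: reference.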

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run, include_primary=True):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            # Already scanned this reference, nothing more to load.
            return {}
    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))
83 if "id" in workflowobj:
84 # Need raw file content (before preprocessing) to ensure
85 # that external references in $include and $mixin are captured.
86 scanobj = loadref("", workflowobj["id"])
88 sc = scandeps(uri, scanobj,
90 set(("$include", "$schemas", "location")),
91 loadref, urljoin=document_loader.fetcher.urljoin)
93 normalizeFilesDirs(sc)
95 if include_primary and "id" in workflowobj:
96 sc.append({"class": "File", "location": workflowobj["id"]})
98 if "$schemas" in workflowobj:
99 for s in workflowobj["$schemas"]:
100 sc.append({"class": "File", "location": s})
    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name)

    def setloc(p):
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)
113 if "$schemas" in workflowobj:
115 for s in workflowobj["$schemas"]:
116 sch.append(mapper.mapper(s).resolved)
117 workflowobj["$schemas"] = sch
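
# Illustrative sketch (not part of the original module) of a typical call,
# assuming `runner` is an ArvCwlRunner-like object and `tool` is a loaded
# cwltool process object:
#
#     mapper = upload_dependencies(runner,
#                                  "my-workflow dependencies",
#                                  tool.doc_loader,
#                                  tool.tool,
#                                  tool.tool["id"],
#                                  False)
#     # tool.tool now points at keep: locations; mapper maps the originals.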

def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory"):
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)
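
# Illustrative sketch (not part of the original module): the `docker_req` dict
# above comes from a CWL DockerRequirement, e.g.
#
#     docker_req = {"class": "DockerRequirement",
#                   "dockerPull": "ubuntu:16.04",
#                   "dockerOutputDirectory": "/out"}  # this key would raise UnsupportedRequirement here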

def packed_workflow(arvrunner, tool):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    return pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                tool.tool["id"], tool.metadata)
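
# Illustrative sketch (not part of the original module): pack() flattens a
# multi-file workflow into a single document with a "$graph" list, where the
# top-level workflow is addressable as "#main", e.g.
#
#     {"cwlVersion": "v1.0",
#      "$graph": [{"class": "Workflow", "id": "#main", ...},
#                 {"class": "CommandLineTool", "id": "#step1.cwl", ...}]}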

def tag_git_version(packed, tool):
    # Takes the tool object as a parameter; the original body referenced `tool`,
    # which is not otherwise in scope here.
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash

def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    for t in tool.tool["inputs"]:
        def setSecondary(fileobj):
            if isinstance(fileobj, dict) and fileobj.get("class") == "File":
                if "secondaryFiles" not in fileobj:
                    fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]

            if isinstance(fileobj, list):
                for e in fileobj:
                    setSecondary(e)
        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(job_order[shortname(t["id"])])

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)
181 if "id" in job_order:
184 # Need to filter this out, gets added by cwltool when providing
185 # parameters on the command line.
186 if "job_order" in job_order:
187 del job_order["job_order"]
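
# Illustrative sketch (not part of the original module): a local input object
# such as
#
#     {"reads": {"class": "File", "location": "file:///home/me/reads.fastq"}}
#
# is returned with the location rewritten to a (made-up) keep: reference:
#
#     {"reads": {"class": "File",
#                "location": "keep:99999999999999999999999999999999+99/reads.fastq"}}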

def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    def upload_tool_deps(deptool):
        upload_dependencies(arvrunner,
                            "%s dependencies" % (shortname(deptool["id"])),
                            document_loader,
                            deptool,
                            deptool["id"],
                            False,
                            include_primary=False)
        document_loader.idx[deptool["id"]] = deptool

    tool.visit(upload_tool_deps)

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""

    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
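
# Illustrative sketch (not part of the original module):
#
#     arvados_jobs_image(runner, "arvados/jobs:" + __version__)
#
# raises if the runner image cannot be found locally, pulled, or uploaded.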

def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))
    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()
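
# Illustrative sketch (not part of the original module): because the lookup is
# keyed on the collection's portable_data_hash, re-uploading an identical packed
# workflow reuses the existing collection rather than creating a duplicate, e.g.
#
#     pdh = upload_workflow_collection(runner, "my-workflow.cwl", packed)
#     # pdh looks like "99999999999999999999999999999999+99" (made-up value)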

class Runner(object):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.enable_reuse = enable_reuse
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__

        if submit_runner_ram:
            self.submit_runner_ram = submit_runner_ram
        else:
            self.submit_runner_ram = 1024

        if self.submit_runner_ram <= 0:
            raise Exception("Value of --submit-runner-ram must be greater than zero")

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json") as f:
                    outputs = json.load(f)

            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            if record["uuid"] in self.arvrunner.processes:
                del self.arvrunner.processes[record["uuid"]]
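
# Illustrative sketch (not part of the original module): keepify() rewrites
# relative output locations against the runner's output collection, e.g. with a
# made-up record["output"] of "99999999999999999999999999999999+99":
#
#     {"class": "File", "location": "out.txt"}
#       -> {"class": "File", "location": "keep:99999999999999999999999999999999+99/out.txt"}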