import os
import urlparse
from functools import partial
import logging
import json
import subprocess

from StringIO import StringIO

from schema_salad.sourceline import SourceLine

import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack

import arvados.collection
import ruamel.yaml as yaml

from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'. However, when writing the record back out, this can break
    reproducibility. Since it is valid for literals not to have a 'location'
    field, remove it.
    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]
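
# Illustrative example (not part of the module): a File literal such as
#     {"class": "File", "location": "_:anon-id", "basename": "a.txt", "contents": "hello"}
# leaves trim_anonymous_location() as
#     {"class": "File", "basename": "a.txt", "contents": "hello"}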

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run, include_primary=True):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references. Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))
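
    # With loadref_run enabled, step "run" references are followed as well, so
    # the dependencies of every embedded tool get scanned along with the
    # workflow's own $import references.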
81 if "id" in workflowobj:
82 # Need raw file content (before preprocessing) to ensure
83 # that external references in $include and $mixin are captured.
84 scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)
106 if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
107 p["location"] = mapper.mapper(p["location"]).resolved
108 adjustFileObjs(workflowobj, setloc)
109 adjustDirObjs(workflowobj, setloc)
111 if "$schemas" in workflowobj:
113 for s in workflowobj["$schemas"]:
114 sch.append(mapper.mapper(s).resolved)
115 workflowobj["$schemas"] = sch
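
# Illustrative usage (not part of the module), mirroring the call made by
# upload_tool_deps() below:
#     mapper = upload_dependencies(arvrunner, "tool dependencies",
#                                  tool.doc_loader, tool.tool,
#                                  tool.tool["id"], False)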

def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory"):
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)
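
# upload_docker() recurses through workflow steps, so every embedded
# CommandLineTool has its Docker image checked (and uploaded if necessary)
# before the workflow is submitted.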

def packed_workflow(arvrunner, tool):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    return pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                tool.tool["id"], tool.metadata)
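
# cwltool's pack() combines the workflow and everything it references into a
# single document with a "$graph" list, in which the top-level workflow
# appears as "#main".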

def tag_git_version(packed, tool):
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash
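
# The git invocation above prints the commit hash of HEAD for the checkout
# containing the workflow file; when it succeeds, the hash is recorded in the
# packed document under "http://schema.org/version".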

def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    for t in tool.tool["inputs"]:
        def setSecondary(fileobj):
            if isinstance(fileobj, dict) and fileobj.get("class") == "File":
                if "secondaryFiles" not in fileobj:
                    fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]

            if isinstance(fileobj, list):
                for entry in fileobj:
                    setSecondary(entry)

        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(job_order[shortname(t["id"])])
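
    # Per cwltool's substitute(): a plain suffix pattern appends (".bai" turns
    # "reads.bam" into "reads.bam.bai"), while a leading "^" strips an
    # extension first ("^.bai" turns "reads.bam" into "reads.bai").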

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        del job_order["id"]

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]

    return job_order
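
# Illustrative example (not part of the module): upload_job_order() takes an
# input object such as
#     {"reads": {"class": "File", "location": "file:///tmp/reads.bam"}}
# and returns it with locations rewritten to Keep references along the lines of
#     {"reads": {"class": "File", "location": "keep:<portable data hash>/reads.bam"}}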

def upload_workflow_deps(arvrunner, tool):
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    def upload_tool_deps(deptool):
        if "id" in deptool:
            upload_dependencies(arvrunner,
                                "%s dependencies" % (shortname(deptool["id"])),
                                document_loader,
                                deptool,
                                deptool["id"],
                                False,
                                include_primary=False)
            document_loader.idx[deptool["id"]] = deptool

    tool.visit(upload_tool_deps)
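
# tool.visit() applies upload_tool_deps to every tool document reachable from
# the top-level workflow, and the document_loader.idx update above means later
# packing sees the rewritten (Keep-based) locations.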

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""

    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
    return img
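
# Callers typically pass the image name computed in Runner.__init__ below,
# i.e. "arvados/jobs:" + __version__.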

def upload_workflow_collection(arvrunner, name, packed):
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',', ': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)

    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()
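
# Because collections are content-addressed, the portable_data_hash filter
# above means re-uploading an identical packed workflow reuses the existing
# collection rather than creating a duplicate.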

class Runner(object):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl

        if submit_runner_ram:
            self.submit_runner_ram = submit_runner_ram
        else:
            self.submit_runner_ram = 3000

        if self.submit_runner_ram <= 0:
            raise Exception("Value of --submit-runner-ram must be greater than zero")

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"
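
            # (Exit code 33 is the convention arvados-cwl-runner uses to
            # report an unsupported requirement; 0 is success and any other
            # exit code is treated as a permanent failure.)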

            outputs = {}

            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json") as f:
                    if f.size() > 0:
                        outputs = json.load(f)

            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)

            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            if record["uuid"] in self.arvrunner.processes:
                del self.arvrunner.processes[record["uuid"]]