import os
import urlparse
from functools import partial
import logging
import json
import subprocess

from StringIO import StringIO

from schema_salad.sourceline import SourceLine

import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack

import arvados.collection
import ruamel.yaml as yaml

from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
# Assumed import: Runner.done() below calls done.logtail(), so the sibling
# "done" module is imported here to bind that name at module level.
from . import done

logger = logging.getLogger('arvados.cwl-runner')

def trim_anonymous_location(obj):
    """Remove 'location' field from File and Directory literals.

    To make internal handling easier, literals are assigned a random id for
    'location'.  However, when writing the record back out, this can break
    reproducibility.  Since it is valid for literals not to have a 'location'
    field, remove it.
    """

    if obj.get("location", "").startswith("_:"):
        del obj["location"]
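
# For example, a File literal such as
#     {"class": "File", "location": "_:b0a4c1e2-...", "basename": "a.txt", "contents": "hello"}
# (hypothetical anonymous id) would have its "_:" location removed, leaving the
# rest of the literal intact.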

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run, include_primary=True):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
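        # Fetch the raw (pre-preprocessing) text of each referenced document
        # and parse it as YAML; each document is only loaded once.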
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))
81 if "id" in workflowobj:
82 # Need raw file content (before preprocessing) to ensure
83 # that external references in $include and $mixin are captured.
84 scanobj = loadref("", workflowobj["id"])
86 sc = scandeps(uri, scanobj,
88 set(("$include", "$schemas", "location")),
89 loadref, urljoin=document_loader.fetcher.urljoin)

    normalizeFilesDirs(sc)

    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    if "$schemas" in workflowobj:
        for s in workflowobj["$schemas"]:
            sc.append({"class": "File", "location": s})

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name,
                           single_collection=True)
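
    # Rewrite File/Directory locations in the workflow document to the Keep
    # references chosen by the mapper; "_:" literals and existing keep: URIs
    # are left alone.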
107 if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
108 p["location"] = mapper.mapper(p["location"]).resolved
109 adjustFileObjs(workflowobj, setloc)
110 adjustDirObjs(workflowobj, setloc)
112 if "$schemas" in workflowobj:
114 for s in workflowobj["$schemas"]:
115 sch.append(mapper.mapper(s).resolved)
116 workflowobj["$schemas"] = sch

def upload_docker(arvrunner, tool):
    """Uploads Docker images used in CommandLineTool objects."""

    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        if docker_req:
            if docker_req.get("dockerOutputDirectory"):
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)

def packed_workflow(arvrunner, tool):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined into a single document."""

    return pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
                tool.tool["id"], tool.metadata)

def tag_git_version(packed):
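    """Annotate the packed document with the source checkout's git commit hash.

    If the workflow was loaded from a local file, ask git for the current
    commit and record it under "http://schema.org/version".  (Note: this
    relies on a 'tool' object being available in the enclosing scope.)
    """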
    if tool.tool["id"].startswith("file://"):
        path = os.path.dirname(tool.tool["id"][7:])
        try:
            githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
            pass
        else:
            packed["http://schema.org/version"] = githash

def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """
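
    # Expand each input's secondaryFiles patterns (e.g. ".bai", "^.fai") onto the
    # supplied File values so that secondary files are uploaded as well.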
    for t in tool.tool["inputs"]:
        def setSecondary(fileobj):
            if isinstance(fileobj, dict) and fileobj.get("class") == "File":
                if "secondaryFiles" not in fileobj:
                    fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]

            if isinstance(fileobj, list):
                for e in fileobj:
                    setSecondary(e)

        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(job_order[shortname(t["id"])])

    jobmapper = upload_dependencies(arvrunner,
                                    name,
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)
180 if "id" in job_order:
183 # Need to filter this out, gets added by cwltool when providing
184 # parameters on the command line.
185 if "job_order" in job_order:
186 del job_order["job_order"]

def upload_workflow_deps(arvrunner, tool, override_tools):
    # Ensure that Docker images needed by this workflow are available
    upload_docker(arvrunner, tool)

    document_loader = tool.doc_loader

    def upload_tool_deps(deptool):
        if "id" in deptool:
            upload_dependencies(arvrunner,
                                "%s dependencies" % (shortname(deptool["id"])),
                                document_loader,
                                deptool,
                                deptool["id"],
                                False,
                                include_primary=False)
            document_loader.idx[deptool["id"]] = deptool
            override_tools[deptool["id"]] = json.dumps(deptool)

    tool.visit(upload_tool_deps)

def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available.  If not, try to pull and upload it."""

    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))

    return img

def upload_workflow_collection(arvrunner, name, packed):
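    """Upload the packed workflow document to Keep and return its portable data hash.

    The document is written as "workflow.cwl" into a new collection; if a
    collection with identical content already exists in the target project,
    it is reused instead of saving a duplicate.
    """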
    collection = arvados.collection.Collection(api_client=arvrunner.api,
                                               keep_client=arvrunner.keep_client,
                                               num_retries=arvrunner.num_retries)
    with collection.open("workflow.cwl", "w") as f:
        f.write(json.dumps(packed, indent=2, sort_keys=True, separators=(',',': ')))

    filters = [["portable_data_hash", "=", collection.portable_data_hash()],
               ["name", "like", name+"%"]]
    if arvrunner.project_uuid:
        filters.append(["owner_uuid", "=", arvrunner.project_uuid])
    exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
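
    # Reuse an existing collection with the same content (portable data hash)
    # rather than saving a duplicate copy.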
    if exists["items"]:
        logger.info("Using collection %s", exists["items"][0]["uuid"])
    else:
        collection.save_new(name=name,
                            owner_uuid=arvrunner.project_uuid,
                            ensure_unique_name=True,
                            num_retries=arvrunner.num_retries)
        logger.info("Uploaded to %s", collection.manifest_locator())

    return collection.portable_data_hash()


class Runner(object):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None,
                 intermediate_output_ttl=0):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.enable_reuse = enable_reuse
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.name = name
        self.on_error = on_error
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__
        self.intermediate_output_ttl = intermediate_output_ttl

        if submit_runner_ram:
            self.submit_runner_ram = submit_runner_ram
        else:
            self.submit_runner_ram = 3000

        if self.submit_runner_ram <= 0:
            raise Exception("Value of --submit-runner-ram must be greater than zero")

    def update_pipeline_component(self, record):
        pass

    def done(self, record):
        """Base method for handling a completed runner."""

        try:
            if record["state"] == "Complete":
                if record.get("exit_code") is not None:
                    if record["exit_code"] == 33:
                        processStatus = "UnsupportedRequirement"
                    elif record["exit_code"] == 0:
                        processStatus = "success"
                    else:
                        processStatus = "permanentFail"
                else:
                    processStatus = "success"
            else:
                processStatus = "permanentFail"

            outputs = {}
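
            # On failure, fetch the runner's log collection and show its tail to aid debugging.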
            if processStatus == "permanentFail":
                logc = arvados.collection.CollectionReader(record["log"],
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)

            self.final_output = record["output"]
            outc = arvados.collection.CollectionReader(self.final_output,
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            if "cwl.output.json" in outc:
                with outc.open("cwl.output.json") as f:
                    outputs = json.load(f)
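
            # Make relative paths in the output object into absolute keep:
            # references rooted at the output collection.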
            def keepify(fileobj):
                path = fileobj["location"]
                if not path.startswith("keep:"):
                    fileobj["location"] = "keep:%s/%s" % (record["output"], path)
            adjustFileObjs(outputs, keepify)
            adjustDirObjs(outputs, keepify)
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        else:
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            if record["uuid"] in self.arvrunner.processes:
                del self.arvrunner.processes[record["uuid"]]