[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
#!/usr/bin/env python

# Implement cwl-runner interface for submitting and running jobs on Arvados.

import argparse
import arvados
import arvados.collection
import arvados.commands.keepdocker
import arvados.commands.run
import arvados.events
import arvados.util
import copy
import cwltool.docker
from cwltool.draft2tool import revmap_file, remove_hostfs, CommandLineTool
from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import fnmatch
from functools import partial
import json
import logging
import os
import pkg_resources  # part of setuptools
import re
import sys
import threading
from cwltool.load_tool import fetch_document
from cwltool.builder import Builder
import urlparse

from cwltool.process import shortname, get_feature, adjustFiles, adjustFileObjs, scandeps
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)

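# Regular expressions for scraping the $(task.tmpdir), $(task.outdir) and
# $(task.keep) paths that crunchrunner reports on the job's stderr log.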
tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")


def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
    """Check if a Docker image is available in Keep; if not, upload it using arv-keepdocker."""

    if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
        dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]

    sp = dockerRequirement["dockerImageId"].split(":")
    image_name = sp[0]
    image_tag = sp[1] if len(sp) > 1 else None

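    # Check whether an image with this repository name and tag is already
    # registered in Arvados before pulling and uploading a new copy.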
    images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
                                                            image_name=image_name,
                                                            image_tag=image_tag)

    if not images:
        imageId = cwltool.docker.get_image(dockerRequirement, pull_image)
        args = ["--project-uuid="+project_uuid, image_name]
        if image_tag:
            args.append(image_tag)
        logger.info("Uploading Docker image %s", ":".join(args[1:]))
        arvados.commands.keepdocker.main(args, stdout=sys.stderr)

    return dockerRequirement["dockerImageId"]


class CollectionFsAccess(cwltool.process.StdFsAccess):
    """Implement the cwltool FsAccess interface for Arvados Collections."""

    def __init__(self, basedir):
        super(CollectionFsAccess, self).__init__(basedir)
        self.collections = {}

    def get_collection(self, path):
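        # Split a "keep:<pdh>/<subpath>" reference into a (CollectionReader,
        # subpath) pair, caching readers by portable data hash; anything else
        # is returned as (None, path) and treated as a regular file path.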
        p = path.split("/")
        if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]):
            pdh = p[0][5:]
            if pdh not in self.collections:
                self.collections[pdh] = arvados.collection.CollectionReader(pdh)
            return (self.collections[pdh], "/".join(p[1:]))
        else:
            return (None, path)

    def _match(self, collection, patternsegments, parent):
        if not patternsegments:
            return []

        if not isinstance(collection, arvados.collection.RichCollectionBase):
            return []

        ret = []
        # iterate over the files and subcollections in 'collection'
        for filename in collection:
            if patternsegments[0] == '.':
                # Pattern contains something like "./foo" so just shift
                # past the "./"
                ret.extend(self._match(collection, patternsegments[1:], parent))
            elif fnmatch.fnmatch(filename, patternsegments[0]):
                cur = os.path.join(parent, filename)
                if len(patternsegments) == 1:
                    ret.append(cur)
                else:
                    ret.extend(self._match(collection[filename], patternsegments[1:], cur))
        return ret

    def glob(self, pattern):
        collection, rest = self.get_collection(pattern)
        patternsegments = rest.split("/")
        return self._match(collection, patternsegments, "keep:" + collection.manifest_locator())

    def open(self, fn, mode):
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.open(rest, mode)
        else:
            return open(self._abs(fn), mode)

    def exists(self, fn):
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.exists(rest)
        else:
            return os.path.exists(self._abs(fn))

class ArvadosJob(object):
    """Submit and manage a Crunch job for executing a CWL CommandLineTool."""

    def __init__(self, runner):
        self.arvrunner = runner
        self.running = False

    def run(self, dry_run=False, pull_image=True, **kwargs):
        script_parameters = {
            "command": self.command_line
        }
        runtime_constraints = {}

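        # generatefiles maps target filenames to either literal file contents
        # or {"path": ...} references.  Stage them into a new collection (the
        # task's virtual working directory) and pass each entry to
        # crunchrunner as a $(task.keep) path via the task.vwd parameter.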
        if self.generatefiles:
            vwd = arvados.collection.Collection()
            script_parameters["task.vwd"] = {}
            for t in self.generatefiles:
                if isinstance(self.generatefiles[t], dict):
                    src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
                    vwd.copy(rest, t, source_collection=src)
                else:
                    with vwd.open(t, "w") as f:
                        f.write(self.generatefiles[t])
            vwd.save_new()
            for t in self.generatefiles:
                script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)

        script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
        if self.environment:
            script_parameters["task.env"].update(self.environment)

        if self.stdin:
            script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]

        if self.stdout:
            script_parameters["task.stdout"] = self.stdout

        (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
        if docker_req and kwargs.get("use_container") is not False:
            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
        else:
            runtime_constraints["docker_image"] = "arvados/jobs"

        resources = self.builder.resources
        if resources is not None:
            runtime_constraints["min_cores_per_node"] = resources.get("cores", 1)
            runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
            runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)

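        # Reuse filters: only consider past crunchrunner jobs from the
        # canonical arvados repository at an acceptable script version, and
        # (unless --ignore-docker-for-reuse is given) with a matching Docker
        # image.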
        filters = [["repository", "=", "arvados"],
                   ["script", "=", "crunchrunner"],
                   ["script_version", "in git", "9e5b98e8f5f4727856b53447191f9c06e3da2ba6"]]
        if not self.arvrunner.ignore_docker_for_reuse:
            filters.append(["docker_image_locator", "in docker", runtime_constraints["docker_image"]])

        try:
            response = self.arvrunner.api.jobs().create(
                body={
                    "owner_uuid": self.arvrunner.project_uuid,
                    "script": "crunchrunner",
                    "repository": "arvados",
                    "script_version": "master",
                    "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
                    "script_parameters": {"tasks": [script_parameters]},
                    "runtime_constraints": runtime_constraints
                },
                filters=filters,
                find_or_create=kwargs.get("enable_reuse", True)
            ).execute(num_retries=self.arvrunner.num_retries)

            self.arvrunner.jobs[response["uuid"]] = self

            self.arvrunner.pipeline["components"][self.name] = {"job": response}
            self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
                                                                                     body={
                                                                                         "components": self.arvrunner.pipeline["components"]
                                                                                     }).execute(num_retries=self.arvrunner.num_retries)

            logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])

            if response["state"] in ("Complete", "Failed", "Cancelled"):
                self.done(response)
        except Exception as e:
            logger.error("Got error %s" % str(e))
            self.output_callback({}, "permanentFail")

    def update_pipeline_component(self, record):
        self.arvrunner.pipeline["components"][self.name] = {"job": record}
        self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
                                                                                 body={
                                                                                    "components": self.arvrunner.pipeline["components"]
                                                                                 }).execute(num_retries=self.arvrunner.num_retries)

    def done(self, record):
        try:
            self.update_pipeline_component(record)
        except:
            pass

        try:
            if record["state"] == "Complete":
                processStatus = "success"
            else:
                processStatus = "permanentFail"

            try:
                outputs = {}
                if record["output"]:
                    logc = arvados.collection.Collection(record["log"])
                    log = logc.open(logc.keys()[0])
                    tmpdir = None
                    outdir = None
                    keepdir = None
                    for l in log:
                        # Determine the tmpdir, outdir and keepdir paths from
                        # the job run.  Unfortunately, we can't take the first
                        # values we find (which are expected to be near the
                        # top) and stop scanning, because if the node fails
                        # and the job restarts on a different node these
                        # values will differ between runs, and we need to know
                        # about the final run that actually produced output.

                        g = tmpdirre.match(l)
                        if g:
                            tmpdir = g.group(1)
                        g = outdirre.match(l)
                        if g:
                            outdir = g.group(1)
                        g = keepre.match(l)
                        if g:
                            keepdir = g.group(1)

                    colname = "Output %s of %s" % (record["output"][0:7], self.name)

                    # check if collection already exists with same owner, name and content
                    collection_exists = self.arvrunner.api.collections().list(
                        filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
                                 ['portable_data_hash', '=', record["output"]],
                                 ["name", "=", colname]]
                    ).execute(num_retries=self.arvrunner.num_retries)

                    if not collection_exists["items"]:
                        # Create a collection located in the same project as the
                        # pipeline with the contents of the output.
                        # First, get output record.
                        collections = self.arvrunner.api.collections().list(
                            limit=1,
                            filters=[['portable_data_hash', '=', record["output"]]],
                            select=["manifest_text"]
                        ).execute(num_retries=self.arvrunner.num_retries)

                        if not collections["items"]:
                            raise WorkflowException(
                                "Job output '%s' cannot be found on API server" % (
                                    record["output"]))

                        # Create new collection in the parent project
                        # with the output contents.
                        self.arvrunner.api.collections().create(body={
                            "owner_uuid": self.arvrunner.project_uuid,
                            "name": colname,
                            "portable_data_hash": record["output"],
                            "manifest_text": collections["items"][0]["manifest_text"]
                        }, ensure_unique_name=True).execute(
                            num_retries=self.arvrunner.num_retries)

                    self.builder.outdir = outdir
                    self.builder.pathmapper.keepdir = keepdir
                    outputs = self.collect_outputs("keep:" + record["output"])
            except WorkflowException as e:
                logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
                processStatus = "permanentFail"
            except Exception as e:
                logger.exception("Got unknown exception while collecting job outputs:")
                processStatus = "permanentFail"

            self.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.jobs[record["uuid"]]


class RunnerJob(object):
    """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""

    def __init__(self, runner, tool, job_order, enable_reuse):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse

    def update_pipeline_component(self, record):
        pass

    def upload_docker(self, tool):
        if isinstance(tool, CommandLineTool):
            (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
            if docker_req:
                arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid)
        elif isinstance(tool, cwltool.workflow.Workflow):
            for s in tool.steps:
                self.upload_docker(s.embedded_tool)

    def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
        """Create an Arvados job specification for this workflow.

        The returned dict can be used to create a job (i.e., passed as
        the +body+ argument to jobs().create()), or as a component in
        a pipeline template or pipeline instance.
        """
        self.upload_docker(self.tool)

        workflowfiles = set()
        jobfiles = set()
        workflowfiles.add(self.tool.tool["id"])

        self.name = os.path.basename(self.tool.tool["id"])

        def visitFiles(files, path):
            files.add(path)
            return path

        document_loader, workflowobj, uri = fetch_document(self.tool.tool["id"])
        def loadref(b, u):
            return document_loader.fetch(urlparse.urljoin(b, u))

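        # Walk the workflow document to find everything it refers to:
        # sub-documents pulled in via $import or "run", plus files named by
        # $include, $schemas, or "path" fields.  Those, and the files in the
        # job order, are then staged to Keep by ArvPathMapper below.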
        sc = scandeps(uri, workflowobj,
                      set(("$import", "run")),
                      set(("$include", "$schemas", "path")),
                      loadref)
        adjustFiles(sc, partial(visitFiles, workflowfiles))
        adjustFiles(self.job_order, partial(visitFiles, jobfiles))

        workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
                                       "%s",
                                       "%s/%s",
                                       name=self.name,
                                       **kwargs)

        jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
                                  "%s",
                                  "%s/%s",
                                  name=os.path.basename(self.job_order.get("id", "#")),
                                  **kwargs)

        adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])

        if "id" in self.job_order:
            del self.job_order["id"]

        self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1]
        return {
            "script": "cwl-runner",
            "script_version": "master",
            "repository": "arvados",
            "script_parameters": self.job_order,
            "runtime_constraints": {
                "docker_image": "arvados/jobs"
            }
        }

    def run(self, *args, **kwargs):
        job_spec = self.arvados_job_spec(*args, **kwargs)
        job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)

        response = self.arvrunner.api.jobs().create(
            body=job_spec,
            find_or_create=self.enable_reuse
        ).execute(num_retries=self.arvrunner.num_retries)

        self.uuid = response["uuid"]
        self.arvrunner.jobs[self.uuid] = self

        logger.info("Submitted job %s", response["uuid"])

        if response["state"] in ("Complete", "Failed", "Cancelled"):
            self.done(response)

    def done(self, record):
        if record["state"] == "Complete":
            processStatus = "success"
        else:
            processStatus = "permanentFail"

        outputs = None
        try:
            try:
                outc = arvados.collection.Collection(record["output"])
                with outc.open("cwl.output.json") as f:
                    outputs = json.load(f)
                def keepify(path):
                    if not path.startswith("keep:"):
                        return "keep:%s/%s" % (record["output"], path)
                    return path
                adjustFiles(outputs, keepify)
            except Exception as e:
                logger.error("While getting final output object: %s", e)
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.jobs[record["uuid"]]


class RunnerTemplate(object):
    """An Arvados pipeline template that invokes a CWL workflow."""

    type_to_dataclass = {
        'boolean': 'boolean',
        'File': 'File',
        'float': 'number',
        'int': 'number',
        'string': 'text',
    }

    def __init__(self, runner, tool, job_order, enable_reuse):
        self.runner = runner
        self.tool = tool
        self.job = RunnerJob(
            runner=runner,
            tool=tool,
            job_order=job_order,
            enable_reuse=enable_reuse)

    def pipeline_component_spec(self):
        """Return a component that Workbench and a-r-p-i will understand.

        Specifically, translate CWL input specs to Arvados pipeline
        format, like {"dataclass":"File","value":"xyz"}.
        """
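        # As a hypothetical illustration, a required File input whose value is
        # filled in from the job order comes out roughly as:
        #   "reference": {"type": "File", "dataclass": "File",
        #                 "required": True, "value": "<uploaded path>"}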
        spec = self.job.arvados_job_spec()

        # Most of the component spec is exactly the same as the job
        # spec (script, script_version, etc.).
        # spec['script_parameters'] isn't right, though. A component
        # spec's script_parameters hash is a translation of
        # self.tool.tool['inputs'] with defaults/overrides taken from
        # the job order. So we move the job parameters out of the way
        # and build a new spec['script_parameters'].
        job_params = spec['script_parameters']
        spec['script_parameters'] = {}

        for param in self.tool.tool['inputs']:
            param = copy.deepcopy(param)

            # Data type and "required" flag...
            types = param['type']
            if not isinstance(types, list):
                types = [types]
            param['required'] = 'null' not in types
            non_null_types = set(types) - set(['null'])
            if len(non_null_types) == 1:
                the_type = [c for c in non_null_types][0]
                dataclass = self.type_to_dataclass.get(the_type)
                if dataclass:
                    param['dataclass'] = dataclass
            # Note: If we didn't figure out a single appropriate
            # dataclass, we just left that attribute out.  We leave
            # the "type" attribute there in any case, which might help
            # downstream.

            # Title and description...
            title = param.pop('label', '')
            descr = param.pop('description', '').rstrip('\n')
            if title:
                param['title'] = title
            if descr:
                param['description'] = descr

            # Fill in the value from the current job order, if any.
            param_id = shortname(param.pop('id'))
            value = job_params.get(param_id)
            if value is None:
                pass
            elif not isinstance(value, dict):
                param['value'] = value
            elif param.get('dataclass') == 'File' and value.get('path'):
                param['value'] = value['path']

            spec['script_parameters'][param_id] = param
        spec['script_parameters']['cwl:tool'] = job_params['cwl:tool']
        return spec

    def save(self):
        job_spec = self.pipeline_component_spec()
        response = self.runner.api.pipeline_templates().create(body={
            "components": {
                self.job.name: job_spec,
            },
            "name": self.job.name,
            "owner_uuid": self.runner.project_uuid,
        }, ensure_unique_name=True).execute(num_retries=self.runner.num_retries)
        self.uuid = response["uuid"]
        logger.info("Created template %s", self.uuid)


class ArvPathMapper(cwltool.pathmapper.PathMapper):
    """Convert container-local paths to and from Keep collection ids."""

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, **kwargs):
        self._pathmap = arvrunner.get_uploaded()
        uploadfiles = set()

        pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')

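        # referenced_files may be plain keep: references (already stored),
        # local paths that still need uploading, or files uploaded earlier in
        # this run (remembered via arvrunner.get_uploaded()/add_uploaded()).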
        for src in referenced_files:
            if isinstance(src, basestring) and pdh_path.match(src):
                self._pathmap[src] = (src, collection_pattern % src[5:])
            if "#" in src:
                src = src[:src.index("#")]
            if src not in self._pathmap:
                ab = cwltool.pathmapper.abspath(src, input_basedir)
                st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern)
                if kwargs.get("conformance_test"):
                    self._pathmap[src] = (src, ab)
                elif isinstance(st, arvados.commands.run.UploadFile):
                    uploadfiles.add((src, ab, st))
                elif isinstance(st, arvados.commands.run.ArvFile):
                    self._pathmap[src] = (ab, st.fn)
                else:
                    raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)

        if uploadfiles:
            arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                             arvrunner.api,
                                             dry_run=kwargs.get("dry_run"),
                                             num_retries=3,
                                             fnPattern=file_pattern,
                                             name=name,
                                             project=arvrunner.project_uuid)

        for src, ab, st in uploadfiles:
            arvrunner.add_uploaded(src, (ab, st.fn))
            self._pathmap[src] = (ab, st.fn)

        self.keepdir = None

    def reversemap(self, target):
        if target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            return (target, "keep:" + target[len(self.keepdir)+1:])
        else:
            return super(ArvPathMapper, self).reversemap(target)


class ArvadosCommandTool(CommandLineTool):
    """Wrap cwltool CommandLineTool to override selected methods."""

    def __init__(self, arvrunner, toolpath_object, **kwargs):
        super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
        self.arvrunner = arvrunner

    def makeJobRunner(self):
        return ArvadosJob(self.arvrunner)

    def makePathMapper(self, reffiles, **kwargs):
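        # Map references to $(task.keep)/<pdh> (whole collections) and
        # $(task.keep)/<pdh>/<file> (individual files) so crunchrunner can
        # resolve them against its Keep mount at run time.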
        return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
                             "$(task.keep)/%s",
                             "$(task.keep)/%s/%s",
                             **kwargs)


class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit crunch jobs, wait for them to
    complete, and report output."""

    def __init__(self, api_client):
        self.api = api_client
        self.jobs = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.uploaded = {}
        self.num_retries = 4

    def arvMakeTool(self, toolpath_object, **kwargs):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall job status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)

        else:
            logger.warn("Overall job status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_output = out

    def on_message(self, event):
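        # Websocket event handler: when a job we submitted changes state, mark
        # it running or hand the final record to done(), and notify the main
        # loop (which waits on self.cond) that something changed.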
        if "object_uuid" in event:
            if event["object_uuid"] in self.jobs and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.jobs[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.jobs[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def arvExecutor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        if kwargs.get("quiet"):
            logger.setLevel(logging.WARN)
            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
        self.pipeline = None

        if kwargs.get("create_template"):
            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
            tmpl.save()
            # cwltool.main will write our return value to stdout.
            return tmpl.uuid

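        # In submit mode the workflow itself runs as a Crunch job
        # (crunch_scripts/cwl-runner); with --no-wait we just submit that
        # runner job and return its UUID without watching it.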
        if kwargs.get("submit"):
            runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))
            if not kwargs.get("wait"):
                runnerjob.run()
                return runnerjob.uuid

        events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)

        self.debug = kwargs.get("debug")
        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
        self.fs_access = CollectionFsAccess(kwargs["basedir"])

        kwargs["fs_access"] = self.fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")

        kwargs["outdir"] = "$(task.outdir)"
        kwargs["tmpdir"] = "$(task.tmpdir)"

        if kwargs.get("conformance_test"):
            return cwltool.main.single_job_executor(tool, job_order, **kwargs)
        else:
            if kwargs.get("submit"):
                jobiter = iter((runnerjob,))
            else:
                components = {}
                if "cwl_runner_job" in kwargs:
                    components[os.path.basename(tool.tool["id"])] = {"job": kwargs["cwl_runner_job"]}

                self.pipeline = self.api.pipeline_instances().create(
                    body={
                        "owner_uuid": self.project_uuid,
                        "name": shortname(tool.tool["id"]),
                        "components": components,
                        "state": "RunningOnClient"}).execute(num_retries=self.num_retries)

                logger.info("Pipeline instance %s", self.pipeline["uuid"])

                jobiter = tool.job(job_order,
                                   self.output_callback,
                                   docker_outdir="$(task.outdir)",
                                   **kwargs)

            try:
                self.cond.acquire()
                # Will continue to hold the lock for the duration of this code
                # except when in cond.wait(), at which point on_message can update
                # job state and process output callbacks.

                for runnable in jobiter:
                    if runnable:
                        runnable.run(**kwargs)
                    else:
                        if self.jobs:
                            self.cond.wait(1)
                        else:
                            logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                            break

                while self.jobs:
                    self.cond.wait(1)

                events.close()
            except:
                if sys.exc_info()[0] is KeyboardInterrupt:
                    logger.error("Interrupted, marking pipeline as failed")
                else:
                    logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[0], exc_info=(sys.exc_info()[1] if self.debug else False))
                if self.pipeline:
                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                         body={"state": "Failed"}).execute(num_retries=self.num_retries)
            finally:
                self.cond.release()

            if self.final_output is None:
                raise cwltool.workflow.WorkflowException("Workflow did not return a result.")

            return self.final_output

def versionstring():
    """Return version strings of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)

def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--conformance-test", action="store_true")
    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or given on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job reuse (always run new jobs)")

    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs. If not provided, jobs will go to the user's home project.")
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    parser.add_argument("workflow", type=str, nargs="?", default=None)
    parser.add_argument("job_order", nargs=argparse.REMAINDER)

    return parser

def main(args, stdout, stderr, api_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)
    if arvargs.create_template and not arvargs.job_order:
        job_order_object = ({}, "")

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client)
    except Exception as e:
        logger.error(e)
        return 1

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arvExecutor,
                             makeTool=runner.arvMakeTool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object)
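
# A typical console-script entry point (not present in this file; shown only
# as an illustration of how main() is expected to be invoked, assuming the
# setuptools-generated wrapper passes argv and the standard streams):
#
#     if __name__ == "__main__":
#         sys.exit(main(sys.argv[1:], sys.stdout, sys.stderr))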