8442: Submit containers. Work in progress.
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
1 #!/usr/bin/env python
2
3 # Implement cwl-runner interface for submitting and running jobs on Arvados.
4
5 import argparse
6 import arvados
7 import arvados.collection
8 import arvados.commands.keepdocker
9 import arvados.commands.run
10 import arvados.events
11 import arvados.util
12 import copy
13 import cwltool.docker
14 from cwltool.draft2tool import revmap_file, remove_hostfs, CommandLineTool
15 from cwltool.errors import WorkflowException
16 import cwltool.main
17 import cwltool.workflow
18 import fnmatch
19 from functools import partial
20 import json
21 import logging
22 import os
23 import pkg_resources  # part of setuptools
24 import re
25 import sys
26 import threading
27 from cwltool.load_tool import fetch_document
28 from cwltool.builder import Builder
29 import urlparse
30 from .arvcontainer import ArvadosContainer
31 from .arvdocker import arv_docker_get_image
32
33 from cwltool.process import shortname, get_feature, adjustFiles, adjustFileObjs, scandeps
34 from arvados.api import OrderedJsonModel
35
# Module-level logger; arvExecutor raises the level to WARN when --quiet is set.
logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)

# Patterns for recovering the $(task.tmpdir), $(task.outdir) and $(task.keep)
# paths that crunchrunner reports on stderr in the job log; the captured
# group is the actual path.  Used by ArvadosJob.done() when scanning the log.
tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
42
43
44
45
class CollectionFsAccess(cwltool.process.StdFsAccess):
    """Implement the cwltool FsAccess interface for Arvados Collections.

    Paths of the form "keep:<portable data hash>/<subpath>" are resolved
    against Keep collections; anything else falls back to the local
    filesystem behavior of StdFsAccess.
    """

    def __init__(self, basedir):
        super(CollectionFsAccess, self).__init__(basedir)
        # Cache of CollectionReader objects keyed by portable data hash so
        # each referenced collection is only fetched from the API once.
        self.collections = {}

    def get_collection(self, path):
        """Split "keep:<pdh>/<rest>" into (CollectionReader, "<rest>").

        Returns (None, path) unchanged when 'path' does not name a Keep
        collection by portable data hash.
        """
        p = path.split("/")
        if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]):
            pdh = p[0][5:]
            if pdh not in self.collections:
                self.collections[pdh] = arvados.collection.CollectionReader(pdh)
            return (self.collections[pdh], "/".join(p[1:]))
        else:
            return (None, path)

    def _match(self, collection, patternsegments, parent):
        """Recursively match glob pattern segments against a collection tree.

        Returns a list of "keep:..." paths (rooted at 'parent') for every
        entry matching the full pattern.
        """
        if not patternsegments:
            return []

        if not isinstance(collection, arvados.collection.RichCollectionBase):
            return []

        if patternsegments[0] == '.':
            # Pattern contains something like "./foo" so just shift past
            # the "./".  This must happen before iterating over the
            # collection's entries: handling it inside the loop (as the
            # previous version did) recursed once per entry and returned
            # every match duplicated len(collection) times.
            return self._match(collection, patternsegments[1:], parent)

        ret = []
        # iterate over the files and subcollections in 'collection'
        for filename in collection:
            if fnmatch.fnmatch(filename, patternsegments[0]):
                cur = os.path.join(parent, filename)
                if len(patternsegments) == 1:
                    ret.append(cur)
                else:
                    ret.extend(self._match(collection[filename], patternsegments[1:], cur))
        return ret

    def glob(self, pattern):
        # NOTE(review): assumes 'pattern' always refers to a Keep collection;
        # for a non-keep pattern get_collection() returns None and
        # manifest_locator() would raise AttributeError -- confirm callers.
        collection, rest = self.get_collection(pattern)
        patternsegments = rest.split("/")
        return self._match(collection, patternsegments, "keep:" + collection.manifest_locator())

    def open(self, fn, mode):
        """Open a file inside a Keep collection, or a local file otherwise."""
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.open(rest, mode)
        else:
            return open(self._abs(fn), mode)

    def exists(self, fn):
        """Test existence inside a Keep collection, or locally otherwise."""
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.exists(rest)
        else:
            return os.path.exists(self._abs(fn))
103
class ArvadosJob(object):
    """Submit and manage a Crunch job for executing a CWL CommandLineTool."""

    def __init__(self, runner):
        # runner is the ArvCwlRunner coordinating this workflow run.
        self.arvrunner = runner
        # Set to True by ArvCwlRunner.on_message when the job starts running.
        self.running = False

    def run(self, dry_run=False, pull_image=True, **kwargs):
        """Build a crunchrunner job spec from this tool invocation and submit it.

        NOTE(review): self.command_line, self.generatefiles, self.environment,
        self.stdin, self.stdout, self.pathmapper and self.builder are assumed
        to be populated by cwltool's job-setup machinery before run() is
        called -- confirm against the cwltool version in use.
        """
        script_parameters = {
            "command": self.command_line
        }
        runtime_constraints = {}

        if self.generatefiles:
            # Stage generated files into a new "virtual working directory"
            # collection; entries are either literal file contents (str) or
            # references to existing Keep files (dict with a "path").
            vwd = arvados.collection.Collection()
            script_parameters["task.vwd"] = {}
            for t in self.generatefiles:
                if isinstance(self.generatefiles[t], dict):
                    src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
                    vwd.copy(rest, t, source_collection=src)
                else:
                    with vwd.open(t, "w") as f:
                        f.write(self.generatefiles[t])
            vwd.save_new()
            # Point each vwd entry at its location in the saved collection.
            for t in self.generatefiles:
                script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)

        script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
        if self.environment:
            script_parameters["task.env"].update(self.environment)

        if self.stdin:
            script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]

        if self.stdout:
            script_parameters["task.stdout"] = self.stdout

        # Pick the Docker image from the tool's DockerRequirement, falling
        # back to the stock arvados/jobs image.
        (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
        if docker_req and kwargs.get("use_container") is not False:
            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
        else:
            runtime_constraints["docker_image"] = "arvados/jobs"

        resources = self.builder.resources
        if resources is not None:
            runtime_constraints["min_cores_per_node"] = resources.get("cores", 1)
            runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
            runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)

        # Filters used for job reuse (find_or_create): only reuse jobs that
        # ran crunchrunner at or after the pinned script version.
        filters = [["repository", "=", "arvados"],
                   ["script", "=", "crunchrunner"],
                   ["script_version", "in git", "9e5b98e8f5f4727856b53447191f9c06e3da2ba6"]]
        if not self.arvrunner.ignore_docker_for_reuse:
            filters.append(["docker_image_locator", "in docker", runtime_constraints["docker_image"]])

        try:
            response = self.arvrunner.api.jobs().create(
                body={
                    "owner_uuid": self.arvrunner.project_uuid,
                    "script": "crunchrunner",
                    "repository": "arvados",
                    "script_version": "master",
                    "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
                    "script_parameters": {"tasks": [script_parameters]},
                    "runtime_constraints": runtime_constraints
                },
                filters=filters,
                find_or_create=kwargs.get("enable_reuse", True)
            ).execute(num_retries=self.arvrunner.num_retries)

            # Register so ArvCwlRunner.on_message can route job events here.
            self.arvrunner.jobs[response["uuid"]] = self

            self.update_pipeline_component(response)

            logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])

            # A reused job may already be finished; deliver its result now.
            if response["state"] in ("Complete", "Failed", "Cancelled"):
                self.done(response)
        except Exception as e:
            logger.error("Got error %s" % str(e))
            self.output_callback({}, "permanentFail")

    def update_pipeline_component(self, record):
        """Reflect this job's state into the enclosing pipeline instance and,
        when running as a submitted runner job, into that job's components."""
        if self.arvrunner.pipeline:
            self.arvrunner.pipeline["components"][self.name] = {"job": record}
            self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
                                                                                 body={
                                                                                    "components": self.arvrunner.pipeline["components"]
                                                                                 }).execute(num_retries=self.arvrunner.num_retries)
        if self.arvrunner.uuid:
            try:
                job = self.arvrunner.api.jobs().get(uuid=self.arvrunner.uuid).execute()
                if job:
                    components = job["components"]
                    components[self.name] = record["uuid"]
                    self.arvrunner.api.jobs().update(uuid=self.arvrunner.uuid,
                        body={
                            "components": components
                        }).execute(num_retries=self.arvrunner.num_retries)
            except Exception as e:
                # Best-effort bookkeeping; a failure here must not fail the job.
                logger.info("Error adding to components: %s", e)

    def done(self, record):
        """Handle job completion: collect outputs, create an output
        collection in the project, and invoke the output callback."""
        try:
            self.update_pipeline_component(record)
        except:
            # Deliberate best-effort: component bookkeeping failures must not
            # prevent output collection below.
            pass

        try:
            if record["state"] == "Complete":
                processStatus = "success"
            else:
                processStatus = "permanentFail"

            try:
                outputs = {}
                if record["output"]:
                    logc = arvados.collection.Collection(record["log"])
                    log = logc.open(logc.keys()[0])
                    tmpdir = None
                    outdir = None
                    keepdir = None
                    for l in log:
                        # Determine the tmpdir, outdir and keepdir paths from
                        # the job run.  Unfortunately, we can't take the first
                        # values we find (which are expected to be near the
                        # top) and stop scanning, because if the node fails
                        # and the job restarts on a different node these
                        # values will differ between runs, and we need to know
                        # about the final run that actually produced output.

                        g = tmpdirre.match(l)
                        if g:
                            tmpdir = g.group(1)
                        g = outdirre.match(l)
                        if g:
                            outdir = g.group(1)
                        g = keepre.match(l)
                        if g:
                            keepdir = g.group(1)

                    colname = "Output %s of %s" % (record["output"][0:7], self.name)

                    # check if collection already exists with same owner, name and content
                    collection_exists = self.arvrunner.api.collections().list(
                        filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
                                 ['portable_data_hash', '=', record["output"]],
                                 ["name", "=", colname]]
                    ).execute(num_retries=self.arvrunner.num_retries)

                    if not collection_exists["items"]:
                        # Create a collection located in the same project as the
                        # pipeline with the contents of the output.
                        # First, get output record.
                        collections = self.arvrunner.api.collections().list(
                            limit=1,
                            filters=[['portable_data_hash', '=', record["output"]]],
                            select=["manifest_text"]
                        ).execute(num_retries=self.arvrunner.num_retries)

                        if not collections["items"]:
                            raise WorkflowException(
                                "Job output '%s' cannot be found on API server" % (
                                    record["output"]))

                        # Create new collection in the parent project
                        # with the output contents.
                        self.arvrunner.api.collections().create(body={
                            "owner_uuid": self.arvrunner.project_uuid,
                            "name": colname,
                            "portable_data_hash": record["output"],
                            "manifest_text": collections["items"][0]["manifest_text"]
                        }, ensure_unique_name=True).execute(
                            num_retries=self.arvrunner.num_retries)

                    # Let cwltool's output collection machinery resolve paths
                    # through the mapper using the paths scraped from the log.
                    self.builder.outdir = outdir
                    self.builder.pathmapper.keepdir = keepdir
                    outputs = self.collect_outputs("keep:" + record["output"])
            except WorkflowException as e:
                logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
                processStatus = "permanentFail"
            except Exception as e:
                logger.exception("Got unknown exception while collecting job outputs:")
                processStatus = "permanentFail"

            self.output_callback(outputs, processStatus)
        finally:
            # Deregister so the runner's wait loop can terminate.
            del self.arvrunner.jobs[record["uuid"]]
292
293
class RunnerJob(object):
    """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""

    def __init__(self, runner, tool, job_order, enable_reuse):
        # runner: the coordinating ArvCwlRunner.
        # tool: the cwltool Process (workflow or tool) to run remotely.
        # job_order: the input object; mutated in place by arvados_job_spec.
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse

    def update_pipeline_component(self, record):
        # The runner job is not a pipeline component; nothing to update.
        pass

    def upload_docker(self, tool):
        """Ensure the Docker images required by 'tool' (recursively, for
        workflows) are available on the Arvados cluster."""
        if isinstance(tool, CommandLineTool):
            (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
            if docker_req:
                arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid)
        elif isinstance(tool, cwltool.workflow.Workflow):
            for s in tool.steps:
                self.upload_docker(s.embedded_tool)

    def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
        """Create an Arvados job specification for this workflow.

        The returned dict can be used to create a job (i.e., passed as
        the +body+ argument to jobs().create()), or as a component in
        a pipeline template or pipeline instance.

        Side effects: uploads Docker images and the workflow/input files
        to Keep, and rewrites self.job_order paths to Keep references.
        """
        self.upload_docker(self.tool)

        workflowfiles = set()
        jobfiles = set()
        workflowfiles.add(self.tool.tool["id"])

        self.name = os.path.basename(self.tool.tool["id"])

        def visitFiles(files, path):
            # Collect each referenced path; return it unchanged so
            # adjustFiles leaves the document intact at this stage.
            files.add(path)
            return path

        document_loader, workflowobj, uri = fetch_document(self.tool.tool["id"])
        def loadref(b, u):
            return document_loader.fetch(urlparse.urljoin(b, u))

        # Scan the workflow document for everything it transitively depends on.
        sc = scandeps(uri, workflowobj,
                      set(("$import", "run")),
                      set(("$include", "$schemas", "path")),
                      loadref)
        adjustFiles(sc, partial(visitFiles, workflowfiles))
        adjustFiles(self.job_order, partial(visitFiles, jobfiles))

        # Upload workflow files and input files to Keep (as two collections).
        workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
                                       "%s",
                                       "%s/%s",
                                       name=self.name,
                                       **kwargs)

        jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
                                  "%s",
                                  "%s/%s",
                                  name=os.path.basename(self.job_order.get("id", "#")),
                                  **kwargs)

        # Rewrite input paths to their uploaded Keep locations.
        adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])

        if "id" in self.job_order:
            del self.job_order["id"]

        self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1]
        return {
            "script": "cwl-runner",
            "script_version": "master",
            "repository": "arvados",
            "script_parameters": self.job_order,
            "runtime_constraints": {
                "docker_image": "arvados/jobs"
            }
        }

    def run(self, *args, **kwargs):
        """Submit the runner job and, when submitting from the command line,
        create a pipeline instance so it shows up in Workbench."""
        job_spec = self.arvados_job_spec(*args, **kwargs)
        job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)

        response = self.arvrunner.api.jobs().create(
            body=job_spec,
            find_or_create=self.enable_reuse
        ).execute(num_retries=self.arvrunner.num_retries)

        self.uuid = response["uuid"]
        self.arvrunner.jobs[self.uuid] = self

        logger.info("Submitted job %s", response["uuid"])

        if kwargs.get("submit"):
            self.pipeline = self.arvrunner.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.arvrunner.project_uuid,
                    "name": shortname(self.tool.tool["id"]),
                    "components": {"cwl-runner": {"job": {"uuid": self.uuid, "state": response["state"]} } },
                    "state": "RunningOnClient"}).execute(num_retries=self.arvrunner.num_retries)

        # A reused job may already be finished; deliver its result now.
        if response["state"] in ("Complete", "Failed", "Cancelled"):
            self.done(response)

    def done(self, record):
        """Handle completion of the runner job: read cwl.output.json from
        the output collection and pass it to the output callback."""
        if record["state"] == "Complete":
            processStatus = "success"
        else:
            processStatus = "permanentFail"

        outputs = None
        try:
            try:
                outc = arvados.collection.Collection(record["output"])
                with outc.open("cwl.output.json") as f:
                    outputs = json.load(f)
                def keepify(path):
                    # adjustFiles replaces each file path with this function's
                    # return value, so paths that are already keep: references
                    # must be returned unchanged.  (Previously this returned
                    # None for keep: paths, clobbering them.)
                    if not path.startswith("keep:"):
                        return "keep:%s/%s" % (record["output"], path)
                    else:
                        return path
                adjustFiles(outputs, keepify)
            except Exception as e:
                logger.error("While getting final output object: %s", e)
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            # Deregister so the runner's wait loop can terminate.
            del self.arvrunner.jobs[record["uuid"]]
420
421
class RunnerTemplate(object):
    """An Arvados pipeline template that invokes a CWL workflow."""

    # Map CWL input types onto the dataclasses Workbench understands.
    type_to_dataclass = {
        'boolean': 'boolean',
        'File': 'File',
        'float': 'number',
        'int': 'number',
        'string': 'text',
    }

    def __init__(self, runner, tool, job_order, enable_reuse):
        self.runner = runner
        self.tool = tool
        self.job = RunnerJob(
            runner=runner,
            tool=tool,
            job_order=job_order,
            enable_reuse=enable_reuse)

    def pipeline_component_spec(self):
        """Return a component that Workbench and a-r-p-i will understand.

        Specifically, translate CWL input specs to Arvados pipeline
        format, like {"dataclass":"File","value":"xyz"}.
        """
        spec = self.job.arvados_job_spec()

        # The component spec matches the job spec (script, script_version,
        # etc.) except for script_parameters, which must be rebuilt from
        # self.tool.tool['inputs'] with defaults/overrides taken from the
        # job order.  Stash the job parameters and start fresh.
        job_params = spec['script_parameters']
        spec['script_parameters'] = {}

        for param in self.tool.tool['inputs']:
            param = copy.deepcopy(param)

            # Data type and "required" flag...
            types = param['type']
            if not isinstance(types, list):
                types = [types]
            param['required'] = 'null' not in types
            non_null_types = set(types) - set(['null'])
            if len(non_null_types) == 1:
                dataclass = self.type_to_dataclass.get(next(iter(non_null_types)))
                if dataclass:
                    param['dataclass'] = dataclass
            # If no single appropriate dataclass was found, the attribute is
            # simply omitted; "type" is kept either way, which might help
            # downstream.

            # Title and description...
            label = param.pop('label', '')
            description = param.pop('description', '').rstrip('\n')
            if label:
                param['title'] = label
            if description:
                param['description'] = description

            # Fill in the value from the current job order, if any.
            param_id = shortname(param.pop('id'))
            value = job_params.get(param_id)
            if value is not None:
                if not isinstance(value, dict):
                    param['value'] = value
                elif param.get('dataclass') == 'File' and value.get('path'):
                    param['value'] = value['path']

            spec['script_parameters'][param_id] = param

        spec['script_parameters']['cwl:tool'] = job_params['cwl:tool']
        return spec

    def save(self):
        """Create the pipeline template on the API server."""
        body = {
            "components": {
                self.job.name: self.pipeline_component_spec(),
            },
            "name": self.job.name,
            "owner_uuid": self.runner.project_uuid,
        }
        response = self.runner.api.pipeline_templates().create(
            body=body, ensure_unique_name=True).execute(
                num_retries=self.runner.num_retries)
        self.uuid = response["uuid"]
        logger.info("Created template %s", self.uuid)
512
513
class ArvPathMapper(cwltool.pathmapper.PathMapper):
    """Convert container-local paths to and from Keep collection ids."""

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, **kwargs):
        # Seed the path map with files already uploaded in this run so
        # the same local file is never uploaded twice.
        self._pathmap = arvrunner.get_uploaded()
        # Files that statfile() reports as needing upload; uploaded in
        # one batch below.
        uploadfiles = set()

        # Matches "keep:<portable data hash>/<subpath>" references.
        pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')

        for src in referenced_files:
            if isinstance(src, basestring) and pdh_path.match(src):
                # Already a Keep reference; map it through collection_pattern.
                self._pathmap[src] = (src, collection_pattern % src[5:])
            # NOTE(review): no 'continue' here -- a pdh-matched src that
            # contains '#' is stripped below and may be processed again
            # under the stripped key; confirm this is intentional.
            if "#" in src:
                # Drop the fragment identifier before resolving the path.
                src = src[:src.index("#")]
            if src not in self._pathmap:
                ab = cwltool.pathmapper.abspath(src, input_basedir)
                # statfile classifies 'ab' as already-in-Keep (ArvFile),
                # needing upload (UploadFile), or invalid.
                st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern)
                if kwargs.get("conformance_test"):
                    self._pathmap[src] = (src, ab)
                elif isinstance(st, arvados.commands.run.UploadFile):
                    uploadfiles.add((src, ab, st))
                elif isinstance(st, arvados.commands.run.ArvFile):
                    self._pathmap[src] = (ab, st.fn)
                else:
                    raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)

        if uploadfiles:
            # Batch-upload; uploadfiles() rewrites each st.fn in place to
            # the uploaded Keep location (per file_pattern).
            arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                             arvrunner.api,
                                             dry_run=kwargs.get("dry_run"),
                                             num_retries=3,
                                             fnPattern=file_pattern,
                                             name=name,
                                             project=arvrunner.project_uuid)

        for src, ab, st in uploadfiles:
            # Record the upload both in the runner-wide cache and locally.
            arvrunner.add_uploaded(src, (ab, st.fn))
            self._pathmap[src] = (ab, st.fn)

        # Local mount point of Keep inside the job, set later (from the job
        # log) to enable reversemap() of container paths.
        self.keepdir = None

    def reversemap(self, target):
        """Map a target path back to its Keep reference where possible."""
        if target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            return (target, "keep:" + target[len(self.keepdir)+1:])
        else:
            return super(ArvPathMapper, self).reversemap(target)
563
564
class ArvadosCommandTool(CommandLineTool):
    """Wrap cwltool CommandLineTool to override selected methods."""

    def __init__(self, arvrunner, toolpath_object, crunch2, **kwargs):
        super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
        self.arvrunner = arvrunner
        # crunch2 selects the containers API backend instead of jobs.
        self.crunch2 = crunch2

    def makeJobRunner(self):
        # Dispatch to the backend-specific job runner class.
        runner_class = ArvadosContainer if self.crunch2 else ArvadosJob
        return runner_class(self.arvrunner)

    def makePathMapper(self, reffiles, **kwargs):
        # Map referenced files into Keep using crunchrunner's
        # $(task.keep) substitution for container-local paths.
        return ArvPathMapper(
            self.arvrunner, reffiles, kwargs["basedir"],
            "$(task.keep)/%s",
            "$(task.keep)/%s/%s",
            **kwargs)
584
585
586 class ArvCwlRunner(object):
587     """Execute a CWL tool or workflow, submit crunch jobs, wait for them to
588     complete, and report output."""
589
590     def __init__(self, api_client, crunch2):
591         self.api = api_client
592         self.jobs = {}
593         self.lock = threading.Lock()
594         self.cond = threading.Condition(self.lock)
595         self.final_output = None
596         self.uploaded = {}
597         self.num_retries = 4
598         self.uuid = None
599         self.crunch2 = crunch2
600
601     def arvMakeTool(self, toolpath_object, **kwargs):
602         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
603             return ArvadosCommandTool(self, toolpath_object, crunch2=self.crunch2, **kwargs)
604         else:
605             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
606
607     def output_callback(self, out, processStatus):
608         if processStatus == "success":
609             logger.info("Overall job status is %s", processStatus)
610             if self.pipeline:
611                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
612                                                      body={"state": "Complete"}).execute(num_retries=self.num_retries)
613
614         else:
615             logger.warn("Overall job status is %s", processStatus)
616             if self.pipeline:
617                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
618                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
619         self.final_output = out
620
    def on_message(self, event):
        """Websocket event handler: route job state changes to the
        corresponding ArvadosJob/RunnerJob registered in self.jobs."""
        if "object_uuid" in event:
            if event["object_uuid"] in self.jobs and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    # Hold the lock while mutating the job object so the
                    # arvExecutor loop sees a consistent state.
                    with self.lock:
                        j = self.jobs[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    try:
                        # Use the condition (same underlying lock) so we can
                        # notify the arvExecutor wait loop after j.done()
                        # removes the job from self.jobs.
                        self.cond.acquire()
                        j = self.jobs[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()
641
642     def get_uploaded(self):
643         return self.uploaded.copy()
644
645     def add_uploaded(self, src, pair):
646         self.uploaded[src] = pair
647
648     def arvExecutor(self, tool, job_order, **kwargs):
649         self.debug = kwargs.get("debug")
650
651         if kwargs.get("quiet"):
652             logger.setLevel(logging.WARN)
653             logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
654
655         useruuid = self.api.users().current().execute()["uuid"]
656         self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
657         self.pipeline = None
658
659         if kwargs.get("create_template"):
660             tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
661             tmpl.save()
662             # cwltool.main will write our return value to stdout.
663             return tmpl.uuid
664
665         if kwargs.get("submit"):
666             runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))
667
668         if not kwargs.get("submit") and "cwl_runner_job" not in kwargs:
669             # Create pipeline for local run
670             self.pipeline = self.api.pipeline_instances().create(
671                 body={
672                     "owner_uuid": self.project_uuid,
673                     "name": shortname(tool.tool["id"]),
674                     "components": {},
675                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
676             logger.info("Pipeline instance %s", self.pipeline["uuid"])
677
678         if kwargs.get("submit") and not kwargs.get("wait"):
679                 runnerjob.run()
680                 return runnerjob.uuid
681
682         events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
683
684         self.debug = kwargs.get("debug")
685         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
686         self.fs_access = CollectionFsAccess(kwargs["basedir"])
687
688         kwargs["fs_access"] = self.fs_access
689         kwargs["enable_reuse"] = kwargs.get("enable_reuse")
690
691         if self.crunch2:
692             kwargs["outdir"] = "/var/spool/cwl"
693             kwargs["tmpdir"] = "/tmp"
694         else:
695             kwargs["outdir"] = "$(task.outdir)"
696             kwargs["tmpdir"] = "$(task.tmpdir)"
697
698         if kwargs.get("conformance_test"):
699             return cwltool.main.single_job_executor(tool, job_order, **kwargs)
700         else:
701             if kwargs.get("submit"):
702                 jobiter = iter((runnerjob,))
703             else:
704                 if "cwl_runner_job" in kwargs:
705                     self.uuid = kwargs.get("cwl_runner_job").get('uuid')
706                 jobiter = tool.job(job_order,
707                                    self.output_callback,
708                                    docker_outdir="$(task.outdir)",
709                                    **kwargs)
710
711             try:
712                 self.cond.acquire()
713                 # Will continue to hold the lock for the duration of this code
714                 # except when in cond.wait(), at which point on_message can update
715                 # job state and process output callbacks.
716
717                 for runnable in jobiter:
718                     if runnable:
719                         runnable.run(**kwargs)
720                     else:
721                         if self.jobs:
722                             self.cond.wait(1)
723                         else:
724                             logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
725                             break
726
727                 while self.jobs:
728                     self.cond.wait(1)
729
730                 events.close()
731             except:
732                 if sys.exc_info()[0] is KeyboardInterrupt:
733                     logger.error("Interrupted, marking pipeline as failed")
734                 else:
735                     logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[0], exc_info=(sys.exc_info()[1] if self.debug else False))
736                 if self.pipeline:
737                     self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
738                                                          body={"state": "Failed"}).execute(num_retries=self.num_retries)
739             finally:
740                 self.cond.release()
741
742             if self.final_output is None:
743                 raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
744
745             return self.final_output
746
def versionstring():
    """Print version string of key packages for provenance and debugging."""

    # Resolve the installed distribution for each package we report on.
    dists = [pkg_resources.require(pkg)[0]
             for pkg in ("arvados-cwl-runner",
                         "arvados-python-client",
                         "cwltool")]

    return "%s %s, %s %s, %s %s" % (sys.argv[0], dists[0].version,
                                    "arvados-python-client", dists[1].version,
                                    "cwltool", dists[2].version)
757
def arg_parser():  # type: () -> argparse.ArgumentParser
    """Build the command line parser for the Arvados CWL runner.

    Returns an argparse.ArgumentParser configured with all supported
    options; mutually exclusive groups pair each flag with its negation
    (e.g. --submit/--local, --wait/--no-wait, --crunch1/--crunch2).
    """
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--conformance-test", action="store_true")
    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    # Fix: these two options previously had empty help strings, which
    # produced blank entries in --help output.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job reuse, default true")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job reuse, always run new jobs")

    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--crunch1", action="store_false",
                        default=False, dest="crunch2",
                        help="Use Crunch v1 Jobs API")

    exgroup.add_argument("--crunch2", action="store_true",
                        default=False, dest="crunch2",
                        help="Use Crunch v2 Containers API")

    parser.add_argument("workflow", type=str, nargs="?", default=None)
    parser.add_argument("job_order", nargs=argparse.REMAINDER)

    return parser
819
def main(args, stdout, stderr, api_client=None):
    """Entry point: parse arguments, set up the runner, delegate to cwltool."""
    arvargs = arg_parser().parse_args(args)

    # When creating a template with no input file given, supply an empty
    # job order so cwltool does not attempt to read one.
    if arvargs.create_template and not arvargs.job_order:
        job_order_object = ({}, "")
    else:
        job_order_object = None

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client, crunch2=arvargs.crunch2)
    except Exception as e:
        # Top-level boundary: report setup failures and exit nonzero.
        logger.error(e)
        return 1

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arvExecutor,
                             makeTool=runner.arvMakeTool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object)