8442: CWL create crunch2 containers WIP
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
1 #!/usr/bin/env python
2
3 # Implement cwl-runner interface for submitting and running jobs on Arvados.
4
5 import argparse
6 import arvados
7 import arvados.collection
8 import arvados.commands.keepdocker
9 import arvados.commands.run
10 import arvados.events
11 import arvados.util
12 import copy
13 import cwltool.docker
14 from cwltool.draft2tool import revmap_file, remove_hostfs, CommandLineTool
15 from cwltool.errors import WorkflowException
16 import cwltool.main
17 import cwltool.workflow
18 import fnmatch
19 from functools import partial
20 import json
21 import logging
22 import os
23 import pkg_resources  # part of setuptools
24 import re
25 import sys
26 import threading
27 from cwltool.load_tool import fetch_document
28 from cwltool.builder import Builder
29 import urlparse
30
31 from cwltool.process import shortname, get_feature, adjustFiles, adjustFileObjs, scandeps
32 from arvados.api import OrderedJsonModel
33
34 logger = logging.getLogger('arvados.cwl-runner')
35 logger.setLevel(logging.INFO)
36
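# crunchrunner reports the task's resolved $(task.tmpdir), $(task.outdir) and
# $(task.keep) paths in the job log.  The patterns below pull those values back
# out of log lines shaped roughly like (field meanings approximate):
#   <timestamp> <uuid> <pid> <seq> stderr <timestamp> <node> crunchrunner: $(task.tmpdir)=<path>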
37 tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
38 outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
39 keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
40
41
42 def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
43     """Check if a Docker image is available in Keep; if not, upload it using arv-keepdocker."""
44
45     if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
46         dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]
47
48     sp = dockerRequirement["dockerImageId"].split(":")
49     image_name = sp[0]
50     image_tag = sp[1] if len(sp) > 1 else None
51
52     images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
53                                                             image_name=image_name,
54                                                             image_tag=image_tag)
55
56     if not images:
57         imageId = cwltool.docker.get_image(dockerRequirement, pull_image)
58         args = ["--project-uuid="+project_uuid, image_name]
59         if image_tag:
60             args.append(image_tag)
61         logger.info("Uploading Docker image %s", ":".join(args[1:]))
62         arvados.commands.keepdocker.main(args, stdout=sys.stderr)
63
64     # XXX return PDH instead
65
66     return dockerRequirement["dockerImageId"]
67
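# Note: the value returned is the image name (and tag, if any) from the
# DockerRequirement rather than a Keep locator (see the XXX above);
# ArvadosJob.run() stores it directly in runtime_constraints["docker_image"].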
68
69 class CollectionFsAccess(cwltool.process.StdFsAccess):
70     """Implement the cwltool FsAccess interface for Arvados Collections."""
71
72     def __init__(self, basedir):
73         super(CollectionFsAccess, self).__init__(basedir)
74         self.collections = {}
75
76     def get_collection(self, path):
77         p = path.split("/")
78         if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]):
79             pdh = p[0][5:]
80             if pdh not in self.collections:
81                 self.collections[pdh] = arvados.collection.CollectionReader(pdh)
82             return (self.collections[pdh], "/".join(p[1:]))
83         else:
84             return (None, path)
85
86     def _match(self, collection, patternsegments, parent):
87         if not patternsegments:
88             return []
89
90         if not isinstance(collection, arvados.collection.RichCollectionBase):
91             return []
92
93         ret = []
94         # iterate over the files and subcollections in 'collection'
95         for filename in collection:
96             if patternsegments[0] == '.':
97                 # Pattern contains something like "./foo" so just shift
98                 # past the "./"
99                 ret.extend(self._match(collection, patternsegments[1:], parent))
100             elif fnmatch.fnmatch(filename, patternsegments[0]):
101                 cur = os.path.join(parent, filename)
102                 if len(patternsegments) == 1:
103                     ret.append(cur)
104                 else:
105                     ret.extend(self._match(collection[filename], patternsegments[1:], cur))
106         return ret
107
108     def glob(self, pattern):
109         collection, rest = self.get_collection(pattern)
        if collection is None:
            # Not a keep: reference; fall back to the standard filesystem glob.
            return super(CollectionFsAccess, self).glob(pattern)
110         patternsegments = rest.split("/")
111         return self._match(collection, patternsegments, "keep:" + collection.manifest_locator())
112
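    # glob() example (hypothetical portable data hash): a pattern such as
    #   "keep:99999999999999999999999999999999+99/*.txt"
    # resolves the collection by PDH, then matches "*.txt" against the files
    # and subcollections inside it, returning keep: paths.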
113     def open(self, fn, mode):
114         collection, rest = self.get_collection(fn)
115         if collection:
116             return collection.open(rest, mode)
117         else:
118             return open(self._abs(fn), mode)
119
120     def exists(self, fn):
121         collection, rest = self.get_collection(fn)
122         if collection:
123             return collection.exists(rest)
124         else:
125             return os.path.exists(self._abs(fn))
126
127 class ArvadosJob(object):
128     """Submit and manage a Crunch job for executing a CWL CommandLineTool."""
129
130     def __init__(self, runner):
131         self.arvrunner = runner
132         self.running = False
133
134     def run(self, dry_run=False, pull_image=True, **kwargs):
135         script_parameters = {
136             "command": self.command_line
137         }
138         runtime_constraints = {}
139
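        # Files the tool asks to have created in its working directory (e.g.
        # via CreateFileRequirement) are staged into a new "virtual working
        # directory" collection and then exposed to the task as
        # $(task.keep)/<vwd portable data hash>/<filename> via task.vwd.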
140         if self.generatefiles:
141             vwd = arvados.collection.Collection()
142             script_parameters["task.vwd"] = {}
143             for t in self.generatefiles:
144                 if isinstance(self.generatefiles[t], dict):
145                     src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
146                     vwd.copy(rest, t, source_collection=src)
147                 else:
148                     with vwd.open(t, "w") as f:
149                         f.write(self.generatefiles[t])
150             vwd.save_new()
151             for t in self.generatefiles:
152                 script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)
153
154         script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
155         if self.environment:
156             script_parameters["task.env"].update(self.environment)
157
158         if self.stdin:
159             script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]
160
161         if self.stdout:
162             script_parameters["task.stdout"] = self.stdout
163
164         (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
165         if docker_req and kwargs.get("use_container") is not False:
166             runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
167         else:
168             runtime_constraints["docker_image"] = "arvados/jobs"
169
170         resources = self.builder.resources
171         if resources is not None:
172             runtime_constraints["min_cores_per_node"] = resources.get("cores", 1)
173             runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
174             runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)
175
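        # Job reuse: when find_or_create is enabled below, the API server may
        # reuse an existing job only if it matches these filters -- the same
        # crunchrunner script from the arvados repository at a compatible
        # script_version and, unless --ignore-docker-for-reuse was given, the
        # same Docker image.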
176         filters = [["repository", "=", "arvados"],
177                    ["script", "=", "crunchrunner"],
178                    ["script_version", "in git", "9e5b98e8f5f4727856b53447191f9c06e3da2ba6"]]
179         if not self.arvrunner.ignore_docker_for_reuse:
180             filters.append(["docker_image_locator", "in docker", runtime_constraints["docker_image"]])
181
182         try:
183             response = self.arvrunner.api.jobs().create(
184                 body={
185                     "owner_uuid": self.arvrunner.project_uuid,
186                     "script": "crunchrunner",
187                     "repository": "arvados",
188                     "script_version": "master",
189                     "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
190                     "script_parameters": {"tasks": [script_parameters]},
191                     "runtime_constraints": runtime_constraints
192                 },
193                 filters=filters,
194                 find_or_create=kwargs.get("enable_reuse", True)
195             ).execute(num_retries=self.arvrunner.num_retries)
196
197             self.arvrunner.jobs[response["uuid"]] = self
198
199             self.update_pipeline_component(response)
200
201             logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])
202
203             if response["state"] in ("Complete", "Failed", "Cancelled"):
204                 self.done(response)
205         except Exception as e:
206             logger.error("Got error %s", str(e))
207             self.output_callback({}, "permanentFail")
208
209     def update_pipeline_component(self, record):
210         if self.arvrunner.pipeline:
211             self.arvrunner.pipeline["components"][self.name] = {"job": record}
212             self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
213                                                                                  body={
214                                                                                     "components": self.arvrunner.pipeline["components"]
215                                                                                  }).execute(num_retries=self.arvrunner.num_retries)
216         if self.arvrunner.uuid:
217             try:
218                 job = self.arvrunner.api.jobs().get(uuid=self.arvrunner.uuid).execute()
219                 if job:
220                     components = job["components"]
221                     components[self.name] = record["uuid"]
222                     self.arvrunner.api.jobs().update(uuid=self.arvrunner.uuid,
223                         body={
224                             "components": components
225                         }).execute(num_retries=self.arvrunner.num_retries)
226             except Exception as e:
227                 logger.info("Error adding to components: %s", e)
228
229     def done(self, record):
230         try:
231             self.update_pipeline_component(record)
232         except:
233             pass
234
235         try:
236             if record["state"] == "Complete":
237                 processStatus = "success"
238             else:
239                 processStatus = "permanentFail"
240
241             try:
242                 outputs = {}
243                 if record["output"]:
244                     logc = arvados.collection.Collection(record["log"])
245                     log = logc.open(logc.keys()[0])
246                     tmpdir = None
247                     outdir = None
248                     keepdir = None
249                     for l in log:
250                         # Determine the tmpdir, outdir and keepdir paths from
251                         # the job run.  Unfortunately, we can't take the first
252                         # values we find (which are expected to be near the
253                         # top) and stop scanning, because if the node fails and
254                         # the job restarts on a different node these values
255                         # will differ between runs, and we need to know about
256                         # the final run that actually produced output.
257
258                         g = tmpdirre.match(l)
259                         if g:
260                             tmpdir = g.group(1)
261                         g = outdirre.match(l)
262                         if g:
263                             outdir = g.group(1)
264                         g = keepre.match(l)
265                         if g:
266                             keepdir = g.group(1)
267
268                     colname = "Output %s of %s" % (record["output"][0:7], self.name)
269
270                     # check if collection already exists with same owner, name and content
271                     collection_exists = self.arvrunner.api.collections().list(
272                         filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
273                                  ['portable_data_hash', '=', record["output"]],
274                                  ["name", "=", colname]]
275                     ).execute(num_retries=self.arvrunner.num_retries)
276
277                     if not collection_exists["items"]:
278                         # Create a collection located in the same project as the
279                         # pipeline with the contents of the output.
280                         # First, get output record.
281                         collections = self.arvrunner.api.collections().list(
282                             limit=1,
283                             filters=[['portable_data_hash', '=', record["output"]]],
284                             select=["manifest_text"]
285                         ).execute(num_retries=self.arvrunner.num_retries)
286
287                         if not collections["items"]:
288                             raise WorkflowException(
289                                 "Job output '%s' cannot be found on API server" % (
290                                     record["output"]))
291
292                         # Create new collection in the parent project
293                         # with the output contents.
294                         self.arvrunner.api.collections().create(body={
295                             "owner_uuid": self.arvrunner.project_uuid,
296                             "name": colname,
297                             "portable_data_hash": record["output"],
298                             "manifest_text": collections["items"][0]["manifest_text"]
299                         }, ensure_unique_name=True).execute(
300                             num_retries=self.arvrunner.num_retries)
301
302                     self.builder.outdir = outdir
303                     self.builder.pathmapper.keepdir = keepdir
304                     outputs = self.collect_outputs("keep:" + record["output"])
305             except WorkflowException as e:
306                 logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
307                 processStatus = "permanentFail"
308             except Exception as e:
309                 logger.exception("Got unknown exception while collecting job outputs:")
310                 processStatus = "permanentFail"
311
312             self.output_callback(outputs, processStatus)
313         finally:
314             del self.arvrunner.jobs[record["uuid"]]
315
316
317 class RunnerJob(object):
318     """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
319
320     def __init__(self, runner, tool, job_order, enable_reuse):
321         self.arvrunner = runner
322         self.tool = tool
323         self.job_order = job_order
324         self.running = False
325         self.enable_reuse = enable_reuse
326
327     def update_pipeline_component(self, record):
328         pass
329
330     def upload_docker(self, tool):
331         if isinstance(tool, CommandLineTool):
332             (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
333             if docker_req:
334                 arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid)
335         elif isinstance(tool, cwltool.workflow.Workflow):
336             for s in tool.steps:
337                 self.upload_docker(s.embedded_tool)
338
339     def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
340         """Create an Arvados job specification for this workflow.
341
342         The returned dict can be used to create a job (i.e., passed as
343         the +body+ argument to jobs().create()), or as a component in
344         a pipeline template or pipeline instance.
345         """
346         self.upload_docker(self.tool)
347
348         workflowfiles = set()
349         jobfiles = set()
350         workflowfiles.add(self.tool.tool["id"])
351
352         self.name = os.path.basename(self.tool.tool["id"])
353
354         def visitFiles(files, path):
355             files.add(path)
356             return path
357
358         document_loader, workflowobj, uri = fetch_document(self.tool.tool["id"])
359         def loadref(b, u):
360             return document_loader.fetch(urlparse.urljoin(b, u))
361
362         sc = scandeps(uri, workflowobj,
363                       set(("$import", "run")),
364                       set(("$include", "$schemas", "path")),
365                       loadref)
366         adjustFiles(sc, partial(visitFiles, workflowfiles))
367         adjustFiles(self.job_order, partial(visitFiles, jobfiles))
368
369         workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
370                                        "%s",
371                                        "%s/%s",
372                                        name=self.name,
373                                        **kwargs)
374
375         jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
376                                   "%s",
377                                   "%s/%s",
378                                   name=os.path.basename(self.job_order.get("id", "#")),
379                                   **kwargs)
380
381         adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])
382
383         if "id" in self.job_order:
384             del self.job_order["id"]
385
386         self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1]
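        # At this point script_parameters (self.job_order) is the original job
        # order with every local path rewritten to a keep: reference, plus a
        # "cwl:tool" entry pointing at the uploaded workflow document for
        # crunch_scripts/cwl-runner to load.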
387         return {
388             "script": "cwl-runner",
389             "script_version": "master",
390             "repository": "arvados",
391             "script_parameters": self.job_order,
392             "runtime_constraints": {
393                 "docker_image": "arvados/jobs"
394             }
395         }
396
397     def run(self, *args, **kwargs):
398         job_spec = self.arvados_job_spec(*args, **kwargs)
399         job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
400
401         response = self.arvrunner.api.jobs().create(
402             body=job_spec,
403             find_or_create=self.enable_reuse
404         ).execute(num_retries=self.arvrunner.num_retries)
405
406         self.uuid = response["uuid"]
407         self.arvrunner.jobs[self.uuid] = self
408
409         logger.info("Submitted job %s", response["uuid"])
410
411         if kwargs.get("submit"):
412             self.pipeline = self.arvrunner.api.pipeline_instances().create(
413                 body={
414                     "owner_uuid": self.arvrunner.project_uuid,
415                     "name": shortname(self.tool.tool["id"]),
416                     "components": {"cwl-runner": {"job": {"uuid": self.uuid, "state": response["state"]} } },
417                     "state": "RunningOnClient"}).execute(num_retries=self.arvrunner.num_retries)
418
419         if response["state"] in ("Complete", "Failed", "Cancelled"):
420             self.done(response)
421
422     def done(self, record):
423         if record["state"] == "Complete":
424             processStatus = "success"
425         else:
426             processStatus = "permanentFail"
427
428         outputs = None
429         try:
430             try:
431                 outc = arvados.collection.Collection(record["output"])
432                 with outc.open("cwl.output.json") as f:
433                     outputs = json.load(f)
434                 def keepify(path):
435                     if not path.startswith("keep:"):
436                         return "keep:%s/%s" % (record["output"], path)
                        # adjustFiles() assigns the return value back to the
                        # path, so paths that are already keep: references must
                        # be returned unchanged rather than dropped to None.
                        return path
437                 adjustFiles(outputs, keepify)
438             except Exception as e:
439                 logger.error("While getting final output object: %s", e)
440             self.arvrunner.output_callback(outputs, processStatus)
441         finally:
442             del self.arvrunner.jobs[record["uuid"]]
443
444
445 class RunnerTemplate(object):
446     """An Arvados pipeline template that invokes a CWL workflow."""
447
448     type_to_dataclass = {
449         'boolean': 'boolean',
450         'File': 'File',
451         'float': 'number',
452         'int': 'number',
453         'string': 'text',
454     }
455
456     def __init__(self, runner, tool, job_order, enable_reuse):
457         self.runner = runner
458         self.tool = tool
459         self.job = RunnerJob(
460             runner=runner,
461             tool=tool,
462             job_order=job_order,
463             enable_reuse=enable_reuse)
464
465     def pipeline_component_spec(self):
466         """Return a component that Workbench and arv-run-pipeline-instance (a-r-p-i) will understand.
467
468         Specifically, translate CWL input specs to Arvados pipeline
469         format, like {"dataclass":"File","value":"xyz"}.
470         """
471         spec = self.job.arvados_job_spec()
472
473         # Most of the component spec is exactly the same as the job
474         # spec (script, script_version, etc.).
475         # spec['script_parameters'] isn't right, though. A component
476         # spec's script_parameters hash is a translation of
477         # self.tool.tool['inputs'] with defaults/overrides taken from
478         # the job order. So we move the job parameters out of the way
479         # and build a new spec['script_parameters'].
480         job_params = spec['script_parameters']
481         spec['script_parameters'] = {}
482
483         for param in self.tool.tool['inputs']:
484             param = copy.deepcopy(param)
485
486             # Data type and "required" flag...
487             types = param['type']
488             if not isinstance(types, list):
489                 types = [types]
490             param['required'] = 'null' not in types
491             non_null_types = set(types) - set(['null'])
492             if len(non_null_types) == 1:
493                 the_type = [c for c in non_null_types][0]
494                 dataclass = self.type_to_dataclass.get(the_type)
495                 if dataclass:
496                     param['dataclass'] = dataclass
497             # Note: if we didn't figure out a single appropriate
498             # dataclass, we just leave that attribute out.  We keep the
499             # "type" attribute in any case, which might help downstream
500             # tools.
501
502             # Title and description...
503             title = param.pop('label', '')
504             descr = param.pop('description', '').rstrip('\n')
505             if title:
506                 param['title'] = title
507             if descr:
508                 param['description'] = descr
509
510             # Fill in the value from the current job order, if any.
511             param_id = shortname(param.pop('id'))
512             value = job_params.get(param_id)
513             if value is None:
514                 pass
515             elif not isinstance(value, dict):
516                 param['value'] = value
517             elif param.get('dataclass') == 'File' and value.get('path'):
518                 param['value'] = value['path']
519
520             spec['script_parameters'][param_id] = param
521         spec['script_parameters']['cwl:tool'] = job_params['cwl:tool']
522         return spec
523
524     def save(self):
525         job_spec = self.pipeline_component_spec()
526         response = self.runner.api.pipeline_templates().create(body={
527             "components": {
528                 self.job.name: job_spec,
529             },
530             "name": self.job.name,
531             "owner_uuid": self.runner.project_uuid,
532         }, ensure_unique_name=True).execute(num_retries=self.runner.num_retries)
533         self.uuid = response["uuid"]
534         logger.info("Created template %s", self.uuid)
535
536
537 class ArvPathMapper(cwltool.pathmapper.PathMapper):
538     """Convert container-local paths to and from Keep collection ids."""
539
540     def __init__(self, arvrunner, referenced_files, input_basedir,
541                  collection_pattern, file_pattern, name=None, **kwargs):
542         self._pathmap = arvrunner.get_uploaded()
543         uploadfiles = set()
544
545         pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')
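        # Matches a file inside a collection addressed by portable data hash,
        # e.g. (hypothetical PDH) "keep:99999999999999999999999999999999+99/a.txt";
        # such references can be used as-is without uploading anything.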
546
547         for src in referenced_files:
548             if isinstance(src, basestring) and pdh_path.match(src):
549                 self._pathmap[src] = (src, collection_pattern % src[5:])
550             if "#" in src:
551                 src = src[:src.index("#")]
552             if src not in self._pathmap:
553                 ab = cwltool.pathmapper.abspath(src, input_basedir)
554                 st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern)
555                 if kwargs.get("conformance_test"):
556                     self._pathmap[src] = (src, ab)
557                 elif isinstance(st, arvados.commands.run.UploadFile):
558                     uploadfiles.add((src, ab, st))
559                 elif isinstance(st, arvados.commands.run.ArvFile):
560                     self._pathmap[src] = (ab, st.fn)
561                 else:
562                     raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)
563
564         if uploadfiles:
565             arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
566                                              arvrunner.api,
567                                              dry_run=kwargs.get("dry_run"),
568                                              num_retries=3,
569                                              fnPattern=file_pattern,
570                                              name=name,
571                                              project=arvrunner.project_uuid)
572
573         for src, ab, st in uploadfiles:
574             arvrunner.add_uploaded(src, (ab, st.fn))
575             self._pathmap[src] = (ab, st.fn)
576
577         self.keepdir = None
578
579     def reversemap(self, target):
580         if target.startswith("keep:"):
581             return (target, target)
582         elif self.keepdir and target.startswith(self.keepdir):
583             return (target, "keep:" + target[len(self.keepdir)+1:])
584         else:
585             return super(ArvPathMapper, self).reversemap(target)
586
587
588 class ArvadosCommandTool(CommandLineTool):
589     """Wrap cwltool CommandLineTool to override selected methods."""
590
591     def __init__(self, arvrunner, toolpath_object, **kwargs):
592         super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
593         self.arvrunner = arvrunner
        # Remember whether the containers API ("crunch2") was requested;
        # makeJobRunner() is called without arguments, so the flag cannot be
        # read from kwargs there.
        self.crunch2 = kwargs.get("crunch2")
594
595     def makeJobRunner(self):
596         if self.crunch2:
            # ArvadosContainer (the crunch2 containers API runner) is still
            # work in progress and not yet defined or imported in this file.
597             return ArvadosContainer(self.arvrunner)
598         else:
599             return ArvadosJob(self.arvrunner)
600
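    # collection_pattern ("$(task.keep)/%s") addresses a whole collection by
    # portable data hash; file_pattern ("$(task.keep)/%s/%s") addresses a file
    # within a collection.  crunchrunner reports the real mount point for
    # $(task.keep) at run time (see the regexes at the top of this file).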
601     def makePathMapper(self, reffiles, **kwargs):
602         return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
603                              "$(task.keep)/%s",
604                              "$(task.keep)/%s/%s",
605                              **kwargs)
606
607
608 class ArvCwlRunner(object):
609     """Execute a CWL tool or workflow, submit crunch jobs, wait for them to
610     complete, and report output."""
611
612     def __init__(self, api_client):
613         self.api = api_client
614         self.jobs = {}
615         self.lock = threading.Lock()
616         self.cond = threading.Condition(self.lock)
617         self.final_output = None
618         self.uploaded = {}
619         self.num_retries = 4
620         self.uuid = None
621
622     def arvMakeTool(self, toolpath_object, **kwargs):
623         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
624             return ArvadosCommandTool(self, toolpath_object, **kwargs)
625         else:
626             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
627
628     def output_callback(self, out, processStatus):
629         if processStatus == "success":
630             logger.info("Overall job status is %s", processStatus)
631             if self.pipeline:
632                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
633                                                      body={"state": "Complete"}).execute(num_retries=self.num_retries)
634
635         else:
636             logger.warn("Overall job status is %s", processStatus)
637             if self.pipeline:
638                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
639                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
640         self.final_output = out
641
642     def on_message(self, event):
643         if "object_uuid" in event:
644             if event["object_uuid"] in self.jobs and event["event_type"] == "update":
645                 if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
646                     uuid = event["object_uuid"]
647                     with self.lock:
648                         j = self.jobs[uuid]
649                         logger.info("Job %s (%s) is Running", j.name, uuid)
650                         j.running = True
651                         j.update_pipeline_component(event["properties"]["new_attributes"])
652                 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
653                     uuid = event["object_uuid"]
654                     try:
655                         self.cond.acquire()
656                         j = self.jobs[uuid]
657                         logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
658                         j.done(event["properties"]["new_attributes"])
659                         self.cond.notify()
660                     finally:
661                         self.cond.release()
662
663     def get_uploaded(self):
664         return self.uploaded.copy()
665
666     def add_uploaded(self, src, pair):
667         self.uploaded[src] = pair
668
669     def arvExecutor(self, tool, job_order, **kwargs):
670         self.debug = kwargs.get("debug")
671
672         if kwargs.get("quiet"):
673             logger.setLevel(logging.WARN)
674             logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
675
676         useruuid = self.api.users().current().execute()["uuid"]
677         self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
678         self.pipeline = None
679
680         if kwargs.get("create_template"):
681             tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
682             tmpl.save()
683             # cwltool.main will write our return value to stdout.
684             return tmpl.uuid
685
686         if kwargs.get("submit"):
687             runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))
688
689         if not kwargs.get("submit") and "cwl_runner_job" not in kwargs:
690             # Create pipeline for local run
691             self.pipeline = self.api.pipeline_instances().create(
692                 body={
693                     "owner_uuid": self.project_uuid,
694                     "name": shortname(tool.tool["id"]),
695                     "components": {},
696                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
697             logger.info("Pipeline instance %s", self.pipeline["uuid"])
698
699         if kwargs.get("submit") and not kwargs.get("wait"):
700             runnerjob.run()
701             return runnerjob.uuid
702
703         events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
704
705         self.debug = kwargs.get("debug")
706         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
707         self.fs_access = CollectionFsAccess(kwargs["basedir"])
708
709         kwargs["fs_access"] = self.fs_access
710         kwargs["enable_reuse"] = kwargs.get("enable_reuse")
711
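        # With the containers API ("crunch2", WIP) the output and scratch
        # directories are fixed paths inside the container; with the jobs API
        # they are $(task.outdir)/$(task.tmpdir) placeholders that crunchrunner
        # expands at run time.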
712         if kwargs.get("crunch2"):
713             kwargs["outdir"] = "/var/spool/cwl"
714             kwargs["tmpdir"] = "/tmp"
715         else:
716             kwargs["outdir"] = "$(task.outdir)"
717             kwargs["tmpdir"] = "$(task.tmpdir)"
718
719         if kwargs.get("conformance_test"):
720             return cwltool.main.single_job_executor(tool, job_order, **kwargs)
721         else:
722             if kwargs.get("submit"):
723                 jobiter = iter((runnerjob,))
724             else:
725                 if "cwl_runner_job" in kwargs:
726                     self.uuid = kwargs.get("cwl_runner_job").get('uuid')
727                 jobiter = tool.job(job_order,
728                                    self.output_callback,
729                                    docker_outdir="$(task.outdir)",
730                                    **kwargs)
731
732             try:
733                 self.cond.acquire()
734                 # Will continue to hold the lock for the duration of this code
735                 # except when in cond.wait(), at which point on_message can update
736                 # job state and process output callbacks.
737
738                 for runnable in jobiter:
739                     if runnable:
740                         runnable.run(**kwargs)
741                     else:
742                         if self.jobs:
743                             self.cond.wait(1)
744                         else:
745                             logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
746                             break
747
748                 while self.jobs:
749                     self.cond.wait(1)
750
751                 events.close()
752             except:
753                 if sys.exc_info()[0] is KeyboardInterrupt:
754                     logger.error("Interrupted, marking pipeline as failed")
755                 else:
756                     logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
757                 if self.pipeline:
758                     self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
759                                                          body={"state": "Failed"}).execute(num_retries=self.num_retries)
760             finally:
761                 self.cond.release()
762
763             if self.final_output is None:
764                 raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
765
766             return self.final_output
767
768 def versionstring():
769     """Version string of key packages, for provenance and debugging."""
770
771     arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
772     arvpkg = pkg_resources.require("arvados-python-client")
773     cwlpkg = pkg_resources.require("cwltool")
774
775     return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
776                                     "arvados-python-client", arvpkg[0].version,
777                                     "cwltool", cwlpkg[0].version)
778
779 def arg_parser():  # type: () -> argparse.ArgumentParser
780     parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
781
782     parser.add_argument("--conformance-test", action="store_true")
783     parser.add_argument("--basedir", type=str,
784                         help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or to the current directory if inputs are piped or given on the command line.")
785     parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
786                         help="Output directory, default current directory")
787
788     parser.add_argument("--eval-timeout",
789                         help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20 seconds.",
790                         type=float,
791                         default=20)
792     parser.add_argument("--version", action="store_true", help="Print version and exit")
793
794     exgroup = parser.add_mutually_exclusive_group()
795     exgroup.add_argument("--verbose", action="store_true", help="Default logging")
796     exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
797     exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
798
799     parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
800
801     exgroup = parser.add_mutually_exclusive_group()
802     exgroup.add_argument("--enable-reuse", action="store_true",
803                         default=True, dest="enable_reuse",
804                         help="Enable job reuse (default): allow the API server to reuse equivalent past jobs instead of running new ones.")
805     exgroup.add_argument("--disable-reuse", action="store_false",
806                         default=True, dest="enable_reuse",
807                         help="Disable job reuse: always run new jobs.")
808
809     parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs.  If not provided, jobs go to the user's Home project.")
810     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
811                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
812                         default=False)
813
814     exgroup = parser.add_mutually_exclusive_group()
815     exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
816                         default=True, dest="submit")
817     exgroup.add_argument("--local", action="store_false", help="Control workflow execution from the local host (individual jobs are still submitted to Arvados).",
818                         default=True, dest="submit")
819     exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
820
821     exgroup = parser.add_mutually_exclusive_group()
822     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
823                         default=True, dest="wait")
824     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
825                         default=True, dest="wait")
826
827     parser.add_argument("workflow", type=str, nargs="?", default=None)
828     parser.add_argument("job_order", nargs=argparse.REMAINDER)
829
830     return parser
831
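# Typical invocation (illustrative; assumes the console script is installed
# as "arvados-cwl-runner" and uses a placeholder project UUID):
#   arvados-cwl-runner --project-uuid zzzzz-j7d0g-xxxxxxxxxxxxxxx workflow.cwl job.yml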
832 def main(args, stdout, stderr, api_client=None):
833     parser = arg_parser()
834
835     job_order_object = None
836     arvargs = parser.parse_args(args)
837     if arvargs.create_template and not arvargs.job_order:
838         job_order_object = ({}, "")
839
840     try:
841         if api_client is None:
842             api_client=arvados.api('v1', model=OrderedJsonModel())
843         runner = ArvCwlRunner(api_client)
844     except Exception as e:
845         logger.error(e)
846         return 1
847
848     return cwltool.main.main(args=arvargs,
849                              stdout=stdout,
850                              stderr=stderr,
851                              executor=runner.arvExecutor,
852                              makeTool=runner.arvMakeTool,
853                              versionfunc=versionstring,
854                              job_order_object=job_order_object)
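
# main() is normally exposed as a setuptools console-script entry point; a
# direct invocation would look roughly like this (illustrative sketch):
#
#   if __name__ == "__main__":
#       sys.exit(main(sys.argv[1:], sys.stdout, sys.stderr))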