11100: a-c-r sets output_ttl and deletes intermediate collections on success.
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
1 #!/usr/bin/env python
2
3 # Implement cwl-runner interface for submitting and running work on Arvados, using
4 # either the Crunch jobs API or Crunch containers API.
5
6 import argparse
7 import logging
8 import os
9 import sys
10 import threading
11 import hashlib
12 import copy
13 import json
14 import re
15 from functools import partial
16 import pkg_resources  # part of setuptools
17
18 from cwltool.errors import WorkflowException
19 import cwltool.main
20 import cwltool.workflow
21 import cwltool.process
22 import schema_salad
23 from schema_salad.sourceline import SourceLine
24
25 import arvados
26 import arvados.config
27 from arvados.keep import KeepClient
28 from arvados.errors import ApiError
29
30 from .arvcontainer import ArvadosContainer, RunnerContainer
31 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
32 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
33 from .arvtool import ArvadosCommandTool
34 from .arvworkflow import ArvadosWorkflow, upload_workflow
35 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
36 from .perf import Perf
37 from .pathmapper import NoFollowPathMapper
38 from ._version import __version__
39
40 from cwltool.pack import pack
41 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
42 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
43 from cwltool.draft2tool import compute_checksums
44 from arvados.api import OrderedJsonModel
45
46 logger = logging.getLogger('arvados.cwl-runner')
47 metrics = logging.getLogger('arvados.cwl-runner.metrics')
48 logger.setLevel(logging.INFO)
49
50 arvados.log_handler.setFormatter(logging.Formatter(
51         '%(asctime)s %(name)s %(levelname)s: %(message)s',
52         '%Y-%m-%d %H:%M:%S'))
53
54 class ArvCwlRunner(object):
55     """Execute a CWL tool or workflow, submit work (using either the jobs or
56     containers API), wait for it to complete, and report output.
57
58     """
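    # Typical wiring (see main() below): construct with an Arvados API client,
    # then let cwltool.main drive arv_executor() and arv_make_tool().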
59
60     def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
61         self.api = api_client
62         self.processes = {}
63         self.lock = threading.Lock()
64         self.cond = threading.Condition(self.lock)
65         self.final_output = None
66         self.final_status = None
67         self.uploaded = {}
68         self.num_retries = num_retries
69         self.uuid = None
70         self.stop_polling = threading.Event()
71         self.poll_api = None
72         self.pipeline = None
73         self.final_output_collection = None
74         self.output_name = output_name
75         self.output_tags = output_tags
76         self.project_uuid = None
77         self.output_ttl = 0
78         self.intermediate_output_collections = []
79
80         if keep_client is not None:
81             self.keep_client = keep_client
82         else:
83             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
84
85         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
86
87         self.work_api = None
88         expected_api = ["jobs", "containers"]
89         for api in expected_api:
90             try:
91                 methods = self.api._rootDesc.get('resources')[api]['methods']
92                 if ('httpMethod' in methods['create'] and
93                     (work_api == api or work_api is None)):
94                     self.work_api = api
95                     break
96             except KeyError:
97                 pass
98
99         if not self.work_api:
100             if work_api is None:
101                 raise Exception("No supported APIs")
102             else:
103                 raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
104
105     def arv_make_tool(self, toolpath_object, **kwargs):
106         kwargs["work_api"] = self.work_api
107         kwargs["fetcher_constructor"] = partial(CollectionFetcher,
108                                                 api_client=self.api,
109                                                 fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
110                                                 num_retries=self.num_retries)
111         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
112             return ArvadosCommandTool(self, toolpath_object, **kwargs)
113         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
114             return ArvadosWorkflow(self, toolpath_object, **kwargs)
115         else:
116             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
117
118     def output_callback(self, out, processStatus):
119         if processStatus == "success":
120             logger.info("Overall process status is %s", processStatus)
121             if self.pipeline:
122                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
123                                                      body={"state": "Complete"}).execute(num_retries=self.num_retries)
124         else:
125             logger.warn("Overall process status is %s", processStatus)
126             if self.pipeline:
127                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
128                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
129         self.final_status = processStatus
130         self.final_output = out
131
132     def on_message(self, event):
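        # Handle an "update" event for a tracked job or container request:
        # mark it as running, or on a final state call its done() handler and
        # notify the main loop waiting on self.cond.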
133         if "object_uuid" in event:
134             if event["object_uuid"] in self.processes and event["event_type"] == "update":
135                 if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
136                     uuid = event["object_uuid"]
137                     with self.lock:
138                         j = self.processes[uuid]
139                         logger.info("%s %s is Running", self.label(j), uuid)
140                         j.running = True
141                         j.update_pipeline_component(event["properties"]["new_attributes"])
142                 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
143                     uuid = event["object_uuid"]
144                     try:
145                         self.cond.acquire()
146                         j = self.processes[uuid]
147                         logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
148                         with Perf(metrics, "done %s" % j.name):
149                             j.done(event["properties"]["new_attributes"])
150                         self.cond.notify()
151                     finally:
152                         self.cond.release()
153
154     def label(self, obj):
155         return "[%s %s]" % (self.work_api[0:-1], obj.name)
156
157     def poll_states(self):
158         """Poll status of jobs or containers listed in the processes dict.
159
160         Runs in a separate thread.
161         """
162
163         try:
164             while True:
165                 self.stop_polling.wait(15)
166                 if self.stop_polling.is_set():
167                     break
168                 with self.lock:
169                     keys = self.processes.keys()
170                 if not keys:
171                     continue
172
173                 if self.work_api == "containers":
174                     table = self.poll_api.container_requests()
175                 elif self.work_api == "jobs":
176                     table = self.poll_api.jobs()
177
178                 try:
179                     proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
180                 except Exception as e:
181                     logger.warn("Error checking states on API server: %s", e)
182                     continue
183
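                # Convert each polled record into a synthetic "update" event
                # and feed it through on_message().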
184                 for p in proc_states["items"]:
185                     self.on_message({
186                         "object_uuid": p["uuid"],
187                         "event_type": "update",
188                         "properties": {
189                             "new_attributes": p
190                         }
191                     })
192         except:
193             logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
194             self.cond.acquire()
195             self.processes.clear()
196             self.cond.notify()
197             self.cond.release()
198         finally:
199             self.stop_polling.set()
200
201     def get_uploaded(self):
202         return self.uploaded.copy()
203
204     def add_uploaded(self, src, pair):
205         self.uploaded[src] = pair
206
207     def add_intermediate_output(self, uuid):
208         if uuid:
209             self.intermediate_output_collections.append(uuid)
210
211     def trash_intermediate_output(self):
212         logger.info("Cleaning up intermediate output collections")
213         for i in self.intermediate_output_collections:
214             try:
215                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
216             except:
217                 logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
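            # sys.exc_info() still reflects the exception handled above; stop
            # deleting the remaining collections if the user hit Ctrl-C.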
218             if sys.exc_info()[0] is KeyboardInterrupt:
219                 break
220
221     def check_features(self, obj):
222         if isinstance(obj, dict):
223             if obj.get("writable"):
224                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
225             if obj.get("class") == "DockerRequirement":
226                 if obj.get("dockerOutputDirectory"):
227                     # TODO: can be supported by containers API, but not jobs API.
228                     raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
229                         "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
230             for v in obj.itervalues():
231                 self.check_features(v)
232         elif isinstance(obj, list):
233             for i,v in enumerate(obj):
234                 with SourceLine(obj, i, UnsupportedRequirement):
235                     self.check_features(v)
236
237     def make_output_collection(self, name, tagsString, outputObj):
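        # Assemble a single output collection: copy every File/Directory
        # referenced by the output object out of its source Keep collection,
        # write literal "CreateFile" entries directly, rewrite the output
        # object's locations to point at the new collection, and save it
        # together with a cwl.output.json file describing the rewritten output.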
238         outputObj = copy.deepcopy(outputObj)
239
240         files = []
241         def capture(fileobj):
242             files.append(fileobj)
243
244         adjustDirObjs(outputObj, capture)
245         adjustFileObjs(outputObj, capture)
246
247         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
248
249         final = arvados.collection.Collection(api_client=self.api,
250                                               keep_client=self.keep_client,
251                                               num_retries=self.num_retries)
252
253         for k,v in generatemapper.items():
254             if k.startswith("_:"):
255                 if v.type == "Directory":
256                     continue
257                 if v.type == "CreateFile":
258                     with final.open(v.target, "wb") as f:
259                         f.write(v.resolved.encode("utf-8"))
260                     continue
261
262             if not k.startswith("keep:"):
263                 raise Exception("Output source is not in keep or a literal")
264             sp = k.split("/")
265             srccollection = sp[0][5:]
266             try:
267                 reader = self.collection_cache.get(srccollection)
268                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
269                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
270             except arvados.errors.ArgumentError as e:
271                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
272                 raise
273             except IOError as e:
274                 logger.warn("While preparing output collection: %s", e)
275
276         def rewrite(fileobj):
277             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
278             for k in ("basename", "listing", "contents"):
279                 if k in fileobj:
280                     del fileobj[k]
281
282         adjustDirObjs(outputObj, rewrite)
283         adjustFileObjs(outputObj, rewrite)
284
285         with final.open("cwl.output.json", "w") as f:
286             json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
287
288         final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
289
290         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
291                     final.api_response()["name"],
292                     final.manifest_locator())
293
294         final_uuid = final.manifest_locator()
295         tags = tagsString.split(',')
296         for tag in tags:
297              self.api.links().create(body={
298                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
299                 }).execute(num_retries=self.num_retries)
300
301         def finalcollection(fileobj):
302             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
303
304         adjustDirObjs(outputObj, finalcollection)
305         adjustFileObjs(outputObj, finalcollection)
306
307         return (outputObj, final)
308
309     def set_crunch_output(self):
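        # When this runner is itself executing inside a crunch container (or
        # job task), record the final output on that container/task record.
        # The collection saved by make_output_collection is then marked
        # trashed, since the container's output field now carries the same
        # data by portable data hash.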
310         if self.work_api == "containers":
311             try:
312                 current = self.api.containers().current().execute(num_retries=self.num_retries)
313             except ApiError as e:
314                 # Status code 404 just means we're not running in a container.
315                 if e.resp.status != 404:
316                     logger.info("Getting current container: %s", e)
317                 return
318             try:
319                 self.api.containers().update(uuid=current['uuid'],
320                                              body={
321                                                  'output': self.final_output_collection.portable_data_hash(),
322                                              }).execute(num_retries=self.num_retries)
323                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
324                                               body={
325                                                   'is_trashed': True
326                                               }).execute(num_retries=self.num_retries)
327             except Exception as e:
328                 logger.info("Setting container output: %s", e)
329         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
330             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
331                                    body={
332                                        'output': self.final_output_collection.portable_data_hash(),
333                                        'success': self.final_status == "success",
334                                        'progress':1.0
335                                    }).execute(num_retries=self.num_retries)
336
337     def arv_executor(self, tool, job_order, **kwargs):
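        # Main executor invoked by cwltool.main: upload dependencies and the
        # job order, optionally just register a workflow/pipeline template,
        # then either submit a runner job/container or run the steps directly,
        # polling until everything completes and packaging the final output.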
338         self.debug = kwargs.get("debug")
339
340         tool.visit(self.check_features)
341
342         self.project_uuid = kwargs.get("project_uuid")
343         self.pipeline = None
344         make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
345                                                                  collection_cache=self.collection_cache)
346         self.fs_access = make_fs_access(kwargs["basedir"])
347
348         self.output_ttl = kwargs["intermediate_output_ttl"]
349         if self.output_ttl and self.work_api != "containers":
350             raise Exception("--intermediate-output-ttl is only supported when using the containers api.")
351
352         if not kwargs.get("name"):
353             kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
354
355         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
356         # Also uploads docker images.
357         upload_workflow_deps(self, tool)
358
359         # Reload tool object which may have been updated by
360         # upload_workflow_deps
361         tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
362                                   makeTool=self.arv_make_tool,
363                                   loader=tool.doc_loader,
364                                   avsc_names=tool.doc_schema,
365                                   metadata=tool.metadata)
366
367         # Upload local file references in the job order.
368         job_order = upload_job_order(self, "%s input" % kwargs["name"],
369                                      tool, job_order)
370
371         existing_uuid = kwargs.get("update_workflow")
372         if existing_uuid or kwargs.get("create_workflow"):
373             # Create a pipeline template or workflow record and exit.
374             if self.work_api == "jobs":
375                 tmpl = RunnerTemplate(self, tool, job_order,
376                                       kwargs.get("enable_reuse"),
377                                       uuid=existing_uuid,
378                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
379                                       name=kwargs["name"])
380                 tmpl.save()
381                 # cwltool.main will write our return value to stdout.
382                 return (tmpl.uuid, "success")
383             elif self.work_api == "containers":
384                 return (upload_workflow(self, tool, job_order,
385                                         self.project_uuid,
386                                         uuid=existing_uuid,
387                                         submit_runner_ram=kwargs.get("submit_runner_ram"),
388                                         name=kwargs["name"]),
389                         "success")
390
391         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
392
393         kwargs["make_fs_access"] = make_fs_access
394         kwargs["enable_reuse"] = kwargs.get("enable_reuse")
395         kwargs["use_container"] = True
396         kwargs["tmpdir_prefix"] = "tmp"
397         kwargs["compute_checksum"] = kwargs.get("compute_checksum")
398
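        # The containers API uses fixed paths inside the container; the jobs
        # API uses $(task.*) placeholders that crunch expands at runtime.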
399         if self.work_api == "containers":
400             kwargs["outdir"] = "/var/spool/cwl"
401             kwargs["docker_outdir"] = "/var/spool/cwl"
402             kwargs["tmpdir"] = "/tmp"
403             kwargs["docker_tmpdir"] = "/tmp"
404         elif self.work_api == "jobs":
405             kwargs["outdir"] = "$(task.outdir)"
406             kwargs["docker_outdir"] = "$(task.outdir)"
407             kwargs["tmpdir"] = "$(task.tmpdir)"
408
409         runnerjob = None
410         if kwargs.get("submit"):
411             # Submit a runner job to run the workflow for us.
412             if self.work_api == "containers":
413                 if tool.tool["class"] == "CommandLineTool":
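                    # A bare CommandLineTool is submitted directly as its own
                    # container request (no separate workflow runner); upload
                    # its dependencies first.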
414                     kwargs["runnerjob"] = tool.tool["id"]
415                     upload_dependencies(self,
416                                         kwargs["name"],
417                                         tool.doc_loader,
418                                         tool.tool,
419                                         tool.tool["id"],
420                                         False)
421                     runnerjob = tool.job(job_order,
422                                          self.output_callback,
423                                          **kwargs).next()
424                 else:
425                     runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
426                                                 self.output_name,
427                                                 self.output_tags,
428                                                 submit_runner_ram=kwargs.get("submit_runner_ram"),
429                                                 name=kwargs.get("name"),
430                                                 on_error=kwargs.get("on_error"),
431                                                 submit_runner_image=kwargs.get("submit_runner_image"))
432             elif self.work_api == "jobs":
433                 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
434                                       self.output_name,
435                                       self.output_tags,
436                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
437                                       name=kwargs.get("name"),
438                                       on_error=kwargs.get("on_error"),
439                                       submit_runner_image=kwargs.get("submit_runner_image"))
440
441         if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api == "jobs":
442             # Create pipeline for local run
443             self.pipeline = self.api.pipeline_instances().create(
444                 body={
445                     "owner_uuid": self.project_uuid,
446                     "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
447                     "components": {},
448                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
449             logger.info("Pipeline instance %s", self.pipeline["uuid"])
450
451         if runnerjob and not kwargs.get("wait"):
452             runnerjob.run(wait=kwargs.get("wait"))
453             return (runnerjob.uuid, "success")
454
455         self.poll_api = arvados.api('v1')
456         self.polling_thread = threading.Thread(target=self.poll_states)
457         self.polling_thread.start()
458
459         if runnerjob:
460             jobiter = iter((runnerjob,))
461         else:
462             if "cwl_runner_job" in kwargs:
463                 self.uuid = kwargs.get("cwl_runner_job").get('uuid')
464             jobiter = tool.job(job_order,
465                                self.output_callback,
466                                **kwargs)
467
468         try:
469             self.cond.acquire()
470             # Will continue to hold the lock for the duration of this code
471             # except when in cond.wait(), at which point on_message can update
472             # job state and process output callbacks.
473
474             loopperf = Perf(metrics, "jobiter")
475             loopperf.__enter__()
476             for runnable in jobiter:
477                 loopperf.__exit__()
478
479                 if self.stop_polling.is_set():
480                     break
481
482                 if runnable:
483                     with Perf(metrics, "run"):
484                         runnable.run(**kwargs)
485                 else:
486                     if self.processes:
487                         self.cond.wait(1)
488                     else:
489                         logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
490                         break
491                 loopperf.__enter__()
492             loopperf.__exit__()
493
494             while self.processes:
495                 self.cond.wait(1)
496
497         except UnsupportedRequirement:
498             raise
499         except:
500             if sys.exc_info()[0] is KeyboardInterrupt:
501                 logger.error("Interrupted, marking pipeline as failed")
502             else:
503                 logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
504             if self.pipeline:
505                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
506                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
507             if runnerjob and runnerjob.uuid and self.work_api == "containers":
508                 self.api.container_requests().update(uuid=runnerjob.uuid,
509                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
510         finally:
511             self.cond.release()
512             self.stop_polling.set()
513             self.polling_thread.join()
514
515         if self.final_status == "UnsupportedRequirement":
516             raise UnsupportedRequirement("Check log for details.")
517
518         if self.final_output is None:
519             raise WorkflowException("Workflow did not return a result.")
520
521         if kwargs.get("submit") and isinstance(runnerjob, Runner):
522             logger.info("Final output collection %s", runnerjob.final_output)
523         else:
524             if self.output_name is None:
525                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
526             if self.output_tags is None:
527                 self.output_tags = ""
528             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
529             self.set_crunch_output()
530
531         if kwargs.get("compute_checksum"):
532             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
533             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
534
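        # --intermediate-output-ttl: on success, trash intermediate output
        # collections immediately rather than waiting for the TTL to expire.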
535         if self.output_ttl and self.final_status == "success":
536             self.trash_intermediate_output()
537
538         return (self.final_output, self.final_status)
539
540
541 def versionstring():
542     """Print version string of key packages for provenance and debugging."""
543
544     arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
545     arvpkg = pkg_resources.require("arvados-python-client")
546     cwlpkg = pkg_resources.require("cwltool")
547
548     return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
549                                     "arvados-python-client", arvpkg[0].version,
550                                     "cwltool", cwlpkg[0].version)
551
552
553 def arg_parser():  # type: () -> argparse.ArgumentParser
554     parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
555
556     parser.add_argument("--basedir", type=str,
557                         help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped/provided on the command line.")
558     parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
559                         help="Output directory, default current directory")
560
561     parser.add_argument("--eval-timeout",
562                         help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
563                         type=float,
564                         default=20)
565
566     exgroup = parser.add_mutually_exclusive_group()
567     exgroup.add_argument("--print-dot", action="store_true",
568                          help="Print workflow visualization in graphviz format and exit")
569     exgroup.add_argument("--version", action="store_true", help="Print version and exit")
570     exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
571
572     exgroup = parser.add_mutually_exclusive_group()
573     exgroup.add_argument("--verbose", action="store_true", help="Default logging")
574     exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
575     exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
576
577     parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
578
579     parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
580
581     exgroup = parser.add_mutually_exclusive_group()
582     exgroup.add_argument("--enable-reuse", action="store_true",
583                         default=True, dest="enable_reuse",
584                         help="Enable job or container reuse (default)")
585     exgroup.add_argument("--disable-reuse", action="store_false",
586                         default=True, dest="enable_reuse",
587                         help="Disable job or container reuse")
588
589     parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
590     parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
591     parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
592     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
593                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
594                         default=False)
595
596     exgroup = parser.add_mutually_exclusive_group()
597     exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
598                         default=True, dest="submit")
599     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
600                         default=True, dest="submit")
601     exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
602                          dest="create_workflow")
603     exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
604     exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
605
606     exgroup = parser.add_mutually_exclusive_group()
607     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
608                         default=True, dest="wait")
609     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
610                         default=True, dest="wait")
611
612     exgroup = parser.add_mutually_exclusive_group()
613     exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
614                         default=True, dest="log_timestamps")
615     exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
616                         default=True, dest="log_timestamps")
617
618     parser.add_argument("--api", type=str,
619                         default=None, dest="work_api",
620                         help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")
621
622     parser.add_argument("--compute-checksum", action="store_true", default=False,
623                         help="Compute checksum of contents while collecting outputs",
624                         dest="compute_checksum")
625
626     parser.add_argument("--submit-runner-ram", type=int,
627                         help="RAM (in MiB) required for the workflow runner job (default 1024)",
628                         default=1024)
629
630     parser.add_argument("--submit-runner-image", type=str,
631                         help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
632                         default=None)
633
634     parser.add_argument("--name", type=str,
635                         help="Name to use for workflow execution instance.",
636                         default=None)
637
638     parser.add_argument("--on-error", type=str,
639                         help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
640                         "Default is 'continue'.", default="continue", choices=("stop", "continue"))
641
642     parser.add_argument("--enable-dev", action="store_true",
643                         help="Enable loading and running development versions "
644                              "of CWL spec.", default=False)
645     parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
646                         help="If N > 0, intermediate output collections will be trashed N seconds after creation, or on successful completion of workflow (whichever comes first).",
647                         default=0)
648
649     parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
650     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
651
652     return parser
653
654 def add_arv_hints():
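    # Relax cwltool's Docker image name check, register the Arvados CWL
    # extension schema, and declare the extension requirements it defines as
    # supported.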
655     cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
656     cwltool.draft2tool.ACCEPTLIST_RE = cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE
657     res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
658     use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
659     res.close()
660     cwltool.process.supportedProcessRequirements.extend([
661         "http://arvados.org/cwl#RunInSingleContainer",
662         "http://arvados.org/cwl#OutputDirType",
663         "http://arvados.org/cwl#RuntimeConstraints",
664         "http://arvados.org/cwl#PartitionRequirement",
665         "http://arvados.org/cwl#APIRequirement",
666         "http://commonwl.org/cwltool#LoadListingRequirement"
667     ])
668
669 def main(args, stdout, stderr, api_client=None, keep_client=None):
670     parser = arg_parser()
671
672     job_order_object = None
673     arvargs = parser.parse_args(args)
674
675     if arvargs.version:
676         print versionstring()
677         return
678
679     if arvargs.update_workflow:
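        # Infer the target API from the UUID's object-type infix: "-7fd4e-"
        # denotes a workflow record (containers API), "-p5p6p-" a pipeline
        # template (jobs API).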
680         if arvargs.update_workflow.find('-7fd4e-') == 5:
681             want_api = 'containers'
682         elif arvargs.update_workflow.find('-p5p6p-') == 5:
683             want_api = 'jobs'
684         else:
685             want_api = None
686         if want_api and arvargs.work_api and want_api != arvargs.work_api:
687             logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
688                 arvargs.update_workflow, want_api, arvargs.work_api))
689             return 1
690         arvargs.work_api = want_api
691
692     if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
693         job_order_object = ({}, "")
694
695     add_arv_hints()
696
697     try:
698         if api_client is None:
699             api_client=arvados.api('v1', model=OrderedJsonModel())
700         if keep_client is None:
701             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
702         runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
703                               num_retries=4, output_name=arvargs.output_name,
704                               output_tags=arvargs.output_tags)
705     except Exception as e:
706         logger.error(e)
707         return 1
708
709     if arvargs.debug:
710         logger.setLevel(logging.DEBUG)
711         logging.getLogger('arvados').setLevel(logging.DEBUG)
712
713     if arvargs.quiet:
714         logger.setLevel(logging.WARN)
715         logging.getLogger('arvados').setLevel(logging.WARN)
716         logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
717
718     if arvargs.metrics:
719         metrics.setLevel(logging.DEBUG)
720         logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
721
722     if arvargs.log_timestamps:
723         arvados.log_handler.setFormatter(logging.Formatter(
724             '%(asctime)s %(name)s %(levelname)s: %(message)s',
725             '%Y-%m-%d %H:%M:%S'))
726     else:
727         arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
728
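    # Adjust a few cwltool options before delegating to cwltool.main.main():
    # always run tools in containers, relax path checks, and clear the
    # conformance_test/validate flags.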
729     arvargs.conformance_test = None
730     arvargs.use_container = True
731     arvargs.relax_path_checks = True
732     arvargs.validate = None
733
734     make_fs_access = partial(CollectionFsAccess,
735                            collection_cache=runner.collection_cache)
736
737     return cwltool.main.main(args=arvargs,
738                              stdout=stdout,
739                              stderr=stderr,
740                              executor=runner.arv_executor,
741                              makeTool=runner.arv_make_tool,
742                              versionfunc=versionstring,
743                              job_order_object=job_order_object,
744                              make_fs_access=make_fs_access,
745                              fetcher_constructor=partial(CollectionFetcher,
746                                                          api_client=api_client,
747                                                          fs_access=make_fs_access(""),
748                                                          num_retries=runner.num_retries),
749                              resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
750                              logger_handler=arvados.log_handler,
751                              custom_schema_callback=add_arv_hints)