Merge branch '8567-moar-docker' refs #8567
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
1 #!/usr/bin/env python
2
3 # Implement cwl-runner interface for submitting and running work on Arvados, using
4 # either the Crunch jobs API or Crunch containers API.
5
6 import argparse
7 import logging
8 import os
9 import sys
10 import threading
11 import hashlib
12 import copy
13 import json
14 import re
15 from functools import partial
16 import pkg_resources  # part of setuptools
17
18 from cwltool.errors import WorkflowException
19 import cwltool.main
20 import cwltool.workflow
21 import cwltool.process
22 import schema_salad
23 from schema_salad.sourceline import SourceLine
24
25 import arvados
26 import arvados.config
27 from arvados.keep import KeepClient
28 from arvados.errors import ApiError
29
30 from .arvcontainer import ArvadosContainer, RunnerContainer
31 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
32 from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
33 from .arvtool import ArvadosCommandTool
34 from .arvworkflow import ArvadosWorkflow, upload_workflow
35 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
36 from .perf import Perf
37 from .pathmapper import NoFollowPathMapper
38 from ._version import __version__
39
40 from cwltool.pack import pack
41 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
42 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
43 from cwltool.draft2tool import compute_checksums
44 from arvados.api import OrderedJsonModel
45
# Main logger for the runner; INFO is the default verbosity.
logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)

# Separate child logger used for timing/performance messages.
metrics = logging.getLogger('arvados.cwl-runner.metrics')

# Timestamped log lines on the Arvados SDK's log handler.
arvados.log_handler.setFormatter(
    logging.Formatter(fmt='%(asctime)s %(name)s %(levelname)s: %(message)s',
                      datefmt='%Y-%m-%d %H:%M:%S'))

53
54 class ArvCwlRunner(object):
55     """Execute a CWL tool or workflow, submit work (using either jobs or
56     containers API), wait for them to complete, and report output.
57
58     """
59
60     def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
61         self.api = api_client
62         self.processes = {}
63         self.lock = threading.Lock()
64         self.cond = threading.Condition(self.lock)
65         self.final_output = None
66         self.final_status = None
67         self.uploaded = {}
68         self.num_retries = num_retries
69         self.uuid = None
70         self.stop_polling = threading.Event()
71         self.poll_api = None
72         self.pipeline = None
73         self.final_output_collection = None
74         self.output_name = output_name
75         self.output_tags = output_tags
76         self.project_uuid = None
77
78         if keep_client is not None:
79             self.keep_client = keep_client
80         else:
81             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
82
83         self.work_api = None
84         expected_api = ["jobs", "containers"]
85         for api in expected_api:
86             try:
87                 methods = self.api._rootDesc.get('resources')[api]['methods']
88                 if ('httpMethod' in methods['create'] and
89                     (work_api == api or work_api is None)):
90                     self.work_api = api
91                     break
92             except KeyError:
93                 pass
94
95         if not self.work_api:
96             if work_api is None:
97                 raise Exception("No supported APIs")
98             else:
99                 raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
100
101     def arv_make_tool(self, toolpath_object, **kwargs):
102         kwargs["work_api"] = self.work_api
103         kwargs["fetcher_constructor"] = partial(CollectionFetcher,
104                                                 api_client=self.api,
105                                                 keep_client=self.keep_client)
106         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
107             return ArvadosCommandTool(self, toolpath_object, **kwargs)
108         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
109             return ArvadosWorkflow(self, toolpath_object, **kwargs)
110         else:
111             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
112
113     def output_callback(self, out, processStatus):
114         if processStatus == "success":
115             logger.info("Overall process status is %s", processStatus)
116             if self.pipeline:
117                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
118                                                      body={"state": "Complete"}).execute(num_retries=self.num_retries)
119         else:
120             logger.warn("Overall process status is %s", processStatus)
121             if self.pipeline:
122                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
123                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
124         self.final_status = processStatus
125         self.final_output = out
126
127     def on_message(self, event):
128         if "object_uuid" in event:
129             if event["object_uuid"] in self.processes and event["event_type"] == "update":
130                 if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
131                     uuid = event["object_uuid"]
132                     with self.lock:
133                         j = self.processes[uuid]
134                         logger.info("%s %s is Running", self.label(j), uuid)
135                         j.running = True
136                         j.update_pipeline_component(event["properties"]["new_attributes"])
137                 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
138                     uuid = event["object_uuid"]
139                     try:
140                         self.cond.acquire()
141                         j = self.processes[uuid]
142                         logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
143                         with Perf(metrics, "done %s" % j.name):
144                             j.done(event["properties"]["new_attributes"])
145                         self.cond.notify()
146                     finally:
147                         self.cond.release()
148
149     def label(self, obj):
150         return "[%s %s]" % (self.work_api[0:-1], obj.name)
151
152     def poll_states(self):
153         """Poll status of jobs or containers listed in the processes dict.
154
155         Runs in a separate thread.
156         """
157
158         try:
159             while True:
160                 self.stop_polling.wait(15)
161                 if self.stop_polling.is_set():
162                     break
163                 with self.lock:
164                     keys = self.processes.keys()
165                 if not keys:
166                     continue
167
168                 if self.work_api == "containers":
169                     table = self.poll_api.container_requests()
170                 elif self.work_api == "jobs":
171                     table = self.poll_api.jobs()
172
173                 try:
174                     proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
175                 except Exception as e:
176                     logger.warn("Error checking states on API server: %s", e)
177                     continue
178
179                 for p in proc_states["items"]:
180                     self.on_message({
181                         "object_uuid": p["uuid"],
182                         "event_type": "update",
183                         "properties": {
184                             "new_attributes": p
185                         }
186                     })
187         except:
188             logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
189             self.cond.acquire()
190             self.processes.clear()
191             self.cond.notify()
192             self.cond.release()
193         finally:
194             self.stop_polling.set()
195
196     def get_uploaded(self):
197         return self.uploaded.copy()
198
199     def add_uploaded(self, src, pair):
200         self.uploaded[src] = pair
201
202     def check_features(self, obj):
203         if isinstance(obj, dict):
204             if obj.get("writable"):
205                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
206             if obj.get("class") == "DockerRequirement":
207                 if obj.get("dockerOutputDirectory"):
208                     # TODO: can be supported by containers API, but not jobs API.
209                     raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
210                         "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
211             for v in obj.itervalues():
212                 self.check_features(v)
213         elif isinstance(obj, list):
214             for i,v in enumerate(obj):
215                 with SourceLine(obj, i, UnsupportedRequirement):
216                     self.check_features(v)
217
218     def make_output_collection(self, name, tagsString, outputObj):
219         outputObj = copy.deepcopy(outputObj)
220
221         files = []
222         def capture(fileobj):
223             files.append(fileobj)
224
225         adjustDirObjs(outputObj, capture)
226         adjustFileObjs(outputObj, capture)
227
228         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
229
230         final = arvados.collection.Collection(api_client=self.api,
231                                               keep_client=self.keep_client,
232                                               num_retries=self.num_retries)
233
234         srccollections = {}
235         for k,v in generatemapper.items():
236             if k.startswith("_:"):
237                 if v.type == "Directory":
238                     continue
239                 if v.type == "CreateFile":
240                     with final.open(v.target, "wb") as f:
241                         f.write(v.resolved.encode("utf-8"))
242                     continue
243
244             if not k.startswith("keep:"):
245                 raise Exception("Output source is not in keep or a literal")
246             sp = k.split("/")
247             srccollection = sp[0][5:]
248             if srccollection not in srccollections:
249                 try:
250                     srccollections[srccollection] = arvados.collection.CollectionReader(
251                         srccollection,
252                         api_client=self.api,
253                         keep_client=self.keep_client,
254                         num_retries=self.num_retries)
255                 except arvados.errors.ArgumentError as e:
256                     logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
257                     raise
258             reader = srccollections[srccollection]
259             try:
260                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
261                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
262             except IOError as e:
263                 logger.warn("While preparing output collection: %s", e)
264
265         def rewrite(fileobj):
266             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
267             for k in ("basename", "listing", "contents"):
268                 if k in fileobj:
269                     del fileobj[k]
270
271         adjustDirObjs(outputObj, rewrite)
272         adjustFileObjs(outputObj, rewrite)
273
274         with final.open("cwl.output.json", "w") as f:
275             json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
276
277         final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
278
279         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
280                     final.api_response()["name"],
281                     final.manifest_locator())
282
283         final_uuid = final.manifest_locator()
284         tags = tagsString.split(',')
285         for tag in tags:
286              self.api.links().create(body={
287                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
288                 }).execute(num_retries=self.num_retries)
289
290         def finalcollection(fileobj):
291             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
292
293         adjustDirObjs(outputObj, finalcollection)
294         adjustFileObjs(outputObj, finalcollection)
295
296         return (outputObj, final)
297
298     def set_crunch_output(self):
299         if self.work_api == "containers":
300             try:
301                 current = self.api.containers().current().execute(num_retries=self.num_retries)
302             except ApiError as e:
303                 # Status code 404 just means we're not running in a container.
304                 if e.resp.status != 404:
305                     logger.info("Getting current container: %s", e)
306                 return
307             try:
308                 self.api.containers().update(uuid=current['uuid'],
309                                              body={
310                                                  'output': self.final_output_collection.portable_data_hash(),
311                                              }).execute(num_retries=self.num_retries)
312                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
313                                               body={
314                                                   'is_trashed': True
315                                               }).execute(num_retries=self.num_retries)
316             except Exception as e:
317                 logger.info("Setting container output: %s", e)
318         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
319             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
320                                    body={
321                                        'output': self.final_output_collection.portable_data_hash(),
322                                        'success': self.final_status == "success",
323                                        'progress':1.0
324                                    }).execute(num_retries=self.num_retries)
325
    def arv_executor(self, tool, job_order, **kwargs):
        """Run (or register, or submit) a CWL tool/workflow on Arvados.

        tool is the loaded tool object, job_order the input object, and
        kwargs the executor options (this method reads, among others,
        "debug", "project_uuid", "basedir", "name", "submit", "wait",
        "create_workflow", "update_workflow", "enable_reuse",
        "compute_checksum" and "cwl_runner_job").

        Returns a 2-tuple:
          * (uuid, "success") when a workflow/pipeline template record was
            created or updated, or when a runner was submitted without
            waiting;
          * (final_output, final_status) otherwise.

        Raises UnsupportedRequirement when the tool uses an unsupported
        feature (or the final status says so), and WorkflowException when
        the run produced no output.
        """
        self.debug = kwargs.get("debug")

        # Fail early on features this runner cannot execute.
        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 api_client=self.api,
                                                                 keep_client=self.keep_client)
        self.fs_access = make_fs_access(kwargs["basedir"])

        # Default name: the tool's label, else the metadata label, else
        # the basename of the tool id.  Note self.name is only assigned
        # when no name was supplied.
        if not kwargs.get("name"):
            kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        upload_workflow_deps(self, tool)

        # Reload tool object which may have been updated by
        # upload_workflow_deps
        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  makeTool=self.arv_make_tool,
                                  loader=tool.doc_loader,
                                  avsc_names=tool.doc_schema,
                                  metadata=tool.metadata)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % kwargs["name"],
                                     tool, job_order)

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs["name"])
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
                                        name=kwargs["name"]),
                        "success")

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        # Options handed on to tool.job() / runnable.run() below.  The
        # .get() round-trips guarantee the keys exist (as None if unset).
        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        # Output/temp directory settings differ per work API: fixed paths
        # for containers, $(task.*) placeholders for jobs.
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        runnerjob = None
        if kwargs.get("submit"):
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    # A single CommandLineTool is submitted directly as
                    # its own job instead of through a RunnerContainer.
                    kwargs["runnerjob"] = tool.tool["id"]
                    upload_dependencies(self,
                                        kwargs["name"],
                                        tool.doc_loader,
                                        tool.tool,
                                        tool.tool["id"],
                                        False)
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
                                                self.output_name,
                                                self.output_tags,
                                                submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"),
                                                on_error=kwargs.get("on_error"),
                                                submit_runner_image=kwargs.get("submit_runner_image"))
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                      self.output_name,
                                      self.output_tags,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"),
                                      on_error=kwargs.get("on_error"),
                                      submit_runner_image=kwargs.get("submit_runner_image"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        # Fire-and-forget submission: start the runner and return its uuid.
        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return (runnerjob.uuid, "success")

        # From here on we wait for completion; state changes are delivered
        # by poll_states() running in a background thread with its own
        # API client object.
        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            # Only one thing to run: the submitted runner itself.
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            # The Perf timer is entered/exited by hand so that it brackets
            # only the time spent obtaining the next item from jobiter.
            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                # The polling thread sets stop_polling when it exits.
                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    # Nothing runnable right now: wait for a pending
                    # process to change state, or give up if none exist.
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            # Everything has been submitted; wait for the remaining processes.
            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            # Bare except on purpose: KeyboardInterrupt must be handled too.
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                # Set the submitted container request's priority to zero.
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            # Release the lock before joining so the polling thread cannot
            # block on it while we wait for it to finish.
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            # Not a submitted Runner: build the output collection here.
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return (self.final_output, self.final_status)
522
523
def versionstring():
    """Return a version string of key packages for provenance and debugging.

    The string contains the program name (sys.argv[0]), this module's
    __version__, and the installed versions of arvados-cwl-runner,
    arvados-python-client and cwltool.
    """

    def installed_version(dist_name):
        # require() returns the matching distributions; the first is the package itself.
        return pkg_resources.require(dist_name)[0].version

    runner_version = installed_version("arvados-cwl-runner")
    sdk_version = installed_version("arvados-python-client")
    cwltool_version = installed_version("cwltool")

    fields = (sys.argv[0], __version__, runner_version,
              "arvados-python-client", sdk_version,
              "cwltool", cwltool_version)
    return "%s %s %s, %s %s, %s %s" % fields
534
535
def arg_parser():  # type: () -> argparse.ArgumentParser
    """Build the command line parser for arvados-cwl-runner.

    Returns an argparse.ArgumentParser.  Options that come in on/off
    pairs (reuse, submit/local, wait, log timestamps) share one dest and
    sit in mutually exclusive groups, so at most one of each pair can be
    given.  The trailing positionals are the workflow and the job order
    (everything after the workflow).
    """
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    # Logging verbosity: at most one of these.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    # Reuse of previous work; these two previously had empty help text.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    # What to do with the workflow: run it (submitted or locally), or
    # register/update it as a workflow record / pipeline template.
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                        default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                        default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=1024)

    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                        default=None)

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    # REMAINDER: everything after the workflow is left for the job order parser.
    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser
624
def add_arv_hints():
    """Register the Arvados CWL extensions with cwltool.

    Replaces cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE with a pattern
    that matches anything, loads the bundled 'arv-cwl-schema.yml' as a
    custom schema for CWL v1.0, and adds the Arvados (and
    LoadListingRequirement) identifiers to cwltool's list of supported
    process requirements.  Calling it more than once does not add
    duplicate entries to that list.
    """
    cwltool.draft2tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")

    # Close the resource stream even if reading it fails.
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    try:
        schema_text = res.read()
    finally:
        res.close()
    use_custom_schema("v1.0", "http://arvados.org/cwl", schema_text)

    # The list is mutated in place because cwltool reads the same object.
    supported = cwltool.process.supportedProcessRequirements
    for requirement in (
            "http://arvados.org/cwl#RunInSingleContainer",
            "http://arvados.org/cwl#OutputDirType",
            "http://arvados.org/cwl#RuntimeConstraints",
            "http://arvados.org/cwl#PartitionRequirement",
            "http://arvados.org/cwl#APIRequirement",
            "http://commonwl.org/cwltool#LoadListingRequirement"):
        if requirement not in supported:
            supported.append(requirement)

638
639
def main(args, stdout, stderr, api_client=None, keep_client=None):
    """Parse command-line arguments and run cwltool with the Arvados backend.

    Arguments:
      args -- list of command-line arguments, parsed with arg_parser().
      stdout, stderr -- streams handed through to cwltool.main.main().
      api_client -- Arvados API client to use; when None, one is created
        with arvados.api('v1', model=OrderedJsonModel()).
      keep_client -- Keep client to use; when None, a KeepClient bound to
        api_client is created.

    Returns None after printing the version for --version, 1 when the
    --update-workflow / --api options conflict or when the clients or the
    runner cannot be set up, and otherwise whatever cwltool.main.main()
    returns.
    """
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if arvargs.version:
        # The parenthesized form prints the same text as the print statement
        # on Python 2 and is also valid syntax on Python 3.
        print(versionstring())
        return

    if arvargs.update_workflow:
        # Infer the work API from the infix found at offset 5 of the
        # identifier given to --update-workflow.
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        # Only override --api when an API could actually be inferred; an
        # unrecognized identifier must not discard an explicit --api choice.
        if want_api:
            arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        # No input object was given while registering/updating a workflow:
        # hand cwltool an empty job order instead.
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        # Top-level boundary: report the failure and exit with an error code
        # rather than letting the exception propagate to the caller.
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    # --quiet is applied after --debug, so it wins when both are given.
    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    # Fixed values for options that are set here rather than taken from the
    # command line (presumably read by cwltool.main.main -- confirm there).
    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.validate = None

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess,
                                                    api_client=api_client,
                                                    keep_client=keep_client),
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         keep_client=keep_client,
                                                         num_retries=runner.num_retries),
                             resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints)