#!/usr/bin/env python

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.

import argparse
import logging
import os
import sys
import threading
import hashlib
import copy
import json
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process
import schema_salad
from schema_salad.sourceline import SourceLine

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_instance
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
from .perf import Perf
from .pathmapper import FinalOutputPathMapper
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, getListing
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))

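# Minimal usage sketch (hypothetical values; main() below shows the full
# wiring through cwltool.main.main):
#
#     api = arvados.api('v1', model=OrderedJsonModel())
#     runner = ArvCwlRunner(api, work_api="containers")
#     # tool and job_order come from cwltool's loader; see main() below.
#     output = runner.arv_executor(tool, job_order, basedir=".", name=None,
#                                  submit=False, wait=True)
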
class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.

    """

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

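        # Choose the work API by inspecting the API server's discovery
        # document: take the first of "jobs" or "containers" that the server
        # offers, unless the caller asked for a specific one via work_api.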
        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                api_client=self.api,
                                                keep_client=self.keep_client)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

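    # on_message receives state-change events (synthesized by poll_states
    # below): it marks a process Running on the first "Running" update and
    # invokes its done() handler when it reaches a terminal state.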
    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("%s %s is Running", self.label(j), uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            while True:
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    break
                with self.lock:
                    keys = self.processes.keys()
                if not keys:
                    continue

                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
        except:
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            self.cond.acquire()
            self.processes.clear()
            self.cond.notify()
            self.cond.release()
        finally:
            self.stop_polling.set()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def check_features(self, obj):
        if isinstance(obj, dict):
            if obj.get("class") == "InitialWorkDirRequirement":
                if self.work_api == "containers":
                    raise UnsupportedRequirement("InitialWorkDirRequirement not supported with --api=containers")
            if obj.get("writable"):
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
            if obj.get("class") == "CommandLineTool":
                if self.work_api == "containers":
                    if obj.get("stdin"):
                        raise SourceLine(obj, "stdin", UnsupportedRequirement).makeError("Stdin redirection currently not supported with --api=containers")
                    if obj.get("stderr"):
                        raise SourceLine(obj, "stderr", UnsupportedRequirement).makeError("Stderr redirection currently not supported with --api=containers")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i, v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement):
                    self.check_features(v)

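    # make_output_collection copies every output file referenced by the
    # workflow's cwl.output.json into a single new collection, so the final
    # output is one self-contained collection rather than scattered
    # references into the source collections.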
    def make_output_collection(self, name, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        srccollections = {}
        for k, v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
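            # Locations look like "keep:<portable data hash>/<path within
            # collection>"; sp[0][5:] below strips the "keep:" prefix to get
            # the source collection, and the remainder is the path inside it.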
            sp = k.split("/")
            srccollection = sp[0][5:]
            if srccollection not in srccollections:
                try:
                    srccollections[srccollection] = arvados.collection.CollectionReader(
                        srccollection,
                        api_client=self.api,
                        keep_client=self.keep_client,
                        num_retries=self.num_retries)
                except arvados.errors.ArgumentError as e:
                    logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                    raise
            reader = srccollections[srccollection]
            try:
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',', ': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        # Avoid creating a tag link with an empty name when no tags were given.
        tags = tagsString.split(',') if tagsString else []
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

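    # set_crunch_output records the final output collection on the runner's
    # own container or job task, so the output is visible on the process
    # record itself when arvados-cwl-runner is running inside Arvados.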
    def set_crunch_output(self):
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                   body={
                                       'output': self.final_output_collection.portable_data_hash(),
                                       'success': self.final_status == "success",
                                       'progress': 1.0
                                   }).execute(num_retries=self.num_retries)

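    # arv_executor is the entry point handed to cwltool: depending on flags
    # it either saves the workflow definition (--create/--update-workflow),
    # submits a runner job or container (--submit), or runs the workflow from
    # this client, scheduling each step on Arvados and waiting for completion.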
    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 api_client=self.api,
                                                                 keep_client=self.keep_client)
        self.fs_access = make_fs_access(kwargs["basedir"])

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"))
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return tmpl.uuid
            else:
                return upload_workflow(self, tool, job_order,
                                       self.project_uuid,
                                       uuid=existing_uuid,
                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
                                       name=kwargs.get("name"))

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if not kwargs["name"]:
            del kwargs["name"]

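        # Containers run with fixed paths inside the container image; jobs
        # use crunch's $(task.outdir)/$(task.tmpdir) substitution variables,
        # which are expanded on the compute node at run time.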
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        upload_instance(self, shortname(tool.tool["id"]), tool, job_order)

        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    kwargs["runnerjob"] = tool.tool["id"]
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
                                                self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"))
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
                                      self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api != "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return runnerjob.uuid

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output


def versionstring():
    """Return version strings of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input, defaults to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job or container reuse")

539     parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
540     parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
541     parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
542     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
543                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
544                         default=False)
545
546     exgroup = parser.add_mutually_exclusive_group()
547     exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
548                         default=True, dest="submit")
549     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
550                         default=True, dest="submit")
551     exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
552                          dest="create_workflow")
553     exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
554     exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
555
556     exgroup = parser.add_mutually_exclusive_group()
557     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
558                         default=True, dest="wait")
559     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
560                         default=True, dest="wait")
561
562     exgroup = parser.add_mutually_exclusive_group()
563     exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
564                         default=True, dest="log_timestamps")
565     exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
566                         default=True, dest="log_timestamps")
567
568     parser.add_argument("--api", type=str,
569                         default=None, dest="work_api",
570                         help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")
571
572     parser.add_argument("--compute-checksum", action="store_true", default=False,
573                         help="Compute checksum of contents while collecting outputs",
574                         dest="compute_checksum")
575
576     parser.add_argument("--submit-runner-ram", type=int,
577                         help="RAM (in MiB) required for the workflow runner job (default 1024)",
578                         default=1024)
579
580     parser.add_argument("--name", type=str,
581                         help="Name to use for workflow execution instance.",
582                         default=None)
583
584     parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
585     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
586
587     return parser
588
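# Load the Arvados CWL extension schema (arv-cwl-schema.yml) and register its
# type names alongside the standard CWL v1.0 names so documents can use
# http://arvados.org/cwl# hints without validation errors.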
def add_arv_hints():
    cache = {}
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["http://arvados.org/cwl"] = res.read()
    res.close()
    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
    for n in extnames.names:
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
        document_loader.idx["http://arvados.org/cwl#"+n] = {}

def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if arvargs.version:
        print versionstring()
        return

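    # Arvados UUIDs embed the object type as a five-character infix starting
    # at position 6 ("zzzzz-7fd4e-..." is a workflow, "zzzzz-p5p6p-..." a
    # pipeline template), so the UUID alone tells us which API to use.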
    if arvargs.update_workflow:
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.validate = None

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess,
                                                    api_client=api_client,
                                                    keep_client=keep_client),
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         keep_client=keep_client),
                             resolver=partial(collectionResolver, api_client),
                             logger_handler=arvados.log_handler)