#!/usr/bin/env python

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.

import argparse
import logging
import os
import sys
import threading
import hashlib
import copy
import json
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process
import schema_salad
from schema_salad.sourceline import SourceLine

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_instance
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
from .perf import Perf
from .pathmapper import FinalOutputPathMapper
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, getListing
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.

    """
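
    # A minimal usage sketch (an illustration, not part of the original file;
    # it assumes Arvados credentials are configured in the environment, and
    # that cwltool's loader supplies tool/job_order plus the kwargs that
    # arv_executor expects, e.g. basedir and name):
    #
    #   api = arvados.api('v1', model=OrderedJsonModel())
    #   runner = ArvCwlRunner(api, work_api="containers")
    #   output = runner.arv_executor(tool, job_order, basedir=".",
    #                                name="demo", submit=True, wait=True, ...)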

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        self.work_api = None
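        # Pick the first supported work API ("jobs" or "containers") that the
        # server's discovery document advertises, unless the caller pinned one
        # via work_api.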
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                api_client=self.api,
                                                keep_client=self.keep_client)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

    def on_message(self, event):
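        # Events have the shape produced by poll_states() below:
        # {"object_uuid": ..., "event_type": "update",
        #  "properties": {"new_attributes": <container request or job record>}}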
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("%s %s is Running", self.label(j), uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            while True:
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    break
                with self.lock:
                    keys = self.processes.keys()
                if not keys:
                    continue

                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
        except:
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            self.cond.acquire()
            self.processes.clear()
            self.cond.notify()
            self.cond.release()
        finally:
            self.stop_polling.set()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def check_features(self, obj):
        if isinstance(obj, dict):
            if obj.get("class") == "InitialWorkDirRequirement":
                if self.work_api == "containers":
                    raise UnsupportedRequirement("InitialWorkDirRequirement not supported with --api=containers")
            if obj.get("writable"):
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
            if obj.get("class") == "CommandLineTool":
                if self.work_api == "containers":
                    if obj.get("stdin"):
                        raise SourceLine(obj, "stdin", UnsupportedRequirement).makeError("Stdin redirection currently not supported with --api=containers")
                    if obj.get("stderr"):
                        raise SourceLine(obj, "stderr", UnsupportedRequirement).makeError("Stderr redirection currently not supported with --api=containers")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement):
                    self.check_features(v)

    def make_output_collection(self, name, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

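        # Map each captured output location to its target path inside the new
        # output collection (separateDirs=False, so outputs are not scattered
        # into per-item staging directories).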
        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        srccollections = {}
        for k,v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
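            # Locations look like "keep:<collection>/<path>"; sp[0][5:] strips
            # the "keep:" prefix to get the source collection identifier.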
            sp = k.split("/")
            srccollection = sp[0][5:]
            if srccollection not in srccollections:
                try:
                    srccollections[srccollection] = arvados.collection.CollectionReader(
                        srccollection,
                        api_client=self.api,
                        keep_client=self.keep_client,
                        num_retries=self.num_retries)
                except arvados.errors.ArgumentError as e:
                    logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                    raise
            reader = srccollections[srccollection]
            try:
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
            }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

    def set_crunch_output(self):
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)

    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 api_client=self.api,
                                                                 keep_client=self.keep_client)
        self.fs_access = make_fs_access(kwargs["basedir"])

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"))
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return tmpl.uuid
            else:
                return upload_workflow(self, tool, job_order,
                                       self.project_uuid,
                                       uuid=existing_uuid,
                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
                                       name=kwargs.get("name"))

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if not kwargs["name"]:
            del kwargs["name"]

        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        upload_instance(self, shortname(tool.tool["id"]), tool, job_order)

        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    kwargs["runnerjob"] = tool.tool["id"]
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
                                                self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"), on_error=kwargs.get("on_error"))
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
                                      self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"), on_error=kwargs.get("on_error"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return runnerjob.uuid

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output


def versionstring():
    """Return a version string for key packages, for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
                                       "arvados-python-client", arvpkg[0].version,
                                       "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or provided on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                        default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                        default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=1024)

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails. One of 'stop' or 'continue'. "
                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser

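# Example invocations (sketches; the workflow and input file names are
# hypothetical):
#
#   arvados-cwl-runner --api=containers my-workflow.cwl inputs.yml
#   arvados-cwl-runner --no-wait --output-name "My results" my-workflow.cwl inputs.yml
#   arvados-cwl-runner --create-workflow my-workflow.cwl
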
def add_arv_hints():
    cache = {}
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["http://arvados.org/cwl"] = res.read()
    res.close()
    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
    for n in extnames.names:
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
        document_loader.idx["http://arvados.org/cwl#"+n] = {}

def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if arvargs.version:
        print versionstring()
        return

    if arvargs.update_workflow:
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.validate = None

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess,
                                                    api_client=api_client,
                                                    keep_client=keep_client),
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         keep_client=keep_client),
                             resolver=partial(collectionResolver, api_client),
                             logger_handler=arvados.log_handler)
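

if __name__ == "__main__":
    # Convenience entry point, added for illustration: the packaged console
    # script normally invokes main() with these same arguments.
    sys.exit(main(sys.argv[1:], sys.stdout, sys.stderr))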