#!/usr/bin/env python

# Implement the cwl-runner interface for submitting and running work on
# Arvados, using either the Crunch jobs API or the Crunch containers API.

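# Example invocation (the command name is assumed from this package's
# console-script entry point, which is not defined in this file):
#   arvados-cwl-runner --api=containers my-workflow.cwl my-inputs.yml
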
import argparse
import logging
import os
import sys
import threading
import hashlib
import copy
import json
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process
import schema_salad
from schema_salad.sourceline import SourceLine

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_instance
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
from .perf import Perf
from .pathmapper import FinalOutputPathMapper
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, getListing
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs or
    containers API), wait for it to complete, and report output.

    """

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

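        # Determine which work-submission API this cluster supports by
        # probing the API discovery document for the requested resource.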
        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

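    # Called by cwltool to construct tool objects.  Dispatch on the CWL
    # "class" field so CommandLineTool and Workflow get the Arvados-aware
    # implementations; everything else falls back to cwltool's default.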
    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                api_client=self.api,
                                                keep_client=self.keep_client)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

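    # Invoked by cwltool when the overall workflow finishes: record the
    # final status and output, and (under the jobs API) mark the pipeline
    # instance Complete or Failed.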
    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

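    # Handle a state-change event for one of the submitted jobs/containers.
    # In this module, events are synthesized by the poll_states thread below.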
    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("%s %s is Running", self.label(j), uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            while True:
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    break
                with self.lock:
                    keys = self.processes.keys()
                if not keys:
                    continue

                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
        except:
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            self.cond.acquire()
            self.processes.clear()
            self.cond.notify()
            self.cond.release()
        finally:
            self.stop_polling.set()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair
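
    # Pre-flight check, applied recursively over the tool document via
    # tool.visit() below, for CWL features that the selected API does not
    # support.  Raises UnsupportedRequirement with source line context.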
    def check_features(self, obj):
        if isinstance(obj, dict):
            if obj.get("class") == "InitialWorkDirRequirement":
                if self.work_api == "containers":
                    raise UnsupportedRequirement("InitialWorkDirRequirement not supported with --api=containers")
            if obj.get("writable"):
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
            if obj.get("class") == "CommandLineTool":
                if self.work_api == "containers":
                    if obj.get("stdin"):
                        raise SourceLine(obj, "stdin", UnsupportedRequirement).makeError("Stdin redirection currently not supported with --api=containers")
                    if obj.get("stderr"):
                        raise SourceLine(obj, "stderr", UnsupportedRequirement).makeError("Stderr redirection currently not supported with --api=containers")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i, v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement):
                    self.check_features(v)

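    # Gather every File/Directory in the output object into one new
    # collection: file literals are written out directly, everything else is
    # copied from its source collection in Keep.  The output object is then
    # rewritten so all locations point into the new collection.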
    def make_output_collection(self, name, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        srccollections = {}
        for k, v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            if srccollection not in srccollections:
                try:
                    srccollections[srccollection] = arvados.collection.CollectionReader(
                        srccollection,
                        api_client=self.api,
                        keep_client=self.keep_client,
                        num_retries=self.num_retries)
                except arvados.errors.ArgumentError as e:
                    logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                    raise
            reader = srccollections[srccollection]
            try:
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',', ': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
            }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

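    # When this runner is itself executing inside a Crunch container or job
    # task, record the final output collection on that container or task so
    # it shows up as the output of the runner process.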
    def set_crunch_output(self):
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)

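    # Main entry point, passed to cwltool.main() as the executor.  Handles
    # --create-workflow/--update-workflow, submits a runner job or container
    # when requested, and otherwise drives the job iterator itself.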
    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 api_client=self.api,
                                                                 keep_client=self.keep_client)
        self.fs_access = make_fs_access(kwargs["basedir"])

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"))
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            else:
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
                                        name=kwargs.get("name")), "success")

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if not kwargs["name"]:
            del kwargs["name"]

        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        upload_instance(self, shortname(tool.tool["id"]), tool, job_order)

        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    kwargs["runnerjob"] = tool.tool["id"]
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
                                                self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"), on_error=kwargs.get("on_error"))
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
                                      self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"), on_error=kwargs.get("on_error"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api != "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return (runnerjob.uuid, "success")

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return (self.final_output, self.final_status)


def versionstring():
    """Return version strings of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
                                       "arvados-python-client", arvpkg[0].version,
                                       "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped/provided on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs. If not provided, jobs go to the user's home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for the collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                        default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                        default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=1024)

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser

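# Load the Arvados CWL extension schema bundled with this package and
# register its names with cwltool's CWL v1.0 schema so that arv:* hints
# validate.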
def add_arv_hints():
    cache = {}
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["http://arvados.org/cwl"] = res.read()
    res.close()
    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
    for n in extnames.names:
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
        document_loader.idx["http://arvados.org/cwl#"+n] = {}

def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if arvargs.version:
        print versionstring()
        return

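    # Infer which API an --update-workflow target uses from its UUID: the
    # five-character infix after the cluster prefix identifies the object
    # type ('7fd4e' = workflow, served by the containers API; 'p5p6p' =
    # pipeline template, served by the jobs API).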
    if arvargs.update_workflow:
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.validate = None

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess,
                                                    api_client=api_client,
                                                    keep_client=keep_client),
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         keep_client=keep_client),
                             resolver=partial(collectionResolver, api_client),
                             logger_handler=arvados.log_handler)