10497: Add source line reporting to errors and fix tests to work with CommentedMap...
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
#!/usr/bin/env python

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.

import argparse
import logging
import os
import sys
import threading
import hashlib
import copy
import json
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import schema_salad
from schema_salad.sourceline import SourceLine

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_instance
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
from .perf import Perf
from .pathmapper import FinalOutputPathMapper
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, getListing
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)


class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs
    or containers API), wait for it to complete, and report output.

    """

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

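        # Probe the API server's discovery document to find out which work
        # submission APIs it offers.  If the caller asked for a specific
        # work_api, accept only that one; otherwise take the first supported
        # entry in expected_api.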
        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

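    # arv_make_tool is handed to cwltool as its makeTool factory (see
    # main() below): CommandLineTool and Workflow documents are routed to
    # the Arvados-specific classes, and anything else falls through to
    # cwltool's default.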
    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                api_client=self.api,
                                                keep_client=self.keep_client)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

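    # on_message receives state-change events for the jobs or containers
    # tracked in self.processes (here synthesized by poll_states() below)
    # and, when a process reaches a terminal state, runs its done()
    # handler and wakes any waiters on the condition variable.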
    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        txt = self.work_api[0].upper() + self.work_api[1:-1]
                        logger.info("%s %s (%s) is %s", txt, j.name, uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            while True:
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    break
                with self.lock:
                    keys = self.processes.keys()
                if not keys:
                    continue

                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
        except:
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            self.cond.acquire()
            self.processes.clear()
            self.cond.notify()
            self.cond.release()
        finally:
            self.stop_polling.set()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def check_writable(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable"):
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
            for v in obj.itervalues():
                self.check_writable(v)
        if isinstance(obj, list):
            for v in obj:
                self.check_writable(v)

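    # The final output collection is assembled by copying every file the
    # workflow output references out of its source collection in Keep into
    # one new collection, then rewriting each "location" in the output
    # object to point into that collection.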
    def make_output_collection(self, name, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        srccollections = {}
        for k,v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

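            # Anything that isn't a literal must be a Keep reference of
            # the form keep:<portable data hash>/<path>.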
            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            if srccollection not in srccollections:
                try:
                    srccollections[srccollection] = arvados.collection.CollectionReader(
                        srccollection,
                        api_client=self.api,
                        keep_client=self.keep_client,
                        num_retries=self.num_retries)
                except arvados.errors.ArgumentError as e:
                    logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                    raise
            reader = srccollections[srccollection]
            try:
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

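        # Remap each location to its path within the new collection and
        # drop the fields that were derived from the old location.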
        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
            }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

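    # When arvados-cwl-runner is itself running on the cluster (as a
    # submitted runner process), record the final output collection on our
    # own container or job task record so it shows up as that process's
    # output.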
    def set_crunch_output(self):
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)

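    # arv_executor is the entry point handed to cwltool.main below: it
    # handles the create/update-workflow modes, then either submits a
    # runner process to the cluster or iterates over runnable jobs
    # locally, waiting on the condition variable until everything
    # completes.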
    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_writable)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 api_client=self.api,
                                                                 keep_client=self.keep_client)
        self.fs_access = make_fs_access(kwargs["basedir"])

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"))
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return tmpl.uuid
            else:
                return upload_workflow(self, tool, job_order,
                                       self.project_uuid,
                                       uuid=existing_uuid,
                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
                                       name=kwargs.get("name"))

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["on_error"] = "continue"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if not kwargs["name"]:
            del kwargs["name"]

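        # Working directory conventions differ by API: containers use
        # fixed paths inside the container, while crunch jobs use
        # $(task.*) substitution variables that are expanded at run time.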
        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        upload_instance(self, shortname(tool.tool["id"]), tool, job_order)

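        # In submit mode the workflow itself runs on the cluster: a plain
        # CommandLineTool on the containers API runs directly as a single
        # container, while anything else is wrapped in a RunnerContainer
        # or RunnerJob that executes arvados-cwl-runner remotely.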
        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    kwargs["runnerjob"] = tool.tool["id"]
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
                                                self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"))
            else:
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
                                      self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api != "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return runnerjob.uuid

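        # Watch for state changes from a background polling thread rather
        # than blocking this one; poll_states() feeds results back through
        # on_message().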
        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if self.final_status != "success":
            raise WorkflowException("Workflow failed.")

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return self.final_output


def versionstring():
    """Return version strings of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
                                       "arvados-python-client", arvpkg[0].version,
                                       "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input. Defaults to the directory of the input object file, or to the current directory if inputs are piped or provided on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs. If not provided, jobs go to the user's home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=1024)

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser

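# Register the http://arvados.org/cwl extension schema with cwltool's
# document loader so that Arvados-specific hints validate instead of
# being rejected as unknown fields.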
def add_arv_hints():
    cache = {}
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["http://arvados.org/cwl"] = res.read()
    res.close()
    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
    for n in extnames.names:
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
        document_loader.idx["http://arvados.org/cwl#"+n] = {}

def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if arvargs.version:
        print versionstring()
        return

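    # The UUID given to --update-workflow implies which API to use:
    # Arvados UUIDs embed the object type, so the '-7fd4e-' infix marks a
    # workflow (containers API) and '-p5p6p-' a pipeline template (jobs
    # API).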
    if arvargs.update_workflow:
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.validate = None

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess,
                                                    api_client=api_client,
                                                    keep_client=keep_client),
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         keep_client=keep_client),
                             resolver=partial(collectionResolver, api_client))