10497: Improve up front feature checking and error reporting.
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
1 #!/usr/bin/env python
2
3 # Implement cwl-runner interface for submitting and running work on Arvados, using
4 # either the Crunch jobs API or Crunch containers API.
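#
# Typical invocation (illustrative only; the console script name comes from the
# arvados-cwl-runner package and the project UUID below is a placeholder):
#
#   arvados-cwl-runner --api=containers --project-uuid zzzzz-j7d0g-xxxxxxxxxxxxxxx \
#       workflow.cwl inputs.yml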
5
6 import argparse
7 import logging
8 import os
9 import sys
10 import threading
11 import hashlib
12 import copy
13 import json
14 from functools import partial
15 import pkg_resources  # part of setuptools
16
17 from cwltool.errors import WorkflowException
18 import cwltool.main
19 import cwltool.workflow
20 import cwltool.process
21 import schema_salad
22 from schema_salad.sourceline import SourceLine
23
24 import arvados
25 import arvados.config
26 from arvados.keep import KeepClient
27 from arvados.errors import ApiError
28
29 from .arvcontainer import ArvadosContainer, RunnerContainer
30 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
31 from .runner import Runner, upload_instance
32 from .arvtool import ArvadosCommandTool
33 from .arvworkflow import ArvadosWorkflow, upload_workflow
34 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
35 from .perf import Perf
36 from .pathmapper import FinalOutputPathMapper
37 from ._version import __version__
38
39 from cwltool.pack import pack
40 from cwltool.process import shortname, UnsupportedRequirement, getListing
41 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
42 from cwltool.draft2tool import compute_checksums
43 from arvados.api import OrderedJsonModel
44
45 logger = logging.getLogger('arvados.cwl-runner')
46 metrics = logging.getLogger('arvados.cwl-runner.metrics')
47 logger.setLevel(logging.INFO)
48
49
50 class ArvCwlRunner(object):
51     """Execute a CWL tool or workflow, submit work (using either jobs or
52     containers API), wait for them to complete, and report output.
53
54     """
55
56     def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
57         self.api = api_client
58         self.processes = {}
59         self.lock = threading.Lock()
60         self.cond = threading.Condition(self.lock)
61         self.final_output = None
62         self.final_status = None
63         self.uploaded = {}
64         self.num_retries = num_retries
65         self.uuid = None
66         self.stop_polling = threading.Event()
67         self.poll_api = None
68         self.pipeline = None
69         self.final_output_collection = None
70         self.output_name = output_name
71         self.output_tags = output_tags
72         self.project_uuid = None
73
74         if keep_client is not None:
75             self.keep_client = keep_client
76         else:
77             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
78
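        # Probe the API server's discovery document to see which work APIs it
        # offers.  If the caller did not request a specific work_api, take the
        # first usable entry in expected_api ("jobs" before "containers").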
79         self.work_api = None
80         expected_api = ["jobs", "containers"]
81         for api in expected_api:
82             try:
83                 methods = self.api._rootDesc.get('resources')[api]['methods']
84                 if ('httpMethod' in methods['create'] and
85                     (work_api == api or work_api is None)):
86                     self.work_api = api
87                     break
88             except KeyError:
89                 pass
90
91         if not self.work_api:
92             if work_api is None:
93                 raise Exception("No supported APIs available (expected one of %s)" % expected_api)
94             else:
95                 raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
96
97     def arv_make_tool(self, toolpath_object, **kwargs):
98         kwargs["work_api"] = self.work_api
99         kwargs["fetcher_constructor"] = partial(CollectionFetcher,
100                                                 api_client=self.api,
101                                                 keep_client=self.keep_client)
102         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
103             return ArvadosCommandTool(self, toolpath_object, **kwargs)
104         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
105             return ArvadosWorkflow(self, toolpath_object, **kwargs)
106         else:
107             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
108
109     def output_callback(self, out, processStatus):
110         if processStatus == "success":
111             logger.info("Overall process status is %s", processStatus)
112             if self.pipeline:
113                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
114                                                      body={"state": "Complete"}).execute(num_retries=self.num_retries)
115         else:
116             logger.warn("Overall process status is %s", processStatus)
117             if self.pipeline:
118                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
119                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
120         self.final_status = processStatus
121         self.final_output = out
122
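    # Handle a state-change event for one of the jobs/containers we are
    # tracking.  Events are dicts with "object_uuid", an "event_type" of
    # "update", and the new job/container record under
    # event["properties"]["new_attributes"] (see poll_states below).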
123     def on_message(self, event):
124         if "object_uuid" in event:
125             if event["object_uuid"] in self.processes and event["event_type"] == "update":
126                 if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
127                     uuid = event["object_uuid"]
128                     with self.lock:
129                         j = self.processes[uuid]
130                         logger.info("Job %s (%s) is Running", j.name, uuid)
131                         j.running = True
132                         j.update_pipeline_component(event["properties"]["new_attributes"])
133                 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
134                     uuid = event["object_uuid"]
135                     try:
136                         self.cond.acquire()
137                         j = self.processes[uuid]
138                         txt = self.work_api[0].upper() + self.work_api[1:-1]
139                         logger.info("%s %s (%s) is %s", txt, j.name, uuid, event["properties"]["new_attributes"]["state"])
140                         with Perf(metrics, "done %s" % j.name):
141                             j.done(event["properties"]["new_attributes"])
142                         self.cond.notify()
143                     finally:
144                         self.cond.release()
145
146     def poll_states(self):
147         """Poll status of jobs or containers listed in the processes dict.
148
149         Runs in a separate thread.
150         """
151
152         try:
153             while True:
154                 self.stop_polling.wait(15)
155                 if self.stop_polling.is_set():
156                     break
157                 with self.lock:
158                     keys = self.processes.keys()
159                 if not keys:
160                     continue
161
162                 if self.work_api == "containers":
163                     table = self.poll_api.container_requests()
164                 elif self.work_api == "jobs":
165                     table = self.poll_api.jobs()
166
167                 try:
168                     proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
169                 except Exception as e:
170                     logger.warn("Error checking states on API server: %s", e)
171                     continue
172
173                 for p in proc_states["items"]:
174                     self.on_message({
175                         "object_uuid": p["uuid"],
176                         "event_type": "update",
177                         "properties": {
178                             "new_attributes": p
179                         }
180                     })
181         except:
182             logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
183             self.cond.acquire()
184             self.processes.clear()
185             self.cond.notify()
186             self.cond.release()
187         finally:
188             self.stop_polling.set()
189
190     def get_uploaded(self):
191         return self.uploaded.copy()
192
193     def add_uploaded(self, src, pair):
194         self.uploaded[src] = pair
195
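    # Recursively walk a CWL document (nested dicts and lists) and raise
    # UnsupportedRequirement, annotated with source position via SourceLine,
    # for features this runner cannot execute with the selected work API.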
196     def check_features(self, obj):
197         if isinstance(obj, dict):
198             if obj.get("class") == "InitialWorkDirRequirement":
199                 if self.work_api == "containers":
200                     raise UnsupportedRequirement("InitialWorkDirRequirement not supported with --api=containers")
201             if obj.get("writable"):
202                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
203             if obj.get("class") == "CommandLineTool":
204                 if self.work_api == "containers":
205                     if obj.get("stdin"):
206                         raise SourceLine(obj, "stdin", UnsupportedRequirement).makeError("Stdin redirection currently not supported with --api=containers")
207                     if obj.get("stderr"):
208                         raise SourceLine(obj, "stderr", UnsupportedRequirement).makeError("Stderr redirection currently not supported with --api=containers")
209             for v in obj.itervalues():
210                 self.check_features(v)
211         if isinstance(obj, list):
212             for i,v in enumerate(obj):
213                 with SourceLine(obj, i, UnsupportedRequirement):
214                     self.check_features(v)
215
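    # Assemble the final output into a single new collection: literal
    # (CreateFile) outputs are written directly, outputs already in Keep are
    # copied in from their source collections, and a cwl.output.json describing
    # the rewritten output object is added before the collection is saved and
    # tagged.  Returns the rewritten output object and the collection.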
216     def make_output_collection(self, name, tagsString, outputObj):
217         outputObj = copy.deepcopy(outputObj)
218
219         files = []
220         def capture(fileobj):
221             files.append(fileobj)
222
223         adjustDirObjs(outputObj, capture)
224         adjustFileObjs(outputObj, capture)
225
226         generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)
227
228         final = arvados.collection.Collection(api_client=self.api,
229                                               keep_client=self.keep_client,
230                                               num_retries=self.num_retries)
231
232         srccollections = {}
233         for k,v in generatemapper.items():
234             if k.startswith("_:"):
235                 if v.type == "Directory":
236                     continue
237                 if v.type == "CreateFile":
238                     with final.open(v.target, "wb") as f:
239                         f.write(v.resolved.encode("utf-8"))
240                     continue
241
242             if not k.startswith("keep:"):
243                 raise Exception("Output source is not in keep or a literal")
244             sp = k.split("/")
245             srccollection = sp[0][5:]
246             if srccollection not in srccollections:
247                 try:
248                     srccollections[srccollection] = arvados.collection.CollectionReader(
249                         srccollection,
250                         api_client=self.api,
251                         keep_client=self.keep_client,
252                         num_retries=self.num_retries)
253                 except arvados.errors.ArgumentError as e:
254                     logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
255                     raise
256             reader = srccollections[srccollection]
257             try:
258                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
259                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
260             except IOError as e:
261                 logger.warn("While preparing output collection: %s", e)
262
263         def rewrite(fileobj):
264             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
265             for k in ("basename", "listing", "contents"):
266                 if k in fileobj:
267                     del fileobj[k]
268
269         adjustDirObjs(outputObj, rewrite)
270         adjustFileObjs(outputObj, rewrite)
271
272         with final.open("cwl.output.json", "w") as f:
273             json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
274
275         final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
276
277         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
278                     final.api_response()["name"],
279                     final.manifest_locator())
280
281         final_uuid = final.manifest_locator()
282         tags = tagsString.split(',')
283         for tag in tags:
284             self.api.links().create(body={
285                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
286             }).execute(num_retries=self.num_retries)
287
288         def finalcollection(fileobj):
289             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
290
291         adjustDirObjs(outputObj, finalcollection)
292         adjustFileObjs(outputObj, finalcollection)
293
294         return (outputObj, final)
295
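    # When this runner was itself submitted (i.e. it is executing inside a
    # container or crunch job task), record the final output collection on that
    # container or job task record so the submission shows the workflow output.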
296     def set_crunch_output(self):
297         if self.work_api == "containers":
298             try:
299                 current = self.api.containers().current().execute(num_retries=self.num_retries)
300             except ApiError as e:
301                 # Status code 404 just means we're not running in a container.
302                 if e.resp.status != 404:
303                     logger.info("Getting current container: %s", e)
304                 return
305             try:
306                 self.api.containers().update(uuid=current['uuid'],
307                                              body={
308                                                  'output': self.final_output_collection.portable_data_hash(),
309                                              }).execute(num_retries=self.num_retries)
310             except Exception as e:
311                 logger.info("Setting container output: %s", e)
312         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
313             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
314                                    body={
315                                        'output': self.final_output_collection.portable_data_hash(),
316                                        'success': self.final_status == "success",
317                                        'progress':1.0
318                                    }).execute(num_retries=self.num_retries)
319
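    # Main executor entry point called by cwltool.main.  Depending on the
    # arguments this either registers/updates a workflow or pipeline template,
    # submits a runner job/container, or runs the workflow steps directly,
    # polling the API server for state changes until all work completes.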
320     def arv_executor(self, tool, job_order, **kwargs):
321         self.debug = kwargs.get("debug")
322
323         tool.visit(self.check_features)
324
325         self.project_uuid = kwargs.get("project_uuid")
326         self.pipeline = None
327         make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
328                                                                  api_client=self.api,
329                                                                  keep_client=self.keep_client)
330         self.fs_access = make_fs_access(kwargs["basedir"])
331
332         existing_uuid = kwargs.get("update_workflow")
333         if existing_uuid or kwargs.get("create_workflow"):
334             if self.work_api == "jobs":
335                 tmpl = RunnerTemplate(self, tool, job_order,
336                                       kwargs.get("enable_reuse"),
337                                       uuid=existing_uuid,
338                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
339                                       name=kwargs.get("name"))
340                 tmpl.save()
341                 # cwltool.main will write our return value to stdout.
342                 return tmpl.uuid
343             else:
344                 return upload_workflow(self, tool, job_order,
345                                        self.project_uuid,
346                                        uuid=existing_uuid,
347                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
348                                        name=kwargs.get("name"))
349
350         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
351
352         kwargs["make_fs_access"] = make_fs_access
353         kwargs["enable_reuse"] = kwargs.get("enable_reuse")
354         kwargs["use_container"] = True
355         kwargs["tmpdir_prefix"] = "tmp"
356         kwargs["on_error"] = "continue"
357         kwargs["compute_checksum"] = kwargs.get("compute_checksum")
358
359         if not kwargs["name"]:
360             del kwargs["name"]
361
362         if self.work_api == "containers":
363             kwargs["outdir"] = "/var/spool/cwl"
364             kwargs["docker_outdir"] = "/var/spool/cwl"
365             kwargs["tmpdir"] = "/tmp"
366             kwargs["docker_tmpdir"] = "/tmp"
367         elif self.work_api == "jobs":
368             kwargs["outdir"] = "$(task.outdir)"
369             kwargs["docker_outdir"] = "$(task.outdir)"
370             kwargs["tmpdir"] = "$(task.tmpdir)"
371
372         upload_instance(self, shortname(tool.tool["id"]), tool, job_order)
373
374         runnerjob = None
375         if kwargs.get("submit"):
376             if self.work_api == "containers":
377                 if tool.tool["class"] == "CommandLineTool":
378                     kwargs["runnerjob"] = tool.tool["id"]
379                     runnerjob = tool.job(job_order,
380                                          self.output_callback,
381                                          **kwargs).next()
382                 else:
383                     runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
384                                                 self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
385                                                 name=kwargs.get("name"))
386             else:
387                 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
388                                       self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
389                                       name=kwargs.get("name"))
390
391         if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
392             # Create pipeline for local run
393             self.pipeline = self.api.pipeline_instances().create(
394                 body={
395                     "owner_uuid": self.project_uuid,
396                     "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
397                     "components": {},
398                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
399             logger.info("Pipeline instance %s", self.pipeline["uuid"])
400
401         if runnerjob and not kwargs.get("wait"):
402             runnerjob.run(wait=kwargs.get("wait"))
403             return runnerjob.uuid
404
405         self.poll_api = arvados.api('v1')
406         self.polling_thread = threading.Thread(target=self.poll_states)
407         self.polling_thread.start()
408
409         if runnerjob:
410             jobiter = iter((runnerjob,))
411         else:
412             if "cwl_runner_job" in kwargs:
413                 self.uuid = kwargs.get("cwl_runner_job").get('uuid')
414             jobiter = tool.job(job_order,
415                                self.output_callback,
416                                **kwargs)
417
418         try:
419             self.cond.acquire()
420             # Will continue to hold the lock for the duration of this code
421             # except when in cond.wait(), at which point on_message can update
422             # job state and process output callbacks.
423
424             loopperf = Perf(metrics, "jobiter")
425             loopperf.__enter__()
426             for runnable in jobiter:
427                 loopperf.__exit__()
428
429                 if self.stop_polling.is_set():
430                     break
431
432                 if runnable:
433                     with Perf(metrics, "run"):
434                         runnable.run(**kwargs)
435                 else:
436                     if self.processes:
437                         self.cond.wait(1)
438                     else:
439                         logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
440                         break
441                 loopperf.__enter__()
442             loopperf.__exit__()
443
444             while self.processes:
445                 self.cond.wait(1)
446
447         except UnsupportedRequirement:
448             raise
449         except:
450             if sys.exc_info()[0] is KeyboardInterrupt:
451                 logger.error("Interrupted, marking pipeline as failed")
452             else:
453                 logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
454             if self.pipeline:
455                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
456                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
457             if runnerjob and runnerjob.uuid and self.work_api == "containers":
458                 self.api.container_requests().update(uuid=runnerjob.uuid,
459                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
460         finally:
461             self.cond.release()
462             self.stop_polling.set()
463             self.polling_thread.join()
464
465         if self.final_status == "UnsupportedRequirement":
466             raise UnsupportedRequirement("Check log for details.")
467
468         if self.final_output is None:
469             raise WorkflowException("Workflow did not return a result.")
470
471         if kwargs.get("submit") and isinstance(runnerjob, Runner):
472             logger.info("Final output collection %s", runnerjob.final_output)
473         else:
474             if self.output_name is None:
475                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
476             if self.output_tags is None:
477                 self.output_tags = ""
478             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
479             self.set_crunch_output()
480
481         if self.final_status != "success":
482             raise WorkflowException("Workflow failed.")
483
484         if kwargs.get("compute_checksum"):
485             adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
486             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
487
488         return self.final_output
489
490
491 def versionstring():
492     """Print version string of key packages for provenance and debugging."""
493
494     arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
495     arvpkg = pkg_resources.require("arvados-python-client")
496     cwlpkg = pkg_resources.require("cwltool")
497
498     return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
499                                     "arvados-python-client", arvpkg[0].version,
500                                     "cwltool", cwlpkg[0].version)
501
502
503 def arg_parser():  # type: () -> argparse.ArgumentParser
504     parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
505
506     parser.add_argument("--basedir", type=str,
507                         help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or provided on the command line.")
508     parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
509                         help="Output directory, default current directory")
510
511     parser.add_argument("--eval-timeout",
512                         help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
513                         type=float,
514                         default=20)
515     parser.add_argument("--version", action="store_true", help="Print version and exit")
516
517     exgroup = parser.add_mutually_exclusive_group()
518     exgroup.add_argument("--verbose", action="store_true", help="Default logging")
519     exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
520     exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
521
522     parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
523
524     parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
525
526     exgroup = parser.add_mutually_exclusive_group()
527     exgroup.add_argument("--enable-reuse", action="store_true",
528                         default=True, dest="enable_reuse",
529                         help="Enable job or container reuse (default).")
530     exgroup.add_argument("--disable-reuse", action="store_false",
531                         default=True, dest="enable_reuse",
532                         help="Do not reuse past jobs or containers; always run the work again.")
533
534     parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs. If not provided, work goes to the user's home project.")
535     parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
536     parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
537     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
538                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
539                         default=False)
540
541     exgroup = parser.add_mutually_exclusive_group()
542     exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
543                         default=True, dest="submit")
544     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
545                         default=True, dest="submit")
546     exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
547                          dest="create_workflow")
548     exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
549     exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
550
551     exgroup = parser.add_mutually_exclusive_group()
552     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
553                         default=True, dest="wait")
554     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
555                         default=True, dest="wait")
556
557     parser.add_argument("--api", type=str,
558                         default=None, dest="work_api",
559                         help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")
560
561     parser.add_argument("--compute-checksum", action="store_true", default=False,
562                         help="Compute checksum of contents while collecting outputs",
563                         dest="compute_checksum")
564
565     parser.add_argument("--submit-runner-ram", type=int,
566                         help="RAM (in MiB) required for the workflow runner job (default 1024)",
567                         default=1024)
568
569     parser.add_argument("--name", type=str,
570                         help="Name to use for workflow execution instance.",
571                         default=None)
572
573     parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
574     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
575
576     return parser
577
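# Load the Arvados CWL extension schema shipped with this package
# (arv-cwl-schema.yml) and register its type names alongside cwltool's CWL
# v1.0 schema so Arvados-specific hints are recognized during validation.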
578 def add_arv_hints():
579     cache = {}
580     res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
581     cache["http://arvados.org/cwl"] = res.read()
582     res.close()
583     document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
584     _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
585     for n in extnames.names:
586         if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
587             cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
588         document_loader.idx["http://arvados.org/cwl#"+n] = {}
589
590 def main(args, stdout, stderr, api_client=None, keep_client=None):
591     parser = arg_parser()
592
593     job_order_object = None
594     arvargs = parser.parse_args(args)
595
596     if arvargs.version:
597         print versionstring()
598         return
599
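    # An Arvados UUID carries a 5-character object type infix starting at
    # offset 5: '7fd4e' marks a workflow (containers API) and 'p5p6p' a
    # pipeline template (jobs API), so --update-workflow implies the work API.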
600     if arvargs.update_workflow:
601         if arvargs.update_workflow.find('-7fd4e-') == 5:
602             want_api = 'containers'
603         elif arvargs.update_workflow.find('-p5p6p-') == 5:
604             want_api = 'jobs'
605         else:
606             want_api = None
607         if want_api and arvargs.work_api and want_api != arvargs.work_api:
608             logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
609                 arvargs.update_workflow, want_api, arvargs.work_api))
610             return 1
611         arvargs.work_api = want_api
612
613     if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
614         job_order_object = ({}, "")
615
616     add_arv_hints()
617
618     try:
619         if api_client is None:
620             api_client = arvados.api('v1', model=OrderedJsonModel())
621         if keep_client is None:
622             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
623         runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
624                               num_retries=4, output_name=arvargs.output_name,
625                               output_tags=arvargs.output_tags)
626     except Exception as e:
627         logger.error(e)
628         return 1
629
630     if arvargs.debug:
631         logger.setLevel(logging.DEBUG)
632
633     if arvargs.quiet:
634         logger.setLevel(logging.WARN)
635         logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
636
637     if arvargs.metrics:
638         metrics.setLevel(logging.DEBUG)
639         logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
640
641     arvargs.conformance_test = None
642     arvargs.use_container = True
643     arvargs.relax_path_checks = True
644     arvargs.validate = None
645
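    # Hand off to cwltool's main loop, plugging in the Arvados-specific
    # executor, tool factory, Keep-backed filesystem access, and document
    # fetcher/resolver so collection and keep: references resolve through the
    # Arvados API.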
646     return cwltool.main.main(args=arvargs,
647                              stdout=stdout,
648                              stderr=stderr,
649                              executor=runner.arv_executor,
650                              makeTool=runner.arv_make_tool,
651                              versionfunc=versionstring,
652                              job_order_object=job_order_object,
653                              make_fs_access=partial(CollectionFsAccess,
654                                                     api_client=api_client,
655                                                     keep_client=keep_client),
656                              fetcher_constructor=partial(CollectionFetcher,
657                                                          api_client=api_client,
658                                                          keep_client=keep_client),
659                              resolver=partial(collectionResolver, api_client))