#!/usr/bin/env python

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.

import argparse
import logging
import os
import sys
import threading
import hashlib
import copy
import json
from functools import partial
import pkg_resources  # part of setuptools

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process
import schema_salad
from schema_salad.sourceline import SourceLine

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_instance
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver
from .perf import Perf
from .pathmapper import FinalOutputPathMapper
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, getListing
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
from cwltool.draft2tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs or
    containers API), wait for it to complete, and report output.

    """

    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
        self.api = api_client
        self.processes = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.final_status = None
        self.uploaded = {}
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

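        # Probe the API server's discovery document for a supported work API.
        # The first entry in expected_api that the server exposes wins, unless
        # the caller pinned a specific API with work_api.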
        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

    def arv_make_tool(self, toolpath_object, **kwargs):
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = partial(CollectionFetcher,
                                                api_client=self.api,
                                                keep_client=self.keep_client)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
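        """Record the overall workflow status and final output.

        Also marks the pipeline instance Complete or Failed, when one exists.
        """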
        if processStatus == "success":
            logger.info("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
        else:
            logger.warn("Overall process status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_status = processStatus
        self.final_output = out

    def on_message(self, event):
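        """Handle a state-change event for a submitted job or container,
        updating its process record and waking the main loop when it
        reaches a final state."""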
        if "object_uuid" in event:
            if event["object_uuid"] in self.processes and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.processes[uuid]
                        logger.info("%s %s is Running", self.label(j), uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.processes[uuid]
                        logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
                        with Perf(metrics, "done %s" % j.name):
                            j.done(event["properties"]["new_attributes"])
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def label(self, obj):
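        # self.work_api is "jobs" or "containers"; dropping the trailing "s"
        # yields a singular label such as "[job foo]" or "[container foo]".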
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            while True:
                self.stop_polling.wait(15)
                if self.stop_polling.is_set():
                    break
                with self.lock:
                    keys = self.processes.keys()
                if not keys:
                    continue

                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    continue

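                # Re-dispatch each polled record as a synthetic "update" event
                # so the polling path shares on_message() with event handling.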
                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
        except:
            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
            self.cond.acquire()
            self.processes.clear()
            self.cond.notify()
            self.cond.release()
        finally:
            self.stop_polling.set()

    def get_uploaded(self):
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def check_features(self, obj):
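        """Recursively check a tool document for features that the chosen
        work API does not support, raising UnsupportedRequirement early."""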
        if isinstance(obj, dict):
            if obj.get("class") == "InitialWorkDirRequirement":
                if self.work_api == "containers":
                    raise UnsupportedRequirement("InitialWorkDirRequirement not supported with --api=containers")
            if obj.get("writable"):
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
            if obj.get("class") == "CommandLineTool":
                if self.work_api == "containers":
                    if obj.get("stdin"):
                        raise SourceLine(obj, "stdin", UnsupportedRequirement).makeError("Stdin redirection currently not supported with --api=containers")
                    if obj.get("stderr"):
                        raise SourceLine(obj, "stderr", UnsupportedRequirement).makeError("Stderr redirection currently not supported with --api=containers")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i, v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement):
                    self.check_features(v)

    def make_output_collection(self, name, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = FinalOutputPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        srccollections = {}
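        # Keys starting with "_:" are anonymous objects (directories created
        # implicitly, or literal file contents to write out); everything else
        # must be a keep: reference we can copy from a source collection.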
        for k, v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in Keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            if srccollection not in srccollections:
                try:
                    srccollections[srccollection] = arvados.collection.CollectionReader(
                        srccollection,
                        api_client=self.api,
                        keep_client=self.keep_client,
                        num_retries=self.num_retries)
                except arvados.errors.ArgumentError as e:
                    logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                    raise
            reader = srccollections[srccollection]
            try:
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

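        # Point each output location at its path inside the new collection
        # and drop attributes that are now stale.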
        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("basename", "listing", "contents"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        # Guard against an empty tagsString, which would otherwise create a
        # single tag link with an empty name.
        tags = tagsString.split(',') if tagsString else []
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
            }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

    def set_crunch_output(self):
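        """Record the final output collection as the output of the runner's
        own container or job task, so this run reports its result."""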
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)

    def arv_executor(self, tool, job_order, **kwargs):
        self.debug = kwargs.get("debug")

        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 api_client=self.api,
                                                                 keep_client=self.keep_client)
        self.fs_access = make_fs_access(kwargs["basedir"])

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"))
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            else:
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
                                        name=kwargs.get("name")), "success")

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if not kwargs["name"]:
            del kwargs["name"]

        if self.work_api == "containers":
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

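        # Upload the tool document's local file dependencies to Keep before
        # submitting or running anything.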
        upload_instance(self, shortname(tool.tool["id"]), tool, job_order)

        runnerjob = None
        if kwargs.get("submit"):
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool":
                    kwargs["runnerjob"] = tool.tool["id"]
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
                                                self.output_name,
                                                self.output_tags,
                                                submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"),
                                                on_error=kwargs.get("on_error"),
                                                submit_runner_image=kwargs.get("submit_runner_image"))
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                      self.output_name,
                                      self.output_tags,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"),
                                      on_error=kwargs.get("on_error"),
                                      submit_runner_image=kwargs.get("submit_runner_image"))

        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and self.work_api != "containers":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            runnerjob.run(wait=kwargs.get("wait"))
            return (runnerjob.uuid, "success")

        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.cond.acquire()
            # Will continue to hold the lock for the duration of this code
            # except when in cond.wait(), at which point on_message can update
            # job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if runnable:
                    with Perf(metrics, "run"):
                        runnable.run(**kwargs)
                else:
                    if self.processes:
                        self.cond.wait(1)
                    else:
                        logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while self.processes:
                self.cond.wait(1)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt:
                logger.error("Interrupted, marking pipeline as failed")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.cond.release()
            self.stop_polling.set()
            self.polling_thread.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
            self.set_crunch_output()

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(getListing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        return (self.final_output, self.final_status)


def versionstring():
    """Return a version string for key packages, for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s %s, %s %s, %s %s" % (sys.argv[0], __version__, arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or given on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)
    parser.add_argument("--version", action="store_true", help="Print version and exit")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                        default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                        default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        help="Select work submission API, one of 'jobs' or 'containers'. Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=1024)

    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                        default=None)

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser

def add_arv_hints():
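    """Register the Arvados CWL extension schema with cwltool's schema names
    so documents may use http://arvados.org/cwl# hints."""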
    cache = {}
    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    cache["http://arvados.org/cwl"] = res.read()
    res.close()
    document_loader, cwlnames, _, _ = cwltool.process.get_schema("v1.0")
    _, extnames, _, _ = schema_salad.schema.load_schema("http://arvados.org/cwl", cache=cache)
    for n in extnames.names:
        if not cwlnames.has_name("http://arvados.org/cwl#"+n, ""):
            cwlnames.add_name("http://arvados.org/cwl#"+n, "", extnames.get_name(n, ""))
        document_loader.idx["http://arvados.org/cwl#"+n] = {}

def main(args, stdout, stderr, api_client=None, keep_client=None):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if arvargs.version:
        print versionstring()
        return

    if arvargs.update_workflow:
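        # The UUID's middle field identifies the object type: '-7fd4e-' is a
        # workflow (containers API), '-p5p6p-' a pipeline template (jobs API).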
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.validate = None

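    # Hand off to cwltool's main loop, substituting the Arvados executor,
    # tool constructor, Keep-backed filesystem access, and reference resolver.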
    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=partial(CollectionFsAccess,
                                                    api_client=api_client,
                                                    keep_client=keep_client),
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         keep_client=keep_client),
                             resolver=partial(collectionResolver, api_client),
                             logger_handler=arvados.log_handler)