13627: Improve error reporting.
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 # Implement cwl-runner interface for submitting and running work on Arvados, using
7 # either the Crunch jobs API or Crunch containers API.
8
9 import argparse
10 import logging
11 import os
12 import sys
13 import threading
14 import hashlib
15 import copy
16 import json
17 import re
18 from functools import partial
19 import pkg_resources  # part of setuptools
20 import Queue
21 import time
22 import signal
23 import thread
24
25 from cwltool.errors import WorkflowException
26 import cwltool.main
27 import cwltool.workflow
28 import cwltool.process
29 from schema_salad.sourceline import SourceLine
30 import schema_salad.validate as validate
31 import cwltool.argparser
32
33 import arvados
34 import arvados.config
35 from arvados.keep import KeepClient
36 from arvados.errors import ApiError
37 import arvados.commands._util as arv_cmd
38
39 from .arvcontainer import ArvadosContainer, RunnerContainer
40 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
41 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
42 from .arvtool import ArvadosCommandTool
43 from .arvworkflow import ArvadosWorkflow, upload_workflow
44 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
45 from .perf import Perf
46 from .pathmapper import NoFollowPathMapper
47 from .task_queue import TaskQueue
48 from .context import ArvLoadingContext, ArvRuntimeContext
49 from ._version import __version__
50
51 from cwltool.pack import pack
52 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
53 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
54 from cwltool.command_line_tool import compute_checksums
55
56 from arvados.api import OrderedJsonModel
57
58 logger = logging.getLogger('arvados.cwl-runner')
59 metrics = logging.getLogger('arvados.cwl-runner.metrics')
60 logger.setLevel(logging.INFO)
61
62 arvados.log_handler.setFormatter(logging.Formatter(
63         '%(asctime)s %(name)s %(levelname)s: %(message)s',
64         '%Y-%m-%d %H:%M:%S'))
65
66 DEFAULT_PRIORITY = 500
67
68 class ArvCwlRunner(object):
69     """Execute a CWL tool or workflow, submit work (using either the jobs
70     or containers API), wait for it to complete, and report output.
71
72     """
73
74     def __init__(self, api_client,
75                  arvargs=None,
76                  keep_client=None,
77                  num_retries=4,
78                  thread_count=4):
79
80         if arvargs is None:
81             arvargs = argparse.Namespace()
82             arvargs.work_api = None
83             arvargs.output_name = None
84             arvargs.output_tags = None
85             arvargs.thread_count = 1
86
87         self.api = api_client
88         self.processes = {}
89         self.workflow_eval_lock = threading.Condition(threading.RLock())
90         self.final_output = None
91         self.final_status = None
92         self.num_retries = num_retries
93         self.uuid = None
94         self.stop_polling = threading.Event()
95         self.poll_api = None
96         self.pipeline = None
97         self.final_output_collection = None
98         self.output_name = arvargs.output_name
99         self.output_tags = arvargs.output_tags
100         self.project_uuid = None
101         self.intermediate_output_ttl = 0
102         self.intermediate_output_collections = []
103         self.trash_intermediate = False
104         self.thread_count = arvargs.thread_count
105         self.poll_interval = 12
106         self.loadingContext = None
107
108         if keep_client is not None:
109             self.keep_client = keep_client
110         else:
111             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
112
113         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
114
115         self.fetcher_constructor = partial(CollectionFetcher,
116                                            api_client=self.api,
117                                            fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
118                                            num_retries=self.num_retries)
119
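        # Choose the work submission API: honor --api if given, otherwise use
        # the first of "jobs"/"containers" whose create method appears in the
        # API server's discovery document.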
120         self.work_api = None
121         expected_api = ["jobs", "containers"]
122         for api in expected_api:
123             try:
124                 methods = self.api._rootDesc.get('resources')[api]['methods']
125                 if ('httpMethod' in methods['create'] and
126                     (arvargs.work_api == api or arvargs.work_api is None)):
127                     self.work_api = api
128                     break
129             except KeyError:
130                 pass
131
132         if not self.work_api:
133             if arvargs.work_api is None:
134                 raise Exception("No supported APIs")
135             else:
136                 raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
137
138         self.loadingContext = ArvLoadingContext(vars(arvargs))
139         self.loadingContext.fetcher_constructor = self.fetcher_constructor
140         self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
141         self.loadingContext.construct_tool_object = self.arv_make_tool
142
143
144     def arv_make_tool(self, toolpath_object, loadingContext):
145         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
146             return ArvadosCommandTool(self, toolpath_object, loadingContext)
147         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
148             return ArvadosWorkflow(self, toolpath_object, loadingContext)
149         else:
150             return cwltool.workflow.default_make_tool(toolpath_object, loadingContext)
151
152     def output_callback(self, out, processStatus):
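        """Record the overall workflow output and status.

        Passed to cwltool as the top-level output callback.  Also updates the
        pipeline instance state (jobs API) and wakes any thread waiting on
        workflow_eval_lock.
        """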
153         with self.workflow_eval_lock:
154             if processStatus == "success":
155                 logger.info("Overall process status is %s", processStatus)
156                 if self.pipeline:
157                     self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
158                                                          body={"state": "Complete"}).execute(num_retries=self.num_retries)
159             else:
160                 logger.error("Overall process status is %s", processStatus)
161                 if self.pipeline:
162                     self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
163                                                          body={"state": "Failed"}).execute(num_retries=self.num_retries)
164             self.final_status = processStatus
165             self.final_output = out
166             self.workflow_eval_lock.notifyAll()
167
168
169     def start_run(self, runnable, runtimeContext):
170         self.task_queue.add(partial(runnable.run, runtimeContext))
171
172     def process_submitted(self, container):
173         with self.workflow_eval_lock:
174             self.processes[container.uuid] = container
175
176     def process_done(self, uuid, record):
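        """Handle a job or container that has reached a final state.

        Queues the process's done() handler with the final record and removes
        the process from the polling set.
        """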
177         with self.workflow_eval_lock:
178             j = self.processes[uuid]
179             logger.info("%s %s is %s", self.label(j), uuid, record["state"])
180             self.task_queue.add(partial(j.done, record))
181             del self.processes[uuid]
182
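    # wrapped_callback/get_wrapped_callback wrap cwltool's per-step output
    # callbacks so they run while holding workflow_eval_lock and notify the
    # main loop that more work may have become runnable.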
183     def wrapped_callback(self, cb, obj, st):
184         with self.workflow_eval_lock:
185             cb(obj, st)
186             self.workflow_eval_lock.notifyAll()
187
188     def get_wrapped_callback(self, cb):
189         return partial(self.wrapped_callback, cb)
190
191     def on_message(self, event):
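        """Handle a state change event for a tracked job or container.

        Marks the process as running, or hands final states off to
        process_done().  Events are synthesized by the poll_states thread.
        """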
192         if event.get("object_uuid") in self.processes and event["event_type"] == "update":
193             uuid = event["object_uuid"]
194             if event["properties"]["new_attributes"]["state"] == "Running":
195                 with self.workflow_eval_lock:
196                     j = self.processes[uuid]
197                     if j.running is False:
198                         j.running = True
199                         j.update_pipeline_component(event["properties"]["new_attributes"])
200                         logger.info("%s %s is Running", self.label(j), uuid)
201             elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
202                 self.process_done(uuid, event["properties"]["new_attributes"])
203
204     def label(self, obj):
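        """Return a log prefix such as "[container stepname]" (work_api minus its trailing "s")."""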
205         return "[%s %s]" % (self.work_api[0:-1], obj.name)
206
207     def poll_states(self):
208         """Poll status of jobs or containers listed in the processes dict.
209
210         Runs in a separate thread.
211         """
212
213         try:
214             remain_wait = self.poll_interval
215             while True:
216                 if remain_wait > 0:
217                     self.stop_polling.wait(remain_wait)
218                 if self.stop_polling.is_set():
219                     break
220                 with self.workflow_eval_lock:
221                     keys = list(self.processes.keys())
222                 if not keys:
223                     remain_wait = self.poll_interval
224                     continue
225
226                 begin_poll = time.time()
227                 if self.work_api == "containers":
228                     table = self.poll_api.container_requests()
229                 elif self.work_api == "jobs":
230                     table = self.poll_api.jobs()
231
232                 try:
233                     proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
234                 except Exception as e:
235                     logger.warn("Error checking states on API server: %s", e)
236                     remain_wait = self.poll_interval
237                     continue
238
239                 for p in proc_states["items"]:
240                     self.on_message({
241                         "object_uuid": p["uuid"],
242                         "event_type": "update",
243                         "properties": {
244                             "new_attributes": p
245                         }
246                     })
247                 finish_poll = time.time()
248                 remain_wait = self.poll_interval - (finish_poll - begin_poll)
249         except:
250             logger.exception("Fatal error in state polling thread.")
251             with self.workflow_eval_lock:
252                 self.processes.clear()
253                 self.workflow_eval_lock.notifyAll()
254         finally:
255             self.stop_polling.set()
256
257     def add_intermediate_output(self, uuid):
258         if uuid:
259             self.intermediate_output_collections.append(uuid)
260
261     def trash_intermediate_output(self):
262         logger.info("Cleaning up intermediate output collections")
263         for i in self.intermediate_output_collections:
264             try:
265                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
266             except:
267                 logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
268             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
269                 break
270
271     def check_features(self, obj):
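        """Recursively check a tool document for features the chosen API cannot run.

        Raises UnsupportedRequirement or ValidationException for requirements
        (writable InitialWorkDir entries, dockerOutputDirectory, Secrets) that
        are unavailable or invalid with the selected work API.
        """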
272         if isinstance(obj, dict):
273             if obj.get("writable") and self.work_api != "containers":
274                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
275             if obj.get("class") == "DockerRequirement":
276                 if obj.get("dockerOutputDirectory"):
277                     if self.work_api != "containers":
278                         raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
279                             "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
280                     if not obj.get("dockerOutputDirectory").startswith('/'):
281                         raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
282                             "Option 'dockerOutputDirectory' must be an absolute path.")
283             if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
284                 raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
285             for v in obj.itervalues():
286                 self.check_features(v)
287         elif isinstance(obj, list):
288             for i,v in enumerate(obj):
289                 with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
290                     self.check_features(v)
291
292     def make_output_collection(self, name, storage_classes, tagsString, outputObj):
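        """Assemble the final workflow output into a new Keep collection.

        Copies each output file from its source collection (writing file
        literals directly), saves the collection along with cwl.output.json,
        applies any requested tags, and rewrites output locations to point at
        the new collection.
        """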
293         outputObj = copy.deepcopy(outputObj)
294
295         files = []
296         def capture(fileobj):
297             files.append(fileobj)
298
299         adjustDirObjs(outputObj, capture)
300         adjustFileObjs(outputObj, capture)
301
302         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
303
304         final = arvados.collection.Collection(api_client=self.api,
305                                               keep_client=self.keep_client,
306                                               num_retries=self.num_retries)
307
308         for k,v in generatemapper.items():
309             if k.startswith("_:"):
310                 if v.type == "Directory":
311                     continue
312                 if v.type == "CreateFile":
313                     with final.open(v.target, "wb") as f:
314                         f.write(v.resolved.encode("utf-8"))
315                     continue
316
317             if not k.startswith("keep:"):
318                 raise Exception("Output source is not in keep or a literal")
319             sp = k.split("/")
320             srccollection = sp[0][5:]
321             try:
322                 reader = self.collection_cache.get(srccollection)
323                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
324                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
325             except arvados.errors.ArgumentError as e:
326                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
327                 raise
328             except IOError as e:
329                 logger.warn("While preparing output collection: %s", e)
330
331         def rewrite(fileobj):
332             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
333             for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
334                 if k in fileobj:
335                     del fileobj[k]
336
337         adjustDirObjs(outputObj, rewrite)
338         adjustFileObjs(outputObj, rewrite)
339
340         with final.open("cwl.output.json", "w") as f:
341             json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
342
343         final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)
344
345         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
346                     final.api_response()["name"],
347                     final.manifest_locator())
348
349         final_uuid = final.manifest_locator()
350         tags = tagsString.split(',') if tagsString else []
351         for tag in tags:
352             self.api.links().create(body={
353                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
354                 }).execute(num_retries=self.num_retries)
355
356         def finalcollection(fileobj):
357             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
358
359         adjustDirObjs(outputObj, finalcollection)
360         adjustFileObjs(outputObj, finalcollection)
361
362         return (outputObj, final)
363
364     def set_crunch_output(self):
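        """When running inside Arvados, report the final output upstream.

        With the containers API, set the current container's output to the
        final output collection and trash the runner's own copy of it; with
        the jobs API, record the output and success state on the job task.
        """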
365         if self.work_api == "containers":
366             try:
367                 current = self.api.containers().current().execute(num_retries=self.num_retries)
368             except ApiError as e:
369                 # Status code 404 just means we're not running in a container.
370                 if e.resp.status != 404:
371                     logger.info("Getting current container: %s", e)
372                 return
373             try:
374                 self.api.containers().update(uuid=current['uuid'],
375                                              body={
376                                                  'output': self.final_output_collection.portable_data_hash(),
377                                              }).execute(num_retries=self.num_retries)
378                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
379                                               body={
380                                                   'is_trashed': True
381                                               }).execute(num_retries=self.num_retries)
382             except Exception as e:
383                 logger.info("Setting container output: %s", e)
384         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
385             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
386                                    body={
387                                        'output': self.final_output_collection.portable_data_hash(),
388                                        'success': self.final_status == "success",
389                                        'progress':1.0
390                                    }).execute(num_retries=self.num_retries)
391
392     def arv_executor(self, tool, job_order, runtimeContext, logger=None):
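        """Main entry point, invoked by cwltool.main.

        Validates API-specific features, uploads workflow dependencies and the
        job order, creates/updates a stored workflow and exits if requested,
        otherwise submits a runner (or runs locally), drives the job iterator
        while the polling thread tracks state, and finally packages and
        returns the output.
        """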
393         self.debug = runtimeContext.debug
394
395         tool.visit(self.check_features)
396
397         self.project_uuid = runtimeContext.project_uuid
398         self.pipeline = None
399         self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
400         self.secret_store = runtimeContext.secret_store
401
402         self.trash_intermediate = runtimeContext.trash_intermediate
403         if self.trash_intermediate and self.work_api != "containers":
404             raise Exception("--trash-intermediate is only supported with --api=containers.")
405
406         self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
407         if self.intermediate_output_ttl and self.work_api != "containers":
408             raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
409         if self.intermediate_output_ttl < 0:
410             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
411
412         if runtimeContext.submit_request_uuid and self.work_api != "containers":
413             raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
414
415         if not runtimeContext.name:
416             runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
417
418         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
419         # Also uploads docker images.
420         merged_map = upload_workflow_deps(self, tool)
421
422         # Reload tool object which may have been updated by
423         # upload_workflow_deps
424         # Don't validate this time because it will just print redundant errors.
425         loadingContext = self.loadingContext.copy()
426         loadingContext.loader = tool.doc_loader
427         loadingContext.avsc_names = tool.doc_schema
428         loadingContext.metadata = tool.metadata
429         loadingContext.do_validate = False
430
431         tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
432                                   loadingContext)
433
434         # Upload local file references in the job order.
435         job_order = upload_job_order(self, "%s input" % runtimeContext.name,
436                                      tool, job_order)
437
438         existing_uuid = runtimeContext.update_workflow
439         if existing_uuid or runtimeContext.create_workflow:
440             # Create a pipeline template or workflow record and exit.
441             if self.work_api == "jobs":
442                 tmpl = RunnerTemplate(self, tool, job_order,
443                                       runtimeContext.enable_reuse,
444                                       uuid=existing_uuid,
445                                       submit_runner_ram=runtimeContext.submit_runner_ram,
446                                       name=runtimeContext.name,
447                                       merged_map=merged_map)
448                 tmpl.save()
449                 # cwltool.main will write our return value to stdout.
450                 return (tmpl.uuid, "success")
451             elif self.work_api == "containers":
452                 return (upload_workflow(self, tool, job_order,
453                                         self.project_uuid,
454                                         uuid=existing_uuid,
455                                         submit_runner_ram=runtimeContext.submit_runner_ram,
456                                         name=runtimeContext.name,
457                                         merged_map=merged_map),
458                         "success")
459
460         self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
461         self.eval_timeout = runtimeContext.eval_timeout
462
463         runtimeContext = runtimeContext.copy()
464         runtimeContext.use_container = True
465         runtimeContext.tmpdir_prefix = "tmp"
466
467         if self.work_api == "containers":
468             if self.ignore_docker_for_reuse:
469                 raise Exception("--ignore-docker-for-reuse not supported with containers API.")
470             runtimeContext.outdir = "/var/spool/cwl"
471             runtimeContext.docker_outdir = "/var/spool/cwl"
472             runtimeContext.tmpdir = "/tmp"
473             runtimeContext.docker_tmpdir = "/tmp"
474         elif self.work_api == "jobs":
475             if runtimeContext.priority != DEFAULT_PRIORITY:
476                 raise Exception("--priority not implemented for jobs API.")
477             runtimeContext.outdir = "$(task.outdir)"
478             runtimeContext.docker_outdir = "$(task.outdir)"
479             runtimeContext.tmpdir = "$(task.tmpdir)"
480
481         if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
482             raise Exception("--priority must be in the range 1..1000.")
483
484         runnerjob = None
485         if runtimeContext.submit:
486             # Submit a runner job to run the workflow for us.
487             if self.work_api == "containers":
488                 if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait:
489                     runtimeContext.runnerjob = tool.tool["id"]
490                     runnerjob = tool.job(job_order,
491                                          self.output_callback,
492                                          runtimeContext).next()
493                 else:
494                     runnerjob = RunnerContainer(self, tool, job_order, runtimeContext.enable_reuse,
495                                                 self.output_name,
496                                                 self.output_tags,
497                                                 submit_runner_ram=runtimeContext.submit_runner_ram,
498                                                 name=runtimeContext.name,
499                                                 on_error=runtimeContext.on_error,
500                                                 submit_runner_image=runtimeContext.submit_runner_image,
501                                                 intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
502                                                 merged_map=merged_map,
503                                                 priority=runtimeContext.priority,
504                                                 secret_store=self.secret_store)
505             elif self.work_api == "jobs":
506                 runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse,
507                                       self.output_name,
508                                       self.output_tags,
509                                       submit_runner_ram=runtimeContext.submit_runner_ram,
510                                       name=runtimeContext.name,
511                                       on_error=runtimeContext.on_error,
512                                       submit_runner_image=runtimeContext.submit_runner_image,
513                                       merged_map=merged_map)
514         elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
515             # Create pipeline for local run
516             self.pipeline = self.api.pipeline_instances().create(
517                 body={
518                     "owner_uuid": self.project_uuid,
519                     "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
520                     "components": {},
521                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
522             logger.info("Pipeline instance %s", self.pipeline["uuid"])
523
524         if runnerjob and not runtimeContext.wait:
525             submitargs = runtimeContext.copy()
526             submitargs.submit = False
527             runnerjob.run(submitargs)
528             return (runnerjob.uuid, "success")
529
530         self.poll_api = arvados.api('v1')
531         self.polling_thread = threading.Thread(target=self.poll_states)
532         self.polling_thread.start()
533
534         self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
535
536         if runnerjob:
537             jobiter = iter((runnerjob,))
538         else:
539             if runtimeContext.cwl_runner_job is not None:
540                 self.uuid = runtimeContext.cwl_runner_job.get('uuid')
541             jobiter = tool.job(job_order,
542                                self.output_callback,
543                                runtimeContext)
544
545         try:
546             self.workflow_eval_lock.acquire()
547             # Holds the lock while this code runs and releases it when
548             # it is safe to do so in self.workflow_eval_lock.wait(),
549             # at which point on_message can update job state and
550             # process output callbacks.
551
552             loopperf = Perf(metrics, "jobiter")
553             loopperf.__enter__()
554             for runnable in jobiter:
555                 loopperf.__exit__()
556
557                 if self.stop_polling.is_set():
558                     break
559
560                 if self.task_queue.error is not None:
561                     raise self.task_queue.error
562
563                 if runnable:
564                     with Perf(metrics, "run"):
565                         self.start_run(runnable, runtimeContext)
566                 else:
567                     if (self.task_queue.in_flight + len(self.processes)) > 0:
568                         self.workflow_eval_lock.wait(3)
569                     else:
570                         logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
571                         break
572                 loopperf.__enter__()
573             loopperf.__exit__()
574
575             while (self.task_queue.in_flight + len(self.processes)) > 0:
576                 if self.task_queue.error is not None:
577                     raise self.task_queue.error
578                 self.workflow_eval_lock.wait(3)
579
580         except UnsupportedRequirement:
581             raise
582         except:
583             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
584                 logger.error("Interrupted, workflow will be cancelled")
585             else:
586                 logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
587             if self.pipeline:
588                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
589                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
590             if runnerjob and runnerjob.uuid and self.work_api == "containers":
591                 self.api.container_requests().update(uuid=runnerjob.uuid,
592                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
593         finally:
594             self.workflow_eval_lock.release()
595             self.task_queue.drain()
596             self.stop_polling.set()
597             self.polling_thread.join()
598             self.task_queue.join()
599
600         if self.final_status == "UnsupportedRequirement":
601             raise UnsupportedRequirement("Check log for details.")
602
603         if self.final_output is None:
604             raise WorkflowException("Workflow did not return a result.")
605
606         if runtimeContext.submit and isinstance(runnerjob, Runner):
607             logger.info("Final output collection %s", runnerjob.final_output)
608         else:
609             if self.output_name is None:
610                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
611             if self.output_tags is None:
612                 self.output_tags = ""
613
614             storage_classes = runtimeContext.storage_classes.strip().split(",")
615             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
616             self.set_crunch_output()
617
618         if runtimeContext.compute_checksum:
619             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
620             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
621
622         if self.trash_intermediate and self.final_status == "success":
623             self.trash_intermediate_output()
624
625         return (self.final_output, self.final_status)
626
627
628 def versionstring():
629     """Return a string with the versions of key packages, for provenance and debugging."""
630
631     arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
632     arvpkg = pkg_resources.require("arvados-python-client")
633     cwlpkg = pkg_resources.require("cwltool")
634
635     return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
636                                     "arvados-python-client", arvpkg[0].version,
637                                     "cwltool", cwlpkg[0].version)
638
639
640 def arg_parser():  # type: () -> argparse.ArgumentParser
641     parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
642
643     parser.add_argument("--basedir", type=str,
644                         help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or provided on the command line.")
645     parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
646                         help="Output directory, default current directory")
647
648     parser.add_argument("--eval-timeout",
649                         help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
650                         type=float,
651                         default=20)
652
653     exgroup = parser.add_mutually_exclusive_group()
654     exgroup.add_argument("--print-dot", action="store_true",
655                          help="Print workflow visualization in graphviz format and exit")
656     exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
657     exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
658
659     exgroup = parser.add_mutually_exclusive_group()
660     exgroup.add_argument("--verbose", action="store_true", help="Default logging")
661     exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
662     exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
663
664     parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
665
666     parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
667
668     exgroup = parser.add_mutually_exclusive_group()
669     exgroup.add_argument("--enable-reuse", action="store_true",
670                         default=True, dest="enable_reuse",
671                         help="Enable job or container reuse (default)")
672     exgroup.add_argument("--disable-reuse", action="store_false",
673                         default=True, dest="enable_reuse",
674                         help="Disable job or container reuse")
675
676     parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs.  If not provided, work goes to the user's home project.")
677     parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
678     parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
679     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
680                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
681                         default=False)
682
683     exgroup = parser.add_mutually_exclusive_group()
684     exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
685                         default=True, dest="submit")
686     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
687                         default=True, dest="submit")
688     exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
689                          dest="create_workflow")
690     exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
691     exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
692
693     exgroup = parser.add_mutually_exclusive_group()
694     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
695                         default=True, dest="wait")
696     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
697                         default=True, dest="wait")
698
699     exgroup = parser.add_mutually_exclusive_group()
700     exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
701                         default=True, dest="log_timestamps")
702     exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
703                         default=True, dest="log_timestamps")
704
705     parser.add_argument("--api", type=str,
706                         default=None, dest="work_api",
707                         choices=("jobs", "containers"),
708                         help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")
709
710     parser.add_argument("--compute-checksum", action="store_true", default=False,
711                         help="Compute checksum of contents while collecting outputs",
712                         dest="compute_checksum")
713
714     parser.add_argument("--submit-runner-ram", type=int,
715                         help="RAM (in MiB) required for the workflow runner job (default 1024)",
716                         default=1024)
717
718     parser.add_argument("--submit-runner-image", type=str,
719                         help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
720                         default=None)
721
722     parser.add_argument("--submit-request-uuid", type=str,
723                         default=None,
724                         help="Update and commit supplied container request instead of creating a new one (containers API only).")
725
726     parser.add_argument("--name", type=str,
727                         help="Name to use for workflow execution instance.",
728                         default=None)
729
730     parser.add_argument("--on-error", type=str,
731                         help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
732                         "Default is 'continue'.", default="continue", choices=("stop", "continue"))
733
734     parser.add_argument("--enable-dev", action="store_true",
735                         help="Enable loading and running development versions "
736                              "of CWL spec.", default=False)
737     parser.add_argument('--storage-classes', default="default", type=str,
738                         help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.")
739
740     parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
741                         help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
742                         default=0)
743
744     parser.add_argument("--priority", type=int,
745                         help="Workflow priority (range 1..1000; higher values take precedence over lower; containers API only)",
746                         default=DEFAULT_PRIORITY)
747
748     parser.add_argument("--disable-validate", dest="do_validate",
749                         action="store_false", default=True,
750                         help=argparse.SUPPRESS)
751
752     parser.add_argument("--disable-js-validation",
753                         action="store_true", default=False,
754                         help=argparse.SUPPRESS)
755
756     parser.add_argument("--thread-count", type=int,
757                         default=4, help="Number of threads to use for job submit and output collection.")
758
759     exgroup = parser.add_mutually_exclusive_group()
760     exgroup.add_argument("--trash-intermediate", action="store_true",
761                         default=False, dest="trash_intermediate",
762                          help="Immediately trash intermediate outputs on workflow success.")
763     exgroup.add_argument("--no-trash-intermediate", action="store_false",
764                         default=False, dest="trash_intermediate",
765                         help="Do not trash intermediate outputs (default).")
766
767     parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
768     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
769
770     return parser
771
772 def add_arv_hints():
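    """Register Arvados CWL extensions with cwltool.

    Relaxes cwltool's name acceptance pattern (ACCEPTLIST_RE), loads the
    arv-cwl-schema.yml extension schema, and declares the Arvados-specific
    process requirements as supported.
    """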
773     cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
774     cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
775     res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
776     use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
777     res.close()
778     cwltool.process.supportedProcessRequirements.extend([
779         "http://arvados.org/cwl#RunInSingleContainer",
780         "http://arvados.org/cwl#OutputDirType",
781         "http://arvados.org/cwl#RuntimeConstraints",
782         "http://arvados.org/cwl#PartitionRequirement",
783         "http://arvados.org/cwl#APIRequirement",
784         "http://commonwl.org/cwltool#LoadListingRequirement",
785         "http://arvados.org/cwl#IntermediateOutput",
786         "http://arvados.org/cwl#ReuseRequirement"
787     ])
788
789 def exit_signal_handler(sigcode, frame):
790     logger.error("Caught signal {}, exiting.".format(sigcode))
791     sys.exit(-sigcode)
792
793 def main(args, stdout, stderr, api_client=None, keep_client=None,
794          install_sig_handlers=True):
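    """Command line entry point.

    Parses arguments, sets up the Arvados API client and ArvCwlRunner,
    configures logging, and hands control to cwltool.main.main with the
    Arvados executor.
    """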
795     parser = arg_parser()
796
797     job_order_object = None
798     arvargs = parser.parse_args(args)
799
800     if len(arvargs.storage_classes.strip().split(',')) > 1:
801         logger.error("Multiple storage classes are not currently supported.")
802         return 1
803
804     arvargs.use_container = True
805     arvargs.relax_path_checks = True
806     arvargs.print_supported_versions = False
807
808     if install_sig_handlers:
809         arv_cmd.install_signal_handlers()
810
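    # The object type infix in the --update-workflow UUID tells us which API
    # the existing record belongs to: "7fd4e" (workflow) requires the
    # containers API, "p5p6p" (pipeline template) requires the jobs API.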
811     if arvargs.update_workflow:
812         if arvargs.update_workflow.find('-7fd4e-') == 5:
813             want_api = 'containers'
814         elif arvargs.update_workflow.find('-p5p6p-') == 5:
815             want_api = 'jobs'
816         else:
817             want_api = None
818         if want_api and arvargs.work_api and want_api != arvargs.work_api:
819             logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
820                 arvargs.update_workflow, want_api, arvargs.work_api))
821             return 1
822         arvargs.work_api = want_api
823
824     if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
825         job_order_object = ({}, "")
826
827     add_arv_hints()
828
829     try:
830         if api_client is None:
831             api_client = arvados.safeapi.ThreadSafeApiCache(api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4})
832             keep_client = api_client.keep
833         if keep_client is None:
834             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
835         runner = ArvCwlRunner(api_client, arvargs, keep_client=keep_client, num_retries=4)
836     except Exception as e:
837         logger.error(e)
838         return 1
839
840     if arvargs.debug:
841         logger.setLevel(logging.DEBUG)
842         logging.getLogger('arvados').setLevel(logging.DEBUG)
843
844     if arvargs.quiet:
845         logger.setLevel(logging.WARN)
846         logging.getLogger('arvados').setLevel(logging.WARN)
847         logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
848
849     if arvargs.metrics:
850         metrics.setLevel(logging.DEBUG)
851         logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
852
853     if arvargs.log_timestamps:
854         arvados.log_handler.setFormatter(logging.Formatter(
855             '%(asctime)s %(name)s %(levelname)s: %(message)s',
856             '%Y-%m-%d %H:%M:%S'))
857     else:
858         arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
859
860     for key, val in cwltool.argparser.get_default_args().items():
861         if not hasattr(arvargs, key):
862             setattr(arvargs, key, val)
863
864     runtimeContext = ArvRuntimeContext(vars(arvargs))
865     runtimeContext.make_fs_access = partial(CollectionFsAccess,
866                              collection_cache=runner.collection_cache)
867
868     return cwltool.main.main(args=arvargs,
869                              stdout=stdout,
870                              stderr=stderr,
871                              executor=runner.arv_executor,
872                              versionfunc=versionstring,
873                              job_order_object=job_order_object,
874                              logger_handler=arvados.log_handler,
875                              custom_schema_callback=add_arv_hints,
876                              loadingContext=runner.loadingContext,
877                              runtimeContext=runtimeContext)