13493: Merge branch 'master' into 13493-federation-proxy
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 # Implement cwl-runner interface for submitting and running work on Arvados, using
7 # either the Crunch jobs API or Crunch containers API.
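#
# Typical command-line usage (a sketch; the console script is normally
# installed as "arvados-cwl-runner" by the package's setup.py, and the
# positional arguments correspond to the "workflow" and "job_order"
# arguments defined in arg_parser() below):
#
#   arvados-cwl-runner --api=containers my-workflow.cwl my-inputs.yml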
8
9 import argparse
10 import logging
11 import os
12 import sys
13 import threading
14 import hashlib
15 import copy
16 import json
17 import re
18 from functools import partial
19 import pkg_resources  # part of setuptools
20 import Queue
21 import time
22 import signal
23 import thread
24
25 from cwltool.errors import WorkflowException
26 import cwltool.main
27 import cwltool.workflow
28 import cwltool.process
29 from schema_salad.sourceline import SourceLine
30 import schema_salad.validate as validate
31 import cwltool.argparser
32
33 import arvados
34 import arvados.config
35 from arvados.keep import KeepClient
36 from arvados.errors import ApiError
37 import arvados.commands._util as arv_cmd
38
39 from .arvcontainer import ArvadosContainer, RunnerContainer
40 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
41 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
42 from .arvtool import ArvadosCommandTool
43 from .arvworkflow import ArvadosWorkflow, upload_workflow
44 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
45 from .perf import Perf
46 from .pathmapper import NoFollowPathMapper
47 from .task_queue import TaskQueue
48 from .context import ArvLoadingContext, ArvRuntimeContext
49 from ._version import __version__
50
51 from cwltool.pack import pack
52 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
53 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
54 from cwltool.command_line_tool import compute_checksums
55
56 from arvados.api import OrderedJsonModel
57
58 logger = logging.getLogger('arvados.cwl-runner')
59 metrics = logging.getLogger('arvados.cwl-runner.metrics')
60 logger.setLevel(logging.INFO)
61
62 arvados.log_handler.setFormatter(logging.Formatter(
63         '%(asctime)s %(name)s %(levelname)s: %(message)s',
64         '%Y-%m-%d %H:%M:%S'))
65
66 DEFAULT_PRIORITY = 500
67
68 class ArvCwlRunner(object):
69     """Execute a CWL tool or workflow, submit work (using either the jobs
70     or containers API), wait for it to complete, and report output.
71
72     """
73
74     def __init__(self, api_client,
75                  arvargs=None,
76                  keep_client=None,
77                  num_retries=4,
78                  thread_count=4):
79
80         if arvargs is None:
81             arvargs = argparse.Namespace()
82             arvargs.work_api = None
83             arvargs.output_name = None
84             arvargs.output_tags = None
85             arvargs.thread_count = 1
86
87         self.api = api_client
88         self.processes = {}
89         self.workflow_eval_lock = threading.Condition(threading.RLock())
90         self.final_output = None
91         self.final_status = None
92         self.num_retries = num_retries
93         self.uuid = None
94         self.stop_polling = threading.Event()
95         self.poll_api = None
96         self.pipeline = None
97         self.final_output_collection = None
98         self.output_name = arvargs.output_name
99         self.output_tags = arvargs.output_tags
100         self.project_uuid = None
101         self.intermediate_output_ttl = 0
102         self.intermediate_output_collections = []
103         self.trash_intermediate = False
104         self.thread_count = arvargs.thread_count
105         self.poll_interval = 12
106         self.loadingContext = None
107
108         if keep_client is not None:
109             self.keep_client = keep_client
110         else:
111             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
112
113         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
114
115         self.fetcher_constructor = partial(CollectionFetcher,
116                                            api_client=self.api,
117                                            fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
118                                            num_retries=self.num_retries)
119
120         self.work_api = None
121         expected_api = ["jobs", "containers"]
122         for api in expected_api:
123             try:
124                 methods = self.api._rootDesc.get('resources')[api]['methods']
125                 if ('httpMethod' in methods['create'] and
126                     (arvargs.work_api == api or arvargs.work_api is None)):
127                     self.work_api = api
128                     break
129             except KeyError:
130                 pass
131
132         if not self.work_api:
133             if arvargs.work_api is None:
134                 raise Exception("No supported APIs")
135             else:
136                 raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
137
138         self.loadingContext = ArvLoadingContext(vars(arvargs))
139         self.loadingContext.fetcher_constructor = self.fetcher_constructor
140         self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
141         self.loadingContext.construct_tool_object = self.arv_make_tool
142
143
144     def arv_make_tool(self, toolpath_object, loadingContext):
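        """Construct the tool object for a loaded CWL document.

        Dispatches on the document's "class": ArvadosCommandTool for
        CommandLineTool, ArvadosWorkflow for Workflow, and cwltool's
        default_make_tool for anything else (such as ExpressionTool).
        """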
145         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
146             return ArvadosCommandTool(self, toolpath_object, loadingContext)
147         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
148             return ArvadosWorkflow(self, toolpath_object, loadingContext)
149         else:
150             return cwltool.workflow.default_make_tool(toolpath_object, loadingContext)
151
152     def output_callback(self, out, processStatus):
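        """Record the overall workflow output and status.

        Runs under the evaluation lock; updates the pipeline instance state
        (jobs API) if one exists, stores the final output and status, and
        wakes up any threads waiting on workflow_eval_lock.
        """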
153         with self.workflow_eval_lock:
154             if processStatus == "success":
155                 logger.info("Overall process status is %s", processStatus)
156                 if self.pipeline:
157                     self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
158                                                          body={"state": "Complete"}).execute(num_retries=self.num_retries)
159             else:
160                 logger.error("Overall process status is %s", processStatus)
161                 if self.pipeline:
162                     self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
163                                                          body={"state": "Failed"}).execute(num_retries=self.num_retries)
164             self.final_status = processStatus
165             self.final_output = out
166             self.workflow_eval_lock.notifyAll()
167
168
169     def start_run(self, runnable, runtimeContext):
170         self.task_queue.add(partial(runnable.run, runtimeContext))
171
172     def process_submitted(self, container):
173         with self.workflow_eval_lock:
174             self.processes[container.uuid] = container
175
176     def process_done(self, uuid, record):
177         with self.workflow_eval_lock:
178             j = self.processes[uuid]
179             logger.info("%s %s is %s", self.label(j), uuid, record["state"])
180             self.task_queue.add(partial(j.done, record))
181             del self.processes[uuid]
182
183     def wrapped_callback(self, cb, obj, st):
184         with self.workflow_eval_lock:
185             cb(obj, st)
186             self.workflow_eval_lock.notifyAll()
187
188     def get_wrapped_callback(self, cb):
189         return partial(self.wrapped_callback, cb)
190
191     def on_message(self, event):
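        """Handle a state-change event for a tracked job or container.

        Transitions to Running are logged and recorded on the process object;
        terminal states (Complete, Failed, Cancelled, Final) are handed off to
        process_done().  In this module, events are synthesized by poll_states().
        """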
192         if event.get("object_uuid") in self.processes and event["event_type"] == "update":
193             uuid = event["object_uuid"]
194             if event["properties"]["new_attributes"]["state"] == "Running":
195                 with self.workflow_eval_lock:
196                     j = self.processes[uuid]
197                     if j.running is False:
198                         j.running = True
199                         j.update_pipeline_component(event["properties"]["new_attributes"])
200                         logger.info("%s %s is Running", self.label(j), uuid)
201             elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
202                 self.process_done(uuid, event["properties"]["new_attributes"])
203
204     def label(self, obj):
205         return "[%s %s]" % (self.work_api[0:-1], obj.name)
206
207     def poll_states(self):
208         """Poll status of jobs or containers listed in the processes dict.
209
210         Runs in a separate thread.
211         """
212
213         try:
214             remain_wait = self.poll_interval
215             while True:
216                 if remain_wait > 0:
217                     self.stop_polling.wait(remain_wait)
218                 if self.stop_polling.is_set():
219                     break
220                 with self.workflow_eval_lock:
221                     keys = list(self.processes.keys())
222                 if not keys:
223                     remain_wait = self.poll_interval
224                     continue
225
226                 begin_poll = time.time()
227                 if self.work_api == "containers":
228                     table = self.poll_api.container_requests()
229                 elif self.work_api == "jobs":
230                     table = self.poll_api.jobs()
231
232                 try:
233                     proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
234                 except Exception as e:
235                     logger.warn("Error checking states on API server: %s", e)
236                     remain_wait = self.poll_interval
237                     continue
238
239                 for p in proc_states["items"]:
240                     self.on_message({
241                         "object_uuid": p["uuid"],
242                         "event_type": "update",
243                         "properties": {
244                             "new_attributes": p
245                         }
246                     })
247                 finish_poll = time.time()
248                 remain_wait = self.poll_interval - (finish_poll - begin_poll)
249         except:
250             logger.exception("Fatal error in state polling thread.")
251             with self.workflow_eval_lock:
252                 self.processes.clear()
253                 self.workflow_eval_lock.notifyAll()
254         finally:
255             self.stop_polling.set()
256
257     def add_intermediate_output(self, uuid):
258         if uuid:
259             self.intermediate_output_collections.append(uuid)
260
261     def trash_intermediate_output(self):
262         logger.info("Cleaning up intermediate output collections")
263         for i in self.intermediate_output_collections:
264             try:
265                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
266             except:
267                 logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
268             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
269                 break
270
271     def check_features(self, obj):
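        """Recursively check a tool document for features the selected API
        cannot support: writable InitialWorkDir entries, dockerOutputDirectory
        and Secrets all require the containers API, and dockerOutputDirectory
        must be an absolute path."""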
272         if isinstance(obj, dict):
273             if obj.get("writable") and self.work_api != "containers":
274                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
275             if obj.get("class") == "DockerRequirement":
276                 if obj.get("dockerOutputDirectory"):
277                     if self.work_api != "containers":
278                         raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
279                             "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
280                     if not obj.get("dockerOutputDirectory").startswith('/'):
281                         raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
282                             "Option 'dockerOutputDirectory' must be an absolute path.")
283             if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
284                 raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
285             for v in obj.itervalues():
286                 self.check_features(v)
287         elif isinstance(obj, list):
288             for i,v in enumerate(obj):
289                 with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
290                     self.check_features(v)
291
292     def make_output_collection(self, name, storage_classes, tagsString, outputObj):
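        """Assemble the final workflow output into a new collection.

        Copies every file and directory referenced by the output object (from
        Keep or from file literals) into one collection, writes cwl.output.json
        alongside them, saves the collection with the requested name, storage
        classes and tags, and rewrites the output object's locations to point
        at the saved collection.
        """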
293         outputObj = copy.deepcopy(outputObj)
294
295         files = []
296         def capture(fileobj):
297             files.append(fileobj)
298
299         adjustDirObjs(outputObj, capture)
300         adjustFileObjs(outputObj, capture)
301
302         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
303
304         final = arvados.collection.Collection(api_client=self.api,
305                                               keep_client=self.keep_client,
306                                               num_retries=self.num_retries)
307
308         for k,v in generatemapper.items():
309             if k.startswith("_:"):
310                 if v.type == "Directory":
311                     continue
312                 if v.type == "CreateFile":
313                     with final.open(v.target, "wb") as f:
314                         f.write(v.resolved.encode("utf-8"))
315                     continue
316
317             if not k.startswith("keep:"):
318                 raise Exception("Output source is not in keep or a literal")
319             sp = k.split("/")
320             srccollection = sp[0][5:]
321             try:
322                 reader = self.collection_cache.get(srccollection)
323                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
324                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
325             except arvados.errors.ArgumentError as e:
326                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
327                 raise
328             except IOError as e:
329                 logger.warn("While preparing output collection: %s", e)
330
331         def rewrite(fileobj):
332             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
333             for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
334                 if k in fileobj:
335                     del fileobj[k]
336
337         adjustDirObjs(outputObj, rewrite)
338         adjustFileObjs(outputObj, rewrite)
339
340         with final.open("cwl.output.json", "w") as f:
341             json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
342
343         final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)
344
345         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
346                     final.api_response()["name"],
347                     final.manifest_locator())
348
349         final_uuid = final.manifest_locator()
350         tags = tagsString.split(',')
351         for tag in tags:
352             self.api.links().create(body={
353                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
354                 }).execute(num_retries=self.num_retries)
355
356         def finalcollection(fileobj):
357             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
358
359         adjustDirObjs(outputObj, finalcollection)
360         adjustFileObjs(outputObj, finalcollection)
361
362         return (outputObj, final)
363
364     def set_crunch_output(self):
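        """Record the final output on the runner's own work record.

        Under the containers API, set the current container's output to the
        final collection's portable data hash and mark the collection record
        saved above as trashed.  Under the jobs API, update the current job
        task's output, success and progress fields instead.
        """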
365         if self.work_api == "containers":
366             try:
367                 current = self.api.containers().current().execute(num_retries=self.num_retries)
368             except ApiError as e:
369                 # Status code 404 just means we're not running in a container.
370                 if e.resp.status != 404:
371                     logger.info("Getting current container: %s", e)
372                 return
373             try:
374                 self.api.containers().update(uuid=current['uuid'],
375                                              body={
376                                                  'output': self.final_output_collection.portable_data_hash(),
377                                              }).execute(num_retries=self.num_retries)
378                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
379                                               body={
380                                                   'is_trashed': True
381                                               }).execute(num_retries=self.num_retries)
382             except Exception as e:
383                 logger.info("Setting container output: %s", e)
384         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
385             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
386                                    body={
387                                        'output': self.final_output_collection.portable_data_hash(),
388                                        'success': self.final_status == "success",
389                                        'progress':1.0
390                                    }).execute(num_retries=self.num_retries)
391
392     def arv_executor(self, tool, job_order, runtimeContext, logger=None):
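        """Top-level executor invoked by cwltool.main.

        Checks requested features against the work API, uploads workflow
        dependencies and the job order, optionally just creates or updates a
        workflow/pipeline template record, otherwise builds either a runner
        job/container (--submit) or a local job iterator, drives the
        evaluation loop until all processes finish, and finally assembles and
        returns the output and status.
        """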
393         self.debug = runtimeContext.debug
394
395         tool.visit(self.check_features)
396
397         self.project_uuid = runtimeContext.project_uuid
398         self.pipeline = None
399         self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
400         self.secret_store = runtimeContext.secret_store
401
402         self.trash_intermediate = runtimeContext.trash_intermediate
403         if self.trash_intermediate and self.work_api != "containers":
404             raise Exception("--trash-intermediate is only supported with --api=containers.")
405
406         self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
407         if self.intermediate_output_ttl and self.work_api != "containers":
408             raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
409         if self.intermediate_output_ttl < 0:
410             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
411
412         if runtimeContext.submit_request_uuid and self.work_api != "containers":
413             raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
414
415         if not runtimeContext.name:
416             runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
417
418         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
419         # Also uploads docker images.
420         merged_map = upload_workflow_deps(self, tool)
421
422         # Reload tool object which may have been updated by
423         # upload_workflow_deps
424         # Don't validate this time because it will just print redundant errors.
425         loadingContext = self.loadingContext.copy()
426         loadingContext.loader = tool.doc_loader
427         loadingContext.avsc_names = tool.doc_schema
428         loadingContext.metadata = tool.metadata
429         loadingContext.do_validate = False
430
431         tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
432                                   loadingContext)
433
434         # Upload local file references in the job order.
435         job_order = upload_job_order(self, "%s input" % runtimeContext.name,
436                                      tool, job_order)
437
438         existing_uuid = runtimeContext.update_workflow
439         if existing_uuid or runtimeContext.create_workflow:
440             # Create a pipeline template or workflow record and exit.
441             if self.work_api == "jobs":
442                 tmpl = RunnerTemplate(self, tool, job_order,
443                                       runtimeContext.enable_reuse,
444                                       uuid=existing_uuid,
445                                       submit_runner_ram=runtimeContext.submit_runner_ram,
446                                       name=runtimeContext.name,
447                                       merged_map=merged_map)
448                 tmpl.save()
449                 # cwltool.main will write our return value to stdout.
450                 return (tmpl.uuid, "success")
451             elif self.work_api == "containers":
452                 return (upload_workflow(self, tool, job_order,
453                                         self.project_uuid,
454                                         uuid=existing_uuid,
455                                         submit_runner_ram=runtimeContext.submit_runner_ram,
456                                         name=runtimeContext.name,
457                                         merged_map=merged_map),
458                         "success")
459
460         self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
461         self.eval_timeout = runtimeContext.eval_timeout
462
463         runtimeContext = runtimeContext.copy()
464         runtimeContext.use_container = True
465         runtimeContext.tmpdir_prefix = "tmp"
466         runtimeContext.work_api = self.work_api
467
468         if self.work_api == "containers":
469             if self.ignore_docker_for_reuse:
470                 raise Exception("--ignore-docker-for-reuse not supported with containers API.")
471             runtimeContext.outdir = "/var/spool/cwl"
472             runtimeContext.docker_outdir = "/var/spool/cwl"
473             runtimeContext.tmpdir = "/tmp"
474             runtimeContext.docker_tmpdir = "/tmp"
475         elif self.work_api == "jobs":
476             if runtimeContext.priority != DEFAULT_PRIORITY:
477                 raise Exception("--priority not implemented for jobs API.")
478             runtimeContext.outdir = "$(task.outdir)"
479             runtimeContext.docker_outdir = "$(task.outdir)"
480             runtimeContext.tmpdir = "$(task.tmpdir)"
481
482         if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
483             raise Exception("--priority must be in the range 1..1000.")
484
485         runnerjob = None
486         if runtimeContext.submit:
487             # Submit a runner job to run the workflow for us.
488             if self.work_api == "containers":
489                 if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait:
490                     runtimeContext.runnerjob = tool.tool["id"]
491                     runnerjob = tool.job(job_order,
492                                          self.output_callback,
493                                          runtimeContext).next()
494                 else:
495                     runnerjob = RunnerContainer(self, tool, job_order, runtimeContext.enable_reuse,
496                                                 self.output_name,
497                                                 self.output_tags,
498                                                 submit_runner_ram=runtimeContext.submit_runner_ram,
499                                                 name=runtimeContext.name,
500                                                 on_error=runtimeContext.on_error,
501                                                 submit_runner_image=runtimeContext.submit_runner_image,
502                                                 intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
503                                                 merged_map=merged_map,
504                                                 priority=runtimeContext.priority,
505                                                 secret_store=self.secret_store)
506             elif self.work_api == "jobs":
507                 runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse,
508                                       self.output_name,
509                                       self.output_tags,
510                                       submit_runner_ram=runtimeContext.submit_runner_ram,
511                                       name=runtimeContext.name,
512                                       on_error=runtimeContext.on_error,
513                                       submit_runner_image=runtimeContext.submit_runner_image,
514                                       merged_map=merged_map)
515         elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
516             # Create pipeline for local run
517             self.pipeline = self.api.pipeline_instances().create(
518                 body={
519                     "owner_uuid": self.project_uuid,
520                     "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
521                     "components": {},
522                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
523             logger.info("Pipeline instance %s", self.pipeline["uuid"])
524
525         if runnerjob and not runtimeContext.wait:
526             submitargs = runtimeContext.copy()
527             submitargs.submit = False
528             runnerjob.run(submitargs)
529             return (runnerjob.uuid, "success")
530
531         self.poll_api = arvados.api('v1')
532         self.polling_thread = threading.Thread(target=self.poll_states)
533         self.polling_thread.start()
534
535         self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
536
537         if runnerjob:
538             jobiter = iter((runnerjob,))
539         else:
540             if runtimeContext.cwl_runner_job is not None:
541                 self.uuid = runtimeContext.cwl_runner_job.get('uuid')
542             jobiter = tool.job(job_order,
543                                self.output_callback,
544                                runtimeContext)
545
546         try:
547             self.workflow_eval_lock.acquire()
548             # Holds the lock while this code runs and releases it when
549             # it is safe to do so in self.workflow_eval_lock.wait(),
550             # at which point on_message can update job state and
551             # process output callbacks.
552
553             loopperf = Perf(metrics, "jobiter")
554             loopperf.__enter__()
555             for runnable in jobiter:
556                 loopperf.__exit__()
557
558                 if self.stop_polling.is_set():
559                     break
560
561                 if self.task_queue.error is not None:
562                     raise self.task_queue.error
563
564                 if runnable:
565                     with Perf(metrics, "run"):
566                         self.start_run(runnable, runtimeContext)
567                 else:
568                     if (self.task_queue.in_flight + len(self.processes)) > 0:
569                         self.workflow_eval_lock.wait(3)
570                     else:
571                         logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
572                         break
573                 loopperf.__enter__()
574             loopperf.__exit__()
575
576             while (self.task_queue.in_flight + len(self.processes)) > 0:
577                 if self.task_queue.error is not None:
578                     raise self.task_queue.error
579                 self.workflow_eval_lock.wait(3)
580
581         except UnsupportedRequirement:
582             raise
583         except:
584             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
585                 logger.error("Interrupted, workflow will be cancelled")
586             else:
587                 logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
588             if self.pipeline:
589                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
590                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
591             if runnerjob and runnerjob.uuid and self.work_api == "containers":
592                 self.api.container_requests().update(uuid=runnerjob.uuid,
593                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
594         finally:
595             self.workflow_eval_lock.release()
596             self.task_queue.drain()
597             self.stop_polling.set()
598             self.polling_thread.join()
599             self.task_queue.join()
600
601         if self.final_status == "UnsupportedRequirement":
602             raise UnsupportedRequirement("Check log for details.")
603
604         if self.final_output is None:
605             raise WorkflowException("Workflow did not return a result.")
606
607         if runtimeContext.submit and isinstance(runnerjob, Runner):
608             logger.info("Final output collection %s", runnerjob.final_output)
609         else:
610             if self.output_name is None:
611                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
612             if self.output_tags is None:
613                 self.output_tags = ""
614
615             storage_classes = runtimeContext.storage_classes.strip().split(",")
616             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
617             self.set_crunch_output()
618
619         if runtimeContext.compute_checksum:
620             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
621             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
622
623         if self.trash_intermediate and self.final_status == "success":
624             self.trash_intermediate_output()
625
626         return (self.final_output, self.final_status)
627
628
629 def versionstring():
630     """Return a version string of key packages for provenance and debugging."""
631
632     arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
633     arvpkg = pkg_resources.require("arvados-python-client")
634     cwlpkg = pkg_resources.require("cwltool")
635
636     return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
637                                     "arvados-python-client", arvpkg[0].version,
638                                     "cwltool", cwlpkg[0].version)
639
640
641 def arg_parser():  # type: () -> argparse.ArgumentParser
642     parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
643
644     parser.add_argument("--basedir", type=str,
645                         help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or provided on the command line.")
646     parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
647                         help="Output directory, default current directory")
648
649     parser.add_argument("--eval-timeout",
650                         help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
651                         type=float,
652                         default=20)
653
654     exgroup = parser.add_mutually_exclusive_group()
655     exgroup.add_argument("--print-dot", action="store_true",
656                          help="Print workflow visualization in graphviz format and exit")
657     exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
658     exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
659
660     exgroup = parser.add_mutually_exclusive_group()
661     exgroup.add_argument("--verbose", action="store_true", help="Default logging")
662     exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
663     exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
664
665     parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
666
667     parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
668
669     exgroup = parser.add_mutually_exclusive_group()
670     exgroup.add_argument("--enable-reuse", action="store_true",
671                         default=True, dest="enable_reuse",
672                         help="Enable job or container reuse (default)")
673     exgroup.add_argument("--disable-reuse", action="store_false",
674                         default=True, dest="enable_reuse",
675                         help="Disable job or container reuse")
676
677     parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs. If not provided, work goes to the user's home project.")
678     parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
679     parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
680     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
681                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
682                         default=False)
683
684     exgroup = parser.add_mutually_exclusive_group()
685     exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
686                         default=True, dest="submit")
687     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
688                         default=True, dest="submit")
689     exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
690                          dest="create_workflow")
691     exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
692     exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
693
694     exgroup = parser.add_mutually_exclusive_group()
695     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
696                         default=True, dest="wait")
697     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
698                         default=True, dest="wait")
699
700     exgroup = parser.add_mutually_exclusive_group()
701     exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
702                         default=True, dest="log_timestamps")
703     exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
704                         default=True, dest="log_timestamps")
705
706     parser.add_argument("--api", type=str,
707                         default=None, dest="work_api",
708                         choices=("jobs", "containers"),
709                         help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")
710
711     parser.add_argument("--compute-checksum", action="store_true", default=False,
712                         help="Compute checksum of contents while collecting outputs",
713                         dest="compute_checksum")
714
715     parser.add_argument("--submit-runner-ram", type=int,
716                         help="RAM (in MiB) required for the workflow runner job (default 1024)",
717                         default=None)
718
719     parser.add_argument("--submit-runner-image", type=str,
720                         help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
721                         default=None)
722
723     parser.add_argument("--submit-request-uuid", type=str,
724                         default=None,
725                         help="Update and commit supplied container request instead of creating a new one (containers API only).")
726
727     parser.add_argument("--name", type=str,
728                         help="Name to use for workflow execution instance.",
729                         default=None)
730
731     parser.add_argument("--on-error", type=str,
732                         help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
733                         "Default is 'continue'.", default="continue", choices=("stop", "continue"))
734
735     parser.add_argument("--enable-dev", action="store_true",
736                         help="Enable loading and running development versions "
737                              "of CWL spec.", default=False)
738     parser.add_argument('--storage-classes', default="default", type=str,
739                         help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.")
740
741     parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
742                         help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
743                         default=0)
744
745     parser.add_argument("--priority", type=int,
746                         help="Workflow priority (range 1..1000, higher takes precedence over lower; containers API only)",
747                         default=DEFAULT_PRIORITY)
748
749     parser.add_argument("--disable-validate", dest="do_validate",
750                         action="store_false", default=True,
751                         help=argparse.SUPPRESS)
752
753     parser.add_argument("--disable-js-validation",
754                         action="store_true", default=False,
755                         help=argparse.SUPPRESS)
756
757     parser.add_argument("--thread-count", type=int,
758                         default=4, help="Number of threads to use for submitting jobs and collecting output.")
759
760     exgroup = parser.add_mutually_exclusive_group()
761     exgroup.add_argument("--trash-intermediate", action="store_true",
762                         default=False, dest="trash_intermediate",
763                          help="Immediately trash intermediate outputs on workflow success.")
764     exgroup.add_argument("--no-trash-intermediate", action="store_false",
765                         default=False, dest="trash_intermediate",
766                         help="Do not trash intermediate outputs (default).")
767
768     parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
769     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
770
771     return parser
772
773 def add_arv_hints():
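    """Register Arvados-specific CWL extensions with cwltool.

    Relaxes cwltool's hint acceptance pattern, loads the bundled
    arv-cwl-schema.yml extension schema for CWL v1.0, and adds the
    arvados.org/cwl requirement URIs to cwltool's list of supported
    process requirements.
    """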
774     cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
775     cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
776     res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
777     use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
778     res.close()
779     cwltool.process.supportedProcessRequirements.extend([
780         "http://arvados.org/cwl#RunInSingleContainer",
781         "http://arvados.org/cwl#OutputDirType",
782         "http://arvados.org/cwl#RuntimeConstraints",
783         "http://arvados.org/cwl#PartitionRequirement",
784         "http://arvados.org/cwl#APIRequirement",
785         "http://commonwl.org/cwltool#LoadListingRequirement",
786         "http://arvados.org/cwl#IntermediateOutput",
787         "http://arvados.org/cwl#ReuseRequirement"
788     ])
789
790 def exit_signal_handler(sigcode, frame):
791     logger.error("Caught signal {}, exiting.".format(sigcode))
792     sys.exit(-sigcode)
793
794 def main(args, stdout, stderr, api_client=None, keep_client=None,
795          install_sig_handlers=True):
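    """Console entry point: parse arguments, construct an ArvCwlRunner, and
    hand control to cwltool.main.main() with Arvados-specific executor,
    loading context and logging hooks installed."""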
796     parser = arg_parser()
797
798     job_order_object = None
799     arvargs = parser.parse_args(args)
800
801     if len(arvargs.storage_classes.strip().split(',')) > 1:
802         logger.error("Multiple storage classes are not supported currently.")
803         return 1
804
805     arvargs.use_container = True
806     arvargs.relax_path_checks = True
807     arvargs.print_supported_versions = False
808
809     if install_sig_handlers:
810         arv_cmd.install_signal_handlers()
811
812     if arvargs.update_workflow:
813         if arvargs.update_workflow.find('-7fd4e-') == 5:
814             want_api = 'containers'
815         elif arvargs.update_workflow.find('-p5p6p-') == 5:
816             want_api = 'jobs'
817         else:
818             want_api = None
819         if want_api and arvargs.work_api and want_api != arvargs.work_api:
820             logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
821                 arvargs.update_workflow, want_api, arvargs.work_api))
822             return 1
823         arvargs.work_api = want_api
824
825     if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
826         job_order_object = ({}, "")
827
828     add_arv_hints()
829
830     try:
831         if api_client is None:
832             api_client = arvados.safeapi.ThreadSafeApiCache(api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4})
833             keep_client = api_client.keep
834             # Make an API call now so errors are reported early.
835             api_client.users().current().execute()
836         if keep_client is None:
837             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
838         runner = ArvCwlRunner(api_client, arvargs, keep_client=keep_client, num_retries=4)
839     except Exception as e:
840         logger.error(e)
841         return 1
842
843     if arvargs.debug:
844         logger.setLevel(logging.DEBUG)
845         logging.getLogger('arvados').setLevel(logging.DEBUG)
846
847     if arvargs.quiet:
848         logger.setLevel(logging.WARN)
849         logging.getLogger('arvados').setLevel(logging.WARN)
850         logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
851
852     if arvargs.metrics:
853         metrics.setLevel(logging.DEBUG)
854         logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
855
856     if arvargs.log_timestamps:
857         arvados.log_handler.setFormatter(logging.Formatter(
858             '%(asctime)s %(name)s %(levelname)s: %(message)s',
859             '%Y-%m-%d %H:%M:%S'))
860     else:
861         arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
862
863     for key, val in cwltool.argparser.get_default_args().items():
864         if not hasattr(arvargs, key):
865             setattr(arvargs, key, val)
866
867     runtimeContext = ArvRuntimeContext(vars(arvargs))
868     runtimeContext.make_fs_access = partial(CollectionFsAccess,
869                              collection_cache=runner.collection_cache)
870
871     return cwltool.main.main(args=arvargs,
872                              stdout=stdout,
873                              stderr=stderr,
874                              executor=runner.arv_executor,
875                              versionfunc=versionstring,
876                              job_order_object=job_order_object,
877                              logger_handler=arvados.log_handler,
878                              custom_schema_callback=add_arv_hints,
879                              loadingContext=runner.loadingContext,
880                              runtimeContext=runtimeContext)
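

# A minimal sketch of how this module is typically wired up as a command-line
# program (assumption: the real console_scripts entry point is declared in the
# package's setup.py rather than in this file):
#
#   import sys
#   from arvados_cwl import main
#
#   if __name__ == "__main__":
#       sys.exit(main(sys.argv[1:], sys.stdout, sys.stderr))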