Change error handling for multiple storage classes in cwl_runner
[arvados.git] / sdk/cwl/arvados_cwl/__init__.py
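This revision concerns how main() treats the --storage-classes option: the comma-separated value is split into a list at the start of main(), and anything other than a single storage class is rejected with an error before the workflow is submitted. A minimal, self-contained sketch of that behavior (the helper name below is hypothetical and not part of the file):

    # Hypothetical sketch of the --storage-classes check performed in main() below.
    # The option value is split on commas; more than one class is an error, so the
    # runner logs a message and returns exit status 1 instead of submitting work.
    def accepted_storage_classes(value):
        classes = value.strip().split(',')
        if len(classes) > 1:
            return None              # main() logs an error and returns 1
        return classes               # a single-element list is passed through to Keep

    assert accepted_storage_classes("default") == ["default"]
    assert accepted_storage_classes("archival,hot") is None

The complete file as of this commit follows.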
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 # Implement the cwl-runner interface for submitting and running work on Arvados,
7 # using either the Crunch jobs API or the Crunch containers API.
8
9 import argparse
10 import logging
11 import os
12 import sys
13 import threading
14 import hashlib
15 import copy
16 import json
17 import re
18 from functools import partial
19 import pkg_resources  # part of setuptools
20 import Queue
21 import time
22 import signal
23 import thread
24
25 from cwltool.errors import WorkflowException
26 import cwltool.main
27 import cwltool.workflow
28 import cwltool.process
29 from schema_salad.sourceline import SourceLine
30 import schema_salad.validate as validate
31
32 import arvados
33 import arvados.config
34 from arvados.keep import KeepClient
35 from arvados.errors import ApiError
36 import arvados.commands._util as arv_cmd
37
38 from .arvcontainer import ArvadosContainer, RunnerContainer
39 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
40 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
41 from .arvtool import ArvadosCommandTool
42 from .arvworkflow import ArvadosWorkflow, upload_workflow
43 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
44 from .perf import Perf
45 from .pathmapper import NoFollowPathMapper
46 from .task_queue import TaskQueue
47 from ._version import __version__
48
49 from cwltool.pack import pack
50 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
51 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
52 from cwltool.command_line_tool import compute_checksums
53 from arvados.api import OrderedJsonModel
54
55 logger = logging.getLogger('arvados.cwl-runner')
56 metrics = logging.getLogger('arvados.cwl-runner.metrics')
57 logger.setLevel(logging.INFO)
58
59 arvados.log_handler.setFormatter(logging.Formatter(
60         '%(asctime)s %(name)s %(levelname)s: %(message)s',
61         '%Y-%m-%d %H:%M:%S'))
62
63 DEFAULT_PRIORITY = 500
64
65 class ArvCwlRunner(object):
66     """Execute a CWL tool or workflow, submit work (using either the jobs or
67     containers API), wait for it to complete, and report output.
68
69     """
70
71     def __init__(self, api_client, work_api=None, keep_client=None,
72                  output_name=None, output_tags=None, num_retries=4,
73                  thread_count=4):
74         self.api = api_client
75         self.processes = {}
76         self.workflow_eval_lock = threading.Condition(threading.RLock())
77         self.final_output = None
78         self.final_status = None
79         self.uploaded = {}
80         self.num_retries = num_retries
81         self.uuid = None
82         self.stop_polling = threading.Event()
83         self.poll_api = None
84         self.pipeline = None
85         self.final_output_collection = None
86         self.output_name = output_name
87         self.output_tags = output_tags
88         self.project_uuid = None
89         self.intermediate_output_ttl = 0
90         self.intermediate_output_collections = []
91         self.trash_intermediate = False
92         self.thread_count = thread_count
93         self.poll_interval = 12
94
95         if keep_client is not None:
96             self.keep_client = keep_client
97         else:
98             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
99
100         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
101
102         self.fetcher_constructor = partial(CollectionFetcher,
103                                            api_client=self.api,
104                                            fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
105                                            num_retries=self.num_retries)
106
107         self.work_api = None
108         expected_api = ["jobs", "containers"]
109         for api in expected_api:
110             try:
111                 methods = self.api._rootDesc.get('resources')[api]['methods']
112                 if ('httpMethod' in methods['create'] and
113                     (work_api == api or work_api is None)):
114                     self.work_api = api
115                     break
116             except KeyError:
117                 pass
118
119         if not self.work_api:
120             if work_api is None:
121                 raise Exception("No supported APIs")
122             else:
123                 raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
124
125     def arv_make_tool(self, toolpath_object, **kwargs):
126         kwargs["work_api"] = self.work_api
127         kwargs["fetcher_constructor"] = self.fetcher_constructor
128         kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
129         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
130             return ArvadosCommandTool(self, toolpath_object, **kwargs)
131         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
132             return ArvadosWorkflow(self, toolpath_object, **kwargs)
133         else:
134             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
135
136     def output_callback(self, out, processStatus):
137         with self.workflow_eval_lock:
138             if processStatus == "success":
139                 logger.info("Overall process status is %s", processStatus)
140                 if self.pipeline:
141                     self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
142                                                          body={"state": "Complete"}).execute(num_retries=self.num_retries)
143             else:
144                 logger.warn("Overall process status is %s", processStatus)
145                 if self.pipeline:
146                     self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
147                                                          body={"state": "Failed"}).execute(num_retries=self.num_retries)
148             self.final_status = processStatus
149             self.final_output = out
150             self.workflow_eval_lock.notifyAll()
151
152
153     def start_run(self, runnable, kwargs):
154         self.task_queue.add(partial(runnable.run, **kwargs))
155
156     def process_submitted(self, container):
157         with self.workflow_eval_lock:
158             self.processes[container.uuid] = container
159
160     def process_done(self, uuid, record):
161         with self.workflow_eval_lock:
162             j = self.processes[uuid]
163             logger.info("%s %s is %s", self.label(j), uuid, record["state"])
164             self.task_queue.add(partial(j.done, record))
165             del self.processes[uuid]
166
167     def wrapped_callback(self, cb, obj, st):
168         with self.workflow_eval_lock:
169             cb(obj, st)
170             self.workflow_eval_lock.notifyAll()
171
172     def get_wrapped_callback(self, cb):
173         return partial(self.wrapped_callback, cb)
174
175     def on_message(self, event):
176         if event.get("object_uuid") in self.processes and event["event_type"] == "update":
177             uuid = event["object_uuid"]
178             if event["properties"]["new_attributes"]["state"] == "Running":
179                 with self.workflow_eval_lock:
180                     j = self.processes[uuid]
181                     if j.running is False:
182                         j.running = True
183                         j.update_pipeline_component(event["properties"]["new_attributes"])
184                         logger.info("%s %s is Running", self.label(j), uuid)
185             elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
186                 self.process_done(uuid, event["properties"]["new_attributes"])
187
188     def label(self, obj):
189         return "[%s %s]" % (self.work_api[0:-1], obj.name)
190
191     def poll_states(self):
192         """Poll status of jobs or containers listed in the processes dict.
193
194         Runs in a separate thread.
195         """
196
197         try:
198             remain_wait = self.poll_interval
199             while True:
200                 if remain_wait > 0:
201                     self.stop_polling.wait(remain_wait)
202                 if self.stop_polling.is_set():
203                     break
204                 with self.workflow_eval_lock:
205                     keys = list(self.processes.keys())
206                 if not keys:
207                     remain_wait = self.poll_interval
208                     continue
209
210                 begin_poll = time.time()
211                 if self.work_api == "containers":
212                     table = self.poll_api.container_requests()
213                 elif self.work_api == "jobs":
214                     table = self.poll_api.jobs()
215
216                 try:
217                     proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
218                 except Exception as e:
219                     logger.warn("Error checking states on API server: %s", e)
220                     remain_wait = self.poll_interval
221                     continue
222
223                 for p in proc_states["items"]:
224                     self.on_message({
225                         "object_uuid": p["uuid"],
226                         "event_type": "update",
227                         "properties": {
228                             "new_attributes": p
229                         }
230                     })
231                 finish_poll = time.time()
232                 remain_wait = self.poll_interval - (finish_poll - begin_poll)
233         except:
234             logger.exception("Fatal error in state polling thread.")
235             with self.workflow_eval_lock:
236                 self.processes.clear()
237                 self.workflow_eval_lock.notifyAll()
238         finally:
239             self.stop_polling.set()
240
241     def get_uploaded(self):
242         return self.uploaded.copy()
243
244     def add_uploaded(self, src, pair):
245         self.uploaded[src] = pair
246
247     def add_intermediate_output(self, uuid):
248         if uuid:
249             self.intermediate_output_collections.append(uuid)
250
251     def trash_intermediate_output(self):
252         logger.info("Cleaning up intermediate output collections")
253         for i in self.intermediate_output_collections:
254             try:
255                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
256             except:
257                 logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
258             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
259                 break
260
261     def check_features(self, obj):
262         if isinstance(obj, dict):
263             if obj.get("writable") and self.work_api != "containers":
264                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
265             if obj.get("class") == "DockerRequirement":
266                 if obj.get("dockerOutputDirectory"):
267                     if self.work_api != "containers":
268                         raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
269                             "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
270                     if not obj.get("dockerOutputDirectory").startswith('/'):
271                         raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
272                             "Option 'dockerOutputDirectory' must be an absolute path.")
273             if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
274                 raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
275             for v in obj.itervalues():
276                 self.check_features(v)
277         elif isinstance(obj, list):
278             for i,v in enumerate(obj):
279                 with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
280                     self.check_features(v)
281
282     def make_output_collection(self, name, storage_classes, tagsString, outputObj):
283         outputObj = copy.deepcopy(outputObj)
284
285         files = []
286         def capture(fileobj):
287             files.append(fileobj)
288
289         adjustDirObjs(outputObj, capture)
290         adjustFileObjs(outputObj, capture)
291
292         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
293
294         final = arvados.collection.Collection(api_client=self.api,
295                                               keep_client=self.keep_client,
296                                               num_retries=self.num_retries)
297
298         for k,v in generatemapper.items():
299             if k.startswith("_:"):
300                 if v.type == "Directory":
301                     continue
302                 if v.type == "CreateFile":
303                     with final.open(v.target, "wb") as f:
304                         f.write(v.resolved.encode("utf-8"))
305                     continue
306
307             if not k.startswith("keep:"):
308                 raise Exception("Output source is not in keep or a literal")
309             sp = k.split("/")
310             srccollection = sp[0][5:]
311             try:
312                 reader = self.collection_cache.get(srccollection)
313                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
314                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
315             except arvados.errors.ArgumentError as e:
316                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
317                 raise
318             except IOError as e:
319                 logger.warn("While preparing output collection: %s", e)
320
321         def rewrite(fileobj):
322             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
323             for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
324                 if k in fileobj:
325                     del fileobj[k]
326
327         adjustDirObjs(outputObj, rewrite)
328         adjustFileObjs(outputObj, rewrite)
329
330         with final.open("cwl.output.json", "w") as f:
331             json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
332
333         final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)
334
335         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
336                     final.api_response()["name"],
337                     final.manifest_locator())
338
339         final_uuid = final.manifest_locator()
340         tags = tagsString.split(',')
341         for tag in tags:
342             self.api.links().create(body={
343                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
344             }).execute(num_retries=self.num_retries)
345
346         def finalcollection(fileobj):
347             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
348
349         adjustDirObjs(outputObj, finalcollection)
350         adjustFileObjs(outputObj, finalcollection)
351
352         return (outputObj, final)
353
354     def set_crunch_output(self):
355         if self.work_api == "containers":
356             try:
357                 current = self.api.containers().current().execute(num_retries=self.num_retries)
358             except ApiError as e:
359                 # Status code 404 just means we're not running in a container.
360                 if e.resp.status != 404:
361                     logger.info("Getting current container: %s", e)
362                 return
363             try:
364                 self.api.containers().update(uuid=current['uuid'],
365                                              body={
366                                                  'output': self.final_output_collection.portable_data_hash(),
367                                              }).execute(num_retries=self.num_retries)
368                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
369                                               body={
370                                                   'is_trashed': True
371                                               }).execute(num_retries=self.num_retries)
372             except Exception as e:
373                 logger.info("Setting container output: %s", e)
374         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
375             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
376                                    body={
377                                        'output': self.final_output_collection.portable_data_hash(),
378                                        'success': self.final_status == "success",
379                                        'progress':1.0
380                                    }).execute(num_retries=self.num_retries)
381
382     def arv_executor(self, tool, job_order, **kwargs):
383         self.debug = kwargs.get("debug")
384
385         tool.visit(self.check_features)
386
387         self.project_uuid = kwargs.get("project_uuid")
388         self.pipeline = None
389         make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
390                                                                  collection_cache=self.collection_cache)
391         self.fs_access = make_fs_access(kwargs["basedir"])
392         self.secret_store = kwargs.get("secret_store")
393
394         self.trash_intermediate = kwargs["trash_intermediate"]
395         if self.trash_intermediate and self.work_api != "containers":
396             raise Exception("--trash-intermediate is only supported with --api=containers.")
397
398         self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
399         if self.intermediate_output_ttl and self.work_api != "containers":
400             raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
401         if self.intermediate_output_ttl < 0:
402             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
403
404         if not kwargs.get("name"):
405             kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
406
407         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
408         # Also uploads docker images.
409         merged_map = upload_workflow_deps(self, tool)
410
411         # Reload tool object which may have been updated by
412         # upload_workflow_deps
413         # Don't validate this time because it will just print redundant errors.
414         tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
415                                   makeTool=self.arv_make_tool,
416                                   loader=tool.doc_loader,
417                                   avsc_names=tool.doc_schema,
418                                   metadata=tool.metadata,
419                                   do_validate=False)
420
421         # Upload local file references in the job order.
422         job_order = upload_job_order(self, "%s input" % kwargs["name"],
423                                      tool, job_order)
424
425         existing_uuid = kwargs.get("update_workflow")
426         if existing_uuid or kwargs.get("create_workflow"):
427             # Create a pipeline template or workflow record and exit.
428             if self.work_api == "jobs":
429                 tmpl = RunnerTemplate(self, tool, job_order,
430                                       kwargs.get("enable_reuse"),
431                                       uuid=existing_uuid,
432                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
433                                       name=kwargs["name"],
434                                       merged_map=merged_map)
435                 tmpl.save()
436                 # cwltool.main will write our return value to stdout.
437                 return (tmpl.uuid, "success")
438             elif self.work_api == "containers":
439                 return (upload_workflow(self, tool, job_order,
440                                         self.project_uuid,
441                                         uuid=existing_uuid,
442                                         submit_runner_ram=kwargs.get("submit_runner_ram"),
443                                         name=kwargs["name"],
444                                         merged_map=merged_map),
445                         "success")
446
447         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
448         self.eval_timeout = kwargs.get("eval_timeout")
449
450         kwargs["make_fs_access"] = make_fs_access
451         kwargs["enable_reuse"] = kwargs.get("enable_reuse")
452         kwargs["use_container"] = True
453         kwargs["tmpdir_prefix"] = "tmp"
454         kwargs["compute_checksum"] = kwargs.get("compute_checksum")
455
456         if self.work_api == "containers":
457             if self.ignore_docker_for_reuse:
458                 raise Exception("--ignore-docker-for-reuse not supported with containers API.")
459             kwargs["outdir"] = "/var/spool/cwl"
460             kwargs["docker_outdir"] = "/var/spool/cwl"
461             kwargs["tmpdir"] = "/tmp"
462             kwargs["docker_tmpdir"] = "/tmp"
463         elif self.work_api == "jobs":
464             if kwargs["priority"] != DEFAULT_PRIORITY:
465                 raise Exception("--priority not implemented for jobs API.")
466             kwargs["outdir"] = "$(task.outdir)"
467             kwargs["docker_outdir"] = "$(task.outdir)"
468             kwargs["tmpdir"] = "$(task.tmpdir)"
469
470         if kwargs["priority"] < 1 or kwargs["priority"] > 1000:
471             raise Exception("--priority must be in the range 1..1000.")
472
473         runnerjob = None
474         if kwargs.get("submit"):
475             # Submit a runner job to run the workflow for us.
476             if self.work_api == "containers":
477                 if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"):
478                     kwargs["runnerjob"] = tool.tool["id"]
479                     runnerjob = tool.job(job_order,
480                                          self.output_callback,
481                                          **kwargs).next()
482                 else:
483                     runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
484                                                 self.output_name,
485                                                 self.output_tags,
486                                                 submit_runner_ram=kwargs.get("submit_runner_ram"),
487                                                 name=kwargs.get("name"),
488                                                 on_error=kwargs.get("on_error"),
489                                                 submit_runner_image=kwargs.get("submit_runner_image"),
490                                                 intermediate_output_ttl=kwargs.get("intermediate_output_ttl"),
491                                                 merged_map=merged_map,
492                                                 priority=kwargs.get("priority"),
493                                                 secret_store=self.secret_store)
494             elif self.work_api == "jobs":
495                 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
496                                       self.output_name,
497                                       self.output_tags,
498                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
499                                       name=kwargs.get("name"),
500                                       on_error=kwargs.get("on_error"),
501                                       submit_runner_image=kwargs.get("submit_runner_image"),
502                                       merged_map=merged_map)
503         elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
504             # Create pipeline for local run
505             self.pipeline = self.api.pipeline_instances().create(
506                 body={
507                     "owner_uuid": self.project_uuid,
508                     "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
509                     "components": {},
510                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
511             logger.info("Pipeline instance %s", self.pipeline["uuid"])
512
513         if runnerjob and not kwargs.get("wait"):
514             submitargs = kwargs.copy()
515             submitargs['submit'] = False
516             runnerjob.run(**submitargs)
517             return (runnerjob.uuid, "success")
518
519         self.poll_api = arvados.api('v1')
520         self.polling_thread = threading.Thread(target=self.poll_states)
521         self.polling_thread.start()
522
523         self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
524
525         if runnerjob:
526             jobiter = iter((runnerjob,))
527         else:
528             if "cwl_runner_job" in kwargs:
529                 self.uuid = kwargs.get("cwl_runner_job").get('uuid')
530             jobiter = tool.job(job_order,
531                                self.output_callback,
532                                **kwargs)
533
534         try:
535             self.workflow_eval_lock.acquire()
536             # Holds the lock while this code runs and releases it when
537             # it is safe to do so in self.workflow_eval_lock.wait(),
538             # at which point on_message can update job state and
539             # process output callbacks.
540
541             loopperf = Perf(metrics, "jobiter")
542             loopperf.__enter__()
543             for runnable in jobiter:
544                 loopperf.__exit__()
545
546                 if self.stop_polling.is_set():
547                     break
548
549                 if self.task_queue.error is not None:
550                     raise self.task_queue.error
551
552                 if runnable:
553                     with Perf(metrics, "run"):
554                         self.start_run(runnable, kwargs)
555                 else:
556                     if (self.task_queue.in_flight + len(self.processes)) > 0:
557                         self.workflow_eval_lock.wait(3)
558                     else:
559                         logger.error("Workflow is deadlocked: no runnable processes and not waiting on any pending processes.")
560                         break
561                 loopperf.__enter__()
562             loopperf.__exit__()
563
564             while (self.task_queue.in_flight + len(self.processes)) > 0:
565                 if self.task_queue.error is not None:
566                     raise self.task_queue.error
567                 self.workflow_eval_lock.wait(3)
568
569         except UnsupportedRequirement:
570             raise
571         except:
572             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
573                 logger.error("Interrupted, workflow will be cancelled")
574             else:
575                 logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
576             if self.pipeline:
577                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
578                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
579             if runnerjob and runnerjob.uuid and self.work_api == "containers":
580                 self.api.container_requests().update(uuid=runnerjob.uuid,
581                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
582         finally:
583             self.workflow_eval_lock.release()
584             self.task_queue.drain()
585             self.stop_polling.set()
586             self.polling_thread.join()
587             self.task_queue.join()
588
589         if self.final_status == "UnsupportedRequirement":
590             raise UnsupportedRequirement("Check log for details.")
591
592         if self.final_output is None:
593             raise WorkflowException("Workflow did not return a result.")
594
595
596         if kwargs.get("submit") and isinstance(runnerjob, Runner):
597             logger.info("Final output collection %s", runnerjob.final_output)
598         else:
599             if self.output_name is None:
600                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
601             if self.output_tags is None:
602                 self.output_tags = ""
603
604             storage_classes = kwargs.get("storage_classes")
605             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
606             self.set_crunch_output()
607
608         if kwargs.get("compute_checksum"):
609             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
610             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
611
612         if self.trash_intermediate and self.final_status == "success":
613             self.trash_intermediate_output()
614
615         return (self.final_output, self.final_status)
616
617
618 def versionstring():
619     """Return version string of key packages for provenance and debugging."""
620
621     arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
622     arvpkg = pkg_resources.require("arvados-python-client")
623     cwlpkg = pkg_resources.require("cwltool")
624
625     return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
626                                     "arvados-python-client", arvpkg[0].version,
627                                     "cwltool", cwlpkg[0].version)
628
629
630 def arg_parser():  # type: () -> argparse.ArgumentParser
631     parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
632
633     parser.add_argument("--basedir", type=str,
634                         help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or provided on the command line.")
635     parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
636                         help="Output directory, default current directory")
637
638     parser.add_argument("--eval-timeout",
639                         help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.",
640                         type=float,
641                         default=20)
642
643     exgroup = parser.add_mutually_exclusive_group()
644     exgroup.add_argument("--print-dot", action="store_true",
645                          help="Print workflow visualization in graphviz format and exit")
646     exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
647     exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")
648
649     exgroup = parser.add_mutually_exclusive_group()
650     exgroup.add_argument("--verbose", action="store_true", help="Default logging")
651     exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
652     exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
653
654     parser.add_argument("--metrics", action="store_true", help="Print timing metrics")
655
656     parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
657
658     exgroup = parser.add_mutually_exclusive_group()
659     exgroup.add_argument("--enable-reuse", action="store_true",
660                         default=True, dest="enable_reuse",
661                         help="Enable job or container reuse (default)")
662     exgroup.add_argument("--disable-reuse", action="store_false",
663                         default=True, dest="enable_reuse",
664                         help="Disable job or container reuse")
665
666     parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs; if not provided, they will go to your home project.")
667     parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
668     parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
669     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
670                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
671                         default=False)
672
673     exgroup = parser.add_mutually_exclusive_group()
674     exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
675                         default=True, dest="submit")
676     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
677                         default=True, dest="submit")
678     exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
679                          dest="create_workflow")
680     exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
681     exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
682
683     exgroup = parser.add_mutually_exclusive_group()
684     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
685                         default=True, dest="wait")
686     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
687                         default=True, dest="wait")
688
689     exgroup = parser.add_mutually_exclusive_group()
690     exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
691                         default=True, dest="log_timestamps")
692     exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
693                         default=True, dest="log_timestamps")
694
695     parser.add_argument("--api", type=str,
696                         default=None, dest="work_api",
697                         choices=("jobs", "containers"),
698                         help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")
699
700     parser.add_argument("--compute-checksum", action="store_true", default=False,
701                         help="Compute checksum of contents while collecting outputs",
702                         dest="compute_checksum")
703
704     parser.add_argument("--submit-runner-ram", type=int,
705                         help="RAM (in MiB) required for the workflow runner job (default 1024)",
706                         default=1024)
707
708     parser.add_argument("--submit-runner-image", type=str,
709                         help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
710                         default=None)
711
712     parser.add_argument("--name", type=str,
713                         help="Name to use for workflow execution instance.",
714                         default=None)
715
716     parser.add_argument("--on-error", type=str,
717                         help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
718                         "Default is 'continue'.", default="continue", choices=("stop", "continue"))
719
720     parser.add_argument("--enable-dev", action="store_true",
721                         help="Enable loading and running development versions "
722                              "of CWL spec.", default=False)
723     parser.add_argument('--storage-classes', default="default",
724                         help="Specify a comma-separated list of storage classes to be used when saving workflow output to Keep.")
725
726     parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
727                         help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
728                         default=0)
729
730     parser.add_argument("--priority", type=int,
731                         help="Workflow priority (range 1..1000, higher has precedence over lower; containers API only)",
732                         default=DEFAULT_PRIORITY)
733
734     parser.add_argument("--disable-validate", dest="do_validate",
735                         action="store_false", default=True,
736                         help=argparse.SUPPRESS)
737
738     parser.add_argument("--disable-js-validation",
739                         action="store_true", default=False,
740                         help=argparse.SUPPRESS)
741
742     parser.add_argument("--thread-count", type=int,
743                         default=4, help="Number of threads to use for job submit and output collection.")
744
745     exgroup = parser.add_mutually_exclusive_group()
746     exgroup.add_argument("--trash-intermediate", action="store_true",
747                         default=False, dest="trash_intermediate",
748                          help="Immediately trash intermediate outputs on workflow success.")
749     exgroup.add_argument("--no-trash-intermediate", action="store_false",
750                         default=False, dest="trash_intermediate",
751                         help="Do not trash intermediate outputs (default).")
752
753     parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
754     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
755
756     return parser
757
758 def add_arv_hints():
759     cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
760     cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
761     res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
762     use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
763     res.close()
764     cwltool.process.supportedProcessRequirements.extend([
765         "http://arvados.org/cwl#RunInSingleContainer",
766         "http://arvados.org/cwl#OutputDirType",
767         "http://arvados.org/cwl#RuntimeConstraints",
768         "http://arvados.org/cwl#PartitionRequirement",
769         "http://arvados.org/cwl#APIRequirement",
770         "http://commonwl.org/cwltool#LoadListingRequirement",
771         "http://arvados.org/cwl#IntermediateOutput",
772         "http://arvados.org/cwl#ReuseRequirement"
773     ])
774
775 def exit_signal_handler(sigcode, frame):
776     logger.error("Caught signal {}, exiting.".format(sigcode))
777     sys.exit(-sigcode)
778
779 def main(args, stdout, stderr, api_client=None, keep_client=None,
780          install_sig_handlers=True):
781     parser = arg_parser()
782
783     job_order_object = None
784     arvargs = parser.parse_args(args)
785
786     arvargs.storage_classes = arvargs.storage_classes.strip().split(',')
787     if len(arvargs.storage_classes) > 1:
788         logger.error("Multiple storage classes are not supported currently.")
789         return 1
790
791     if install_sig_handlers:
792         arv_cmd.install_signal_handlers()
793
794     if arvargs.update_workflow:
795         if arvargs.update_workflow.find('-7fd4e-') == 5:
796             want_api = 'containers'
797         elif arvargs.update_workflow.find('-p5p6p-') == 5:
798             want_api = 'jobs'
799         else:
800             want_api = None
801         if want_api and arvargs.work_api and want_api != arvargs.work_api:
802             logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
803                 arvargs.update_workflow, want_api, arvargs.work_api))
804             return 1
805         arvargs.work_api = want_api
806
807     if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
808         job_order_object = ({}, "")
809
810     add_arv_hints()
811
812     try:
813         if api_client is None:
814             api_client = arvados.safeapi.ThreadSafeApiCache(api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4})
815             keep_client = api_client.keep
816         if keep_client is None:
817             keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
818         runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
819                               num_retries=4, output_name=arvargs.output_name,
820                               output_tags=arvargs.output_tags,
821                               thread_count=arvargs.thread_count)
822     except Exception as e:
823         logger.error(e)
824         return 1
825
826     if arvargs.debug:
827         logger.setLevel(logging.DEBUG)
828         logging.getLogger('arvados').setLevel(logging.DEBUG)
829
830     if arvargs.quiet:
831         logger.setLevel(logging.WARN)
832         logging.getLogger('arvados').setLevel(logging.WARN)
833         logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
834
835     if arvargs.metrics:
836         metrics.setLevel(logging.DEBUG)
837         logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)
838
839     if arvargs.log_timestamps:
840         arvados.log_handler.setFormatter(logging.Formatter(
841             '%(asctime)s %(name)s %(levelname)s: %(message)s',
842             '%Y-%m-%d %H:%M:%S'))
843     else:
844         arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
845
846     arvargs.conformance_test = None
847     arvargs.use_container = True
848     arvargs.relax_path_checks = True
849     arvargs.print_supported_versions = False
850
851     make_fs_access = partial(CollectionFsAccess,
852                            collection_cache=runner.collection_cache)
853
854     return cwltool.main.main(args=arvargs,
855                              stdout=stdout,
856                              stderr=stderr,
857                              executor=runner.arv_executor,
858                              makeTool=runner.arv_make_tool,
859                              versionfunc=versionstring,
860                              job_order_object=job_order_object,
861                              make_fs_access=make_fs_access,
862                              fetcher_constructor=partial(CollectionFetcher,
863                                                          api_client=api_client,
864                                                          fs_access=make_fs_access(""),
865                                                          num_retries=runner.num_retries),
866                              resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
867                              logger_handler=arvados.log_handler,
868                              custom_schema_callback=add_arv_hints)
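
For reference, main() can also be called programmatically with the same arguments the command line accepts. A usage sketch, assuming the arvados-cwl-runner package is installed, ARVADOS_API_HOST and ARVADOS_API_TOKEN are configured, and that workflow.cwl and job.yml are hypothetical input files:

    # Hypothetical invocation of arvados_cwl.main(); file names and option values
    # are examples only.  main() builds an ArvCwlRunner and delegates execution
    # to cwltool.main.main, returning its exit code.
    import sys
    import arvados_cwl

    rc = arvados_cwl.main(
        ["--api=containers", "--storage-classes=default",
         "workflow.cwl", "job.yml"],
        stdout=sys.stdout, stderr=sys.stderr)
    sys.exit(rc)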