#!/usr/bin/env python
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

# Implement cwl-runner interface for submitting and running work on Arvados, using
# either the Crunch jobs API or Crunch containers API.

import argparse
import logging
import os
import sys
import threading
import hashlib
import copy
import json
import re
from functools import partial
import pkg_resources  # part of setuptools
import Queue
import time
import signal
import thread

from cwltool.errors import WorkflowException
import cwltool.main
import cwltool.workflow
import cwltool.process
from schema_salad.sourceline import SourceLine
import schema_salad.validate as validate

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError
import arvados.commands._util as arv_cmd

from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from .task_queue import TaskQueue
from ._version import __version__

from cwltool.pack import pack
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
from cwltool.command_line_tool import compute_checksums
from arvados.api import OrderedJsonModel

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))

DEFAULT_PRIORITY = 500

class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit work (using either jobs or
    containers API), wait for them to complete, and report output.

    """

    def __init__(self, api_client, work_api=None, keep_client=None,
                 output_name=None, output_tags=None, default_storage_classes="default",
                 num_retries=4, thread_count=4):
        self.api = api_client
        self.processes = {}
        self.workflow_eval_lock = threading.Condition(threading.RLock())
        self.final_output = None
        self.final_status = None
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False
        self.thread_count = thread_count
        self.poll_interval = 12
        self.default_storage_classes = default_storage_classes

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)

        self.fetcher_constructor = partial(CollectionFetcher,
                                           api_client=self.api,
                                           fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                           num_retries=self.num_retries)

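        # Determine which work submission API to use by probing the API
        # server's discovery document: honor the caller's requested API if it
        # is available, otherwise take the first supported one that is.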
        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (work_api == api or work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))

    def arv_make_tool(self, toolpath_object, **kwargs):
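        """Construct the Arvados-specific tool object for a parsed CWL document."""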
        kwargs["work_api"] = self.work_api
        kwargs["fetcher_constructor"] = self.fetcher_constructor
        kwargs["resolver"] = partial(collectionResolver, self.api, num_retries=self.num_retries)
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
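        """Record the final workflow output and status, and wake up any waiters."""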
        with self.workflow_eval_lock:
            if processStatus == "success":
                logger.info("Overall process status is %s", processStatus)
                if self.pipeline:
                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                         body={"state": "Complete"}).execute(num_retries=self.num_retries)
            else:
                logger.warn("Overall process status is %s", processStatus)
                if self.pipeline:
                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                         body={"state": "Failed"}).execute(num_retries=self.num_retries)
            self.final_status = processStatus
            self.final_output = out
            self.workflow_eval_lock.notifyAll()


    def start_run(self, runnable, kwargs):
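        """Hand a runnable step to the task queue so a worker thread can start it."""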
        self.task_queue.add(partial(runnable.run, **kwargs))

    def process_submitted(self, container):
        with self.workflow_eval_lock:
            self.processes[container.uuid] = container

    def process_done(self, uuid, record):
        with self.workflow_eval_lock:
            j = self.processes[uuid]
            logger.info("%s %s is %s", self.label(j), uuid, record["state"])
            self.task_queue.add(partial(j.done, record))
            del self.processes[uuid]

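    # cwltool invokes output callbacks from worker threads; wrap them so they
    # run while holding workflow_eval_lock and notify the main evaluation loop.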
    def wrapped_callback(self, cb, obj, st):
        with self.workflow_eval_lock:
            cb(obj, st)
            self.workflow_eval_lock.notifyAll()

    def get_wrapped_callback(self, cb):
        return partial(self.wrapped_callback, cb)

    def on_message(self, event):
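        """Dispatch a state-change event for a submitted job or container."""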
        if event.get("object_uuid") in self.processes and event["event_type"] == "update":
            uuid = event["object_uuid"]
            if event["properties"]["new_attributes"]["state"] == "Running":
                with self.workflow_eval_lock:
                    j = self.processes[uuid]
                    if j.running is False:
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                        logger.info("%s %s is Running", self.label(j), uuid)
            elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                self.process_done(uuid, event["properties"]["new_attributes"])

    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            remain_wait = self.poll_interval
            while True:
                if remain_wait > 0:
                    self.stop_polling.wait(remain_wait)
                if self.stop_polling.is_set():
                    break
                with self.workflow_eval_lock:
                    keys = list(self.processes.keys())
                if not keys:
                    remain_wait = self.poll_interval
                    continue

                begin_poll = time.time()
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                try:
                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
                except Exception as e:
                    logger.warn("Error checking states on API server: %s", e)
                    remain_wait = self.poll_interval
                    continue

                for p in proc_states["items"]:
                    self.on_message({
                        "object_uuid": p["uuid"],
                        "event_type": "update",
                        "properties": {
                            "new_attributes": p
                        }
                    })
                finish_poll = time.time()
                remain_wait = self.poll_interval - (finish_poll - begin_poll)
        except:
            logger.exception("Fatal error in state polling thread.")
            with self.workflow_eval_lock:
                self.processes.clear()
                self.workflow_eval_lock.notifyAll()
        finally:
            self.stop_polling.set()

    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except:
                logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                break

    def check_features(self, obj):
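        """Walk the tool document and reject features the selected work API does not support."""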
        if isinstance(obj, dict):
            if obj.get("writable") and self.work_api != "containers":
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    if self.work_api != "containers":
                        raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                            "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
                raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
            for v in obj.itervalues():
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v)

    def make_output_collection(self, name, storage_classes, tagsString, outputObj):
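        """Build the final output collection in Keep.

        Copies every file and directory referenced by the output object into a
        new collection, rewrites the output locations to point at it, applies
        the requested tags, and returns (rewritten output object, collection).
        """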
        outputObj = copy.deepcopy(outputObj)

        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.warn("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))

        final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

    def set_crunch_output(self):
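        """When running inside a crunch container or job, record the final output on the enclosing record."""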
        if self.work_api == "containers":
            try:
                current = self.api.containers().current().execute(num_retries=self.num_retries)
            except ApiError as e:
                # Status code 404 just means we're not running in a container.
                if e.resp.status != 404:
                    logger.info("Getting current container: %s", e)
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress':1.0
                                        }).execute(num_retries=self.num_retries)

    def arv_executor(self, tool, job_order, **kwargs):
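        """Main entry point called by cwltool.main: upload dependencies, submit
        or run the workflow, wait for completion, and return the final output.
        """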
        self.debug = kwargs.get("debug")

        tool.visit(self.check_features)

        self.project_uuid = kwargs.get("project_uuid")
        self.pipeline = None
        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                 collection_cache=self.collection_cache)
        self.fs_access = make_fs_access(kwargs["basedir"])
        self.secret_store = kwargs.get("secret_store")

        self.trash_intermediate = kwargs["trash_intermediate"]
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = kwargs["intermediate_output_ttl"]
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if kwargs.get("submit_request_uuid") and self.work_api != "containers":
            raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))

        if not kwargs.get("name"):
            kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        merged_map = upload_workflow_deps(self, tool)

        # Reload tool object which may have been updated by
        # upload_workflow_deps
        # Don't validate this time because it will just print redundant errors.
        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  makeTool=self.arv_make_tool,
                                  loader=tool.doc_loader,
                                  avsc_names=tool.doc_schema,
                                  metadata=tool.metadata,
                                  do_validate=False)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % kwargs["name"],
                                     tool, job_order)

        existing_uuid = kwargs.get("update_workflow")
        if existing_uuid or kwargs.get("create_workflow"):
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      kwargs.get("enable_reuse"),
                                      uuid=existing_uuid,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs["name"],
                                      merged_map=merged_map)
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=kwargs.get("submit_runner_ram"),
                                        name=kwargs["name"],
                                        merged_map=merged_map),
                        "success")

        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
        self.eval_timeout = kwargs.get("eval_timeout")

        kwargs["make_fs_access"] = make_fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
        kwargs["use_container"] = True
        kwargs["tmpdir_prefix"] = "tmp"
        kwargs["compute_checksum"] = kwargs.get("compute_checksum")

        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
            kwargs["outdir"] = "/var/spool/cwl"
            kwargs["docker_outdir"] = "/var/spool/cwl"
            kwargs["tmpdir"] = "/tmp"
            kwargs["docker_tmpdir"] = "/tmp"
        elif self.work_api == "jobs":
            if kwargs["priority"] != DEFAULT_PRIORITY:
                raise Exception("--priority not implemented for jobs API.")
            kwargs["outdir"] = "$(task.outdir)"
            kwargs["docker_outdir"] = "$(task.outdir)"
            kwargs["tmpdir"] = "$(task.tmpdir)"

        if kwargs["priority"] < 1 or kwargs["priority"] > 1000:
            raise Exception("--priority must be in the range 1..1000.")

        runnerjob = None
        if kwargs.get("submit"):
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool" and kwargs.get("wait"):
                    kwargs["runnerjob"] = tool.tool["id"]
                    runnerjob = tool.job(job_order,
                                         self.output_callback,
                                         **kwargs).next()
                else:
                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"),
                                                self.output_name,
                                                self.output_tags,
                                                submit_runner_ram=kwargs.get("submit_runner_ram"),
                                                name=kwargs.get("name"),
                                                on_error=kwargs.get("on_error"),
                                                submit_runner_image=kwargs.get("submit_runner_image"),
                                                intermediate_output_ttl=kwargs.get("intermediate_output_ttl"),
                                                merged_map=merged_map,
                                                default_storage_classes=self.default_storage_classes,
                                                priority=kwargs.get("priority"),
                                                secret_store=self.secret_store)
            elif self.work_api == "jobs":
                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"),
                                      self.output_name,
                                      self.output_tags,
                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
                                      name=kwargs.get("name"),
                                      on_error=kwargs.get("on_error"),
                                      submit_runner_image=kwargs.get("submit_runner_image"),
                                      merged_map=merged_map)
        elif "cwl_runner_job" not in kwargs and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runnerjob and not kwargs.get("wait"):
            submitargs = kwargs.copy()
            submitargs['submit'] = False
            runnerjob.run(**submitargs)
            return (runnerjob.uuid, "success")

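        # Start the background thread that polls the API server for state
        # changes of submitted work, and the task queue whose worker threads
        # start jobs/containers and handle their completion callbacks.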
        self.poll_api = arvados.api('v1')
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)

        if runnerjob:
            jobiter = iter((runnerjob,))
        else:
            if "cwl_runner_job" in kwargs:
                self.uuid = kwargs.get("cwl_runner_job").get('uuid')
            jobiter = tool.job(job_order,
                               self.output_callback,
                               **kwargs)

        try:
            self.workflow_eval_lock.acquire()
            # Holds the lock while this code runs and releases it when
            # it is safe to do so in self.workflow_eval_lock.wait(),
            # at which point on_message can update job state and
            # process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if self.task_queue.error is not None:
                    raise self.task_queue.error

                if runnable:
                    with Perf(metrics, "run"):
                        self.start_run(runnable, kwargs)
                else:
                    if (self.task_queue.in_flight + len(self.processes)) > 0:
                        self.workflow_eval_lock.wait(3)
                    else:
                        logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
                        break
                loopperf.__enter__()
            loopperf.__exit__()

            while (self.task_queue.in_flight + len(self.processes)) > 0:
                if self.task_queue.error is not None:
                    raise self.task_queue.error
                self.workflow_eval_lock.wait(3)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                logger.error("Interrupted, workflow will be cancelled")
            else:
                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                self.api.container_requests().update(uuid=runnerjob.uuid,
                                                     body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.workflow_eval_lock.release()
            self.task_queue.drain()
            self.stop_polling.set()
            self.polling_thread.join()
            self.task_queue.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")


        if kwargs.get("submit") and isinstance(runnerjob, Runner):
            logger.info("Final output collection %s", runnerjob.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""

            storage_classes = kwargs.get("storage_classes").strip().split(",")
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
            self.set_crunch_output()

        if kwargs.get("compute_checksum"):
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)


def versionstring():
    """Return version strings of key packages for provenance and debugging."""

    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
    arvpkg = pkg_resources.require("arvados-python-client")
    cwlpkg = pkg_resources.require("cwltool")

    return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
                                    "arvados-python-client", arvpkg[0].version,
                                    "cwltool", cwlpkg[0].version)


def arg_parser():  # type: () -> argparse.ArgumentParser
    parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')

    parser.add_argument("--basedir", type=str,
                        help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or provided on the command line.")
    parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
                        help="Output directory, default current directory")

    parser.add_argument("--eval-timeout",
                        help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
                        type=float,
                        default=20)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--print-dot", action="store_true",
                         help="Print workflow visualization in graphviz format and exit")
    exgroup.add_argument("--version", action="version", help="Print version and exit", version=versionstring())
    exgroup.add_argument("--validate", action="store_true", help="Validate CWL document only.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--verbose", action="store_true", help="Default logging")
    exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    exgroup.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job or container reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job or container reuse")

    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs; if not provided, they will go to the user's home project.")
    parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                         dest="create_workflow")
    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--log-timestamps", action="store_true", help="Prefix logging lines with timestamp",
                        default=True, dest="log_timestamps")
    exgroup.add_argument("--no-log-timestamps", action="store_false", help="No timestamp on logging lines",
                        default=True, dest="log_timestamps")

    parser.add_argument("--api", type=str,
                        default=None, dest="work_api",
                        choices=("jobs", "containers"),
                        help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument("--compute-checksum", action="store_true", default=False,
                        help="Compute checksum of contents while collecting outputs",
                        dest="compute_checksum")

    parser.add_argument("--submit-runner-ram", type=int,
                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
                        default=1024)

    parser.add_argument("--submit-runner-image", type=str,
                        help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                        default=None)

    parser.add_argument("--submit-request-uuid", type=str,
                        default=None,
                        help="Update and commit supplied container request instead of creating a new one (containers API only).")

    parser.add_argument("--name", type=str,
                        help="Name to use for workflow execution instance.",
                        default=None)

    parser.add_argument("--on-error", type=str,
                        help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. "
                        "Default is 'continue'.", default="continue", choices=("stop", "continue"))

    parser.add_argument("--enable-dev", action="store_true",
                        help="Enable loading and running development versions "
                             "of CWL spec.", default=False)
    parser.add_argument('--storage-classes', default="default", type=str,
                        help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.")

    parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
                        help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
                        default=0)

    parser.add_argument("--priority", type=int,
                        help="Workflow priority (range 1..1000, higher has precedence over lower; containers API only)",
                        default=DEFAULT_PRIORITY)

    parser.add_argument("--disable-validate", dest="do_validate",
                        action="store_false", default=True,
                        help=argparse.SUPPRESS)

    parser.add_argument("--disable-js-validation",
                        action="store_true", default=False,
                        help=argparse.SUPPRESS)

    parser.add_argument("--thread-count", type=int,
                        default=4, help="Number of threads to use for job submit and output collection.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--trash-intermediate", action="store_true",
                        default=False, dest="trash_intermediate",
                        help="Immediately trash intermediate outputs on workflow success.")
    exgroup.add_argument("--no-trash-intermediate", action="store_false",
                        default=False, dest="trash_intermediate",
                        help="Do not trash intermediate outputs (default).")

    parser.add_argument("workflow", type=str, default=None, help="The workflow to execute")
    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")

    return parser

def add_arv_hints():
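    """Register the Arvados CWL extension schema and requirements with cwltool."""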
761     cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
762     cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
763     res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
764     use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
765     res.close()
766     cwltool.process.supportedProcessRequirements.extend([
767         "http://arvados.org/cwl#RunInSingleContainer",
768         "http://arvados.org/cwl#OutputDirType",
769         "http://arvados.org/cwl#RuntimeConstraints",
770         "http://arvados.org/cwl#PartitionRequirement",
771         "http://arvados.org/cwl#APIRequirement",
772         "http://commonwl.org/cwltool#LoadListingRequirement",
773         "http://arvados.org/cwl#IntermediateOutput",
774         "http://arvados.org/cwl#ReuseRequirement"
775     ])
776
777 def exit_signal_handler(sigcode, frame):
778     logger.error("Caught signal {}, exiting.".format(sigcode))
779     sys.exit(-sigcode)
780
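# Command line entry point.  An illustrative invocation (workflow and input
# file names are placeholders) would be:
#
#     arvados-cwl-runner --api=containers --project-uuid=<uuid> workflow.cwl job.yml
#
# Programmatic callers (e.g. a console-script wrapper) are expected to invoke
# this as main(sys.argv[1:], sys.stdout, sys.stderr).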
def main(args, stdout, stderr, api_client=None, keep_client=None,
         install_sig_handlers=True):
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if len(arvargs.storage_classes.strip().split(',')) > 1:
        logger.error("Multiple storage classes are not supported currently.")
        return 1

    if install_sig_handlers:
        arv_cmd.install_signal_handlers()

    if arvargs.update_workflow:
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        arvargs.work_api = want_api

    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.safeapi.ThreadSafeApiCache(api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4})
            keep_client = api_client.keep
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
                              num_retries=4, output_name=arvargs.output_name,
                              output_tags=arvargs.output_tags, default_storage_classes=parser.get_default("storage_classes"),
                              thread_count=arvargs.thread_count)
    except Exception as e:
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    arvargs.conformance_test = None
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.print_supported_versions = False

    make_fs_access = partial(CollectionFsAccess,
                             collection_cache=runner.collection_cache)

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             makeTool=runner.arv_make_tool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             make_fs_access=make_fs_access,
                             fetcher_constructor=partial(CollectionFetcher,
                                                         api_client=api_client,
                                                         fs_access=make_fs_access(""),
                                                         num_retries=runner.num_retries),
                             resolver=partial(collectionResolver, api_client, num_retries=runner.num_retries),
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints)