Merge branch 'wtsi/python-api-timeout' refs #13542
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 # Implement cwl-runner interface for submitting and running work on Arvados, using
7 # either the Crunch jobs API or Crunch containers API.
8
9 import argparse
10 import logging
11 import os
12 import sys
13 import threading
14 import hashlib
15 import copy
16 import json
17 import re
18 from functools import partial
19 import pkg_resources  # part of setuptools
20 import Queue
21 import time
22 import signal
23 import thread
24
25 from cwltool.errors import WorkflowException
26 import cwltool.main
27 import cwltool.workflow
28 import cwltool.process
29 from schema_salad.sourceline import SourceLine
30 import schema_salad.validate as validate
31 import cwltool.argparser
32
33 import arvados
34 import arvados.config
35 from arvados.keep import KeepClient
36 from arvados.errors import ApiError
37 import arvados.commands._util as arv_cmd
38
39 from .arvcontainer import ArvadosContainer, RunnerContainer
40 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
41 from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
42 from .arvtool import ArvadosCommandTool
43 from .arvworkflow import ArvadosWorkflow, upload_workflow
44 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
45 from .perf import Perf
46 from .pathmapper import NoFollowPathMapper
47 from .task_queue import TaskQueue
48 from .context import ArvLoadingContext, ArvRuntimeContext
49 from ._version import __version__
50
51 from cwltool.pack import pack
52 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
53 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing
54 from cwltool.command_line_tool import compute_checksums
55
56 from arvados.api import OrderedJsonModel
57
# Module-wide logger for runner messages, plus a child logger for timing metrics.
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
logger.setLevel(logging.INFO)

# Give the Arvados SDK log handler a timestamped "<time> <name> <level>: <msg>" format.
arvados.log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))

# Priority value that arv_executor treats as "not overridden" when the jobs API is in use.
DEFAULT_PRIORITY = 500
67
68 class ArvCwlRunner(object):
69     """Execute a CWL tool or workflow, submit work (using either jobs or
70     containers API), wait for them to complete, and report output.
71
72     """
73
74     def __init__(self, api_client,
75                  arvargs=None,
76                  keep_client=None,
77                  num_retries=4,
78                  thread_count=4):
79
80         if arvargs is None:
81             arvargs = argparse.Namespace()
82             arvargs.work_api = None
83             arvargs.output_name = None
84             arvargs.output_tags = None
85             arvargs.thread_count = 1
86
87         self.api = api_client
88         self.processes = {}
89         self.workflow_eval_lock = threading.Condition(threading.RLock())
90         self.final_output = None
91         self.final_status = None
92         self.num_retries = num_retries
93         self.uuid = None
94         self.stop_polling = threading.Event()
95         self.poll_api = None
96         self.pipeline = None
97         self.final_output_collection = None
98         self.output_name = arvargs.output_name
99         self.output_tags = arvargs.output_tags
100         self.project_uuid = None
101         self.intermediate_output_ttl = 0
102         self.intermediate_output_collections = []
103         self.trash_intermediate = False
104         self.thread_count = arvargs.thread_count
105         self.poll_interval = 12
106         self.loadingContext = None
107
108         if keep_client is not None:
109             self.keep_client = keep_client
110         else:
111             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
112
113         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries)
114
115         self.fetcher_constructor = partial(CollectionFetcher,
116                                            api_client=self.api,
117                                            fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
118                                            num_retries=self.num_retries)
119
120         self.work_api = None
121         expected_api = ["jobs", "containers"]
122         for api in expected_api:
123             try:
124                 methods = self.api._rootDesc.get('resources')[api]['methods']
125                 if ('httpMethod' in methods['create'] and
126                     (arvargs.work_api == api or arvargs.work_api is None)):
127                     self.work_api = api
128                     break
129             except KeyError:
130                 pass
131
132         if not self.work_api:
133             if arvargs.work_api is None:
134                 raise Exception("No supported APIs")
135             else:
136                 raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
137
138         if self.work_api == "jobs":
139             logger.warn("""
140 *******************************
141 Using the deprecated 'jobs' API.
142
143 To get rid of this warning:
144
145 Users: read about migrating at
146 http://doc.arvados.org/user/cwl/cwl-style.html#migrate
147 and use the option --api=containers
148
149 Admins: configure the cluster to disable the 'jobs' API as described at:
150 http://doc.arvados.org/install/install-api-server.html#disable_api_methods
151 *******************************""")
152
153         self.loadingContext = ArvLoadingContext(vars(arvargs))
154         self.loadingContext.fetcher_constructor = self.fetcher_constructor
155         self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
156         self.loadingContext.construct_tool_object = self.arv_make_tool
157
158
159     def arv_make_tool(self, toolpath_object, loadingContext):
160         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
161             return ArvadosCommandTool(self, toolpath_object, loadingContext)
162         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
163             return ArvadosWorkflow(self, toolpath_object, loadingContext)
164         else:
165             return cwltool.workflow.default_make_tool(toolpath_object, loadingContext)
166
167     def output_callback(self, out, processStatus):
168         with self.workflow_eval_lock:
169             if processStatus == "success":
170                 logger.info("Overall process status is %s", processStatus)
171                 if self.pipeline:
172                     self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
173                                                          body={"state": "Complete"}).execute(num_retries=self.num_retries)
174             else:
175                 logger.error("Overall process status is %s", processStatus)
176                 if self.pipeline:
177                     self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
178                                                          body={"state": "Failed"}).execute(num_retries=self.num_retries)
179             self.final_status = processStatus
180             self.final_output = out
181             self.workflow_eval_lock.notifyAll()
182
183
184     def start_run(self, runnable, runtimeContext):
185         self.task_queue.add(partial(runnable.run, runtimeContext))
186
187     def process_submitted(self, container):
188         with self.workflow_eval_lock:
189             self.processes[container.uuid] = container
190
191     def process_done(self, uuid, record):
192         with self.workflow_eval_lock:
193             j = self.processes[uuid]
194             logger.info("%s %s is %s", self.label(j), uuid, record["state"])
195             self.task_queue.add(partial(j.done, record))
196             del self.processes[uuid]
197
198     def wrapped_callback(self, cb, obj, st):
199         with self.workflow_eval_lock:
200             cb(obj, st)
201             self.workflow_eval_lock.notifyAll()
202
203     def get_wrapped_callback(self, cb):
204         return partial(self.wrapped_callback, cb)
205
206     def on_message(self, event):
207         if event.get("object_uuid") in self.processes and event["event_type"] == "update":
208             uuid = event["object_uuid"]
209             if event["properties"]["new_attributes"]["state"] == "Running":
210                 with self.workflow_eval_lock:
211                     j = self.processes[uuid]
212                     if j.running is False:
213                         j.running = True
214                         j.update_pipeline_component(event["properties"]["new_attributes"])
215                         logger.info("%s %s is Running", self.label(j), uuid)
216             elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
217                 self.process_done(uuid, event["properties"]["new_attributes"])
218
219     def label(self, obj):
220         return "[%s %s]" % (self.work_api[0:-1], obj.name)
221
222     def poll_states(self):
223         """Poll status of jobs or containers listed in the processes dict.
224
225         Runs in a separate thread.
226         """
227
228         try:
229             remain_wait = self.poll_interval
230             while True:
231                 if remain_wait > 0:
232                     self.stop_polling.wait(remain_wait)
233                 if self.stop_polling.is_set():
234                     break
235                 with self.workflow_eval_lock:
236                     keys = list(self.processes.keys())
237                 if not keys:
238                     remain_wait = self.poll_interval
239                     continue
240
241                 begin_poll = time.time()
242                 if self.work_api == "containers":
243                     table = self.poll_api.container_requests()
244                 elif self.work_api == "jobs":
245                     table = self.poll_api.jobs()
246
247                 try:
248                     proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
249                 except Exception as e:
250                     logger.warn("Error checking states on API server: %s", e)
251                     remain_wait = self.poll_interval
252                     continue
253
254                 for p in proc_states["items"]:
255                     self.on_message({
256                         "object_uuid": p["uuid"],
257                         "event_type": "update",
258                         "properties": {
259                             "new_attributes": p
260                         }
261                     })
262                 finish_poll = time.time()
263                 remain_wait = self.poll_interval - (finish_poll - begin_poll)
264         except:
265             logger.exception("Fatal error in state polling thread.")
266             with self.workflow_eval_lock:
267                 self.processes.clear()
268                 self.workflow_eval_lock.notifyAll()
269         finally:
270             self.stop_polling.set()
271
272     def add_intermediate_output(self, uuid):
273         if uuid:
274             self.intermediate_output_collections.append(uuid)
275
276     def trash_intermediate_output(self):
277         logger.info("Cleaning up intermediate output collections")
278         for i in self.intermediate_output_collections:
279             try:
280                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
281             except:
282                 logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
283             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
284                 break
285
286     def check_features(self, obj):
287         if isinstance(obj, dict):
288             if obj.get("writable") and self.work_api != "containers":
289                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
290             if obj.get("class") == "DockerRequirement":
291                 if obj.get("dockerOutputDirectory"):
292                     if self.work_api != "containers":
293                         raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
294                             "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
295                     if not obj.get("dockerOutputDirectory").startswith('/'):
296                         raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
297                             "Option 'dockerOutputDirectory' must be an absolute path.")
298             if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
299                 raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
300             for v in obj.itervalues():
301                 self.check_features(v)
302         elif isinstance(obj, list):
303             for i,v in enumerate(obj):
304                 with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
305                     self.check_features(v)
306
307     def make_output_collection(self, name, storage_classes, tagsString, outputObj):
308         outputObj = copy.deepcopy(outputObj)
309
310         files = []
311         def capture(fileobj):
312             files.append(fileobj)
313
314         adjustDirObjs(outputObj, capture)
315         adjustFileObjs(outputObj, capture)
316
317         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
318
319         final = arvados.collection.Collection(api_client=self.api,
320                                               keep_client=self.keep_client,
321                                               num_retries=self.num_retries)
322
323         for k,v in generatemapper.items():
324             if k.startswith("_:"):
325                 if v.type == "Directory":
326                     continue
327                 if v.type == "CreateFile":
328                     with final.open(v.target, "wb") as f:
329                         f.write(v.resolved.encode("utf-8"))
330                     continue
331
332             if not k.startswith("keep:"):
333                 raise Exception("Output source is not in keep or a literal")
334             sp = k.split("/")
335             srccollection = sp[0][5:]
336             try:
337                 reader = self.collection_cache.get(srccollection)
338                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
339                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
340             except arvados.errors.ArgumentError as e:
341                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
342                 raise
343             except IOError as e:
344                 logger.warn("While preparing output collection: %s", e)
345
346         def rewrite(fileobj):
347             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
348             for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
349                 if k in fileobj:
350                     del fileobj[k]
351
352         adjustDirObjs(outputObj, rewrite)
353         adjustFileObjs(outputObj, rewrite)
354
355         with final.open("cwl.output.json", "w") as f:
356             json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
357
358         final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)
359
360         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
361                     final.api_response()["name"],
362                     final.manifest_locator())
363
364         final_uuid = final.manifest_locator()
365         tags = tagsString.split(',')
366         for tag in tags:
367              self.api.links().create(body={
368                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
369                 }).execute(num_retries=self.num_retries)
370
371         def finalcollection(fileobj):
372             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
373
374         adjustDirObjs(outputObj, finalcollection)
375         adjustFileObjs(outputObj, finalcollection)
376
377         return (outputObj, final)
378
379     def set_crunch_output(self):
380         if self.work_api == "containers":
381             try:
382                 current = self.api.containers().current().execute(num_retries=self.num_retries)
383             except ApiError as e:
384                 # Status code 404 just means we're not running in a container.
385                 if e.resp.status != 404:
386                     logger.info("Getting current container: %s", e)
387                 return
388             try:
389                 self.api.containers().update(uuid=current['uuid'],
390                                              body={
391                                                  'output': self.final_output_collection.portable_data_hash(),
392                                              }).execute(num_retries=self.num_retries)
393                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
394                                               body={
395                                                   'is_trashed': True
396                                               }).execute(num_retries=self.num_retries)
397             except Exception as e:
398                 logger.info("Setting container output: %s", e)
399         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
400             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
401                                    body={
402                                        'output': self.final_output_collection.portable_data_hash(),
403                                        'success': self.final_status == "success",
404                                        'progress':1.0
405                                    }).execute(num_retries=self.num_retries)
406
407     def arv_executor(self, tool, job_order, runtimeContext, logger=None):
408         self.debug = runtimeContext.debug
409
410         tool.visit(self.check_features)
411
412         self.project_uuid = runtimeContext.project_uuid
413         self.pipeline = None
414         self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
415         self.secret_store = runtimeContext.secret_store
416
417         self.trash_intermediate = runtimeContext.trash_intermediate
418         if self.trash_intermediate and self.work_api != "containers":
419             raise Exception("--trash-intermediate is only supported with --api=containers.")
420
421         self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
422         if self.intermediate_output_ttl and self.work_api != "containers":
423             raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
424         if self.intermediate_output_ttl < 0:
425             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
426
427         if runtimeContext.submit_request_uuid and self.work_api != "containers":
428             raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
429
430         if not runtimeContext.name:
431             runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
432
433         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
434         # Also uploads docker images.
435         merged_map = upload_workflow_deps(self, tool)
436
437         # Reload tool object which may have been updated by
438         # upload_workflow_deps
439         # Don't validate this time because it will just print redundant errors.
440         loadingContext = self.loadingContext.copy()
441         loadingContext.loader = tool.doc_loader
442         loadingContext.avsc_names = tool.doc_schema
443         loadingContext.metadata = tool.metadata
444         loadingContext.do_validate = False
445
446         tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
447                                   loadingContext)
448
449         # Upload local file references in the job order.
450         job_order = upload_job_order(self, "%s input" % runtimeContext.name,
451                                      tool, job_order)
452
453         existing_uuid = runtimeContext.update_workflow
454         if existing_uuid or runtimeContext.create_workflow:
455             # Create a pipeline template or workflow record and exit.
456             if self.work_api == "jobs":
457                 tmpl = RunnerTemplate(self, tool, job_order,
458                                       runtimeContext.enable_reuse,
459                                       uuid=existing_uuid,
460                                       submit_runner_ram=runtimeContext.submit_runner_ram,
461                                       name=runtimeContext.name,
462                                       merged_map=merged_map)
463                 tmpl.save()
464                 # cwltool.main will write our return value to stdout.
465                 return (tmpl.uuid, "success")
466             elif self.work_api == "containers":
467                 return (upload_workflow(self, tool, job_order,
468                                         self.project_uuid,
469                                         uuid=existing_uuid,
470                                         submit_runner_ram=runtimeContext.submit_runner_ram,
471                                         name=runtimeContext.name,
472                                         merged_map=merged_map),
473                         "success")
474
475         self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
476         self.eval_timeout = runtimeContext.eval_timeout
477
478         runtimeContext = runtimeContext.copy()
479         runtimeContext.use_container = True
480         runtimeContext.tmpdir_prefix = "tmp"
481         runtimeContext.work_api = self.work_api
482
483         if self.work_api == "containers":
484             if self.ignore_docker_for_reuse:
485                 raise Exception("--ignore-docker-for-reuse not supported with containers API.")
486             runtimeContext.outdir = "/var/spool/cwl"
487             runtimeContext.docker_outdir = "/var/spool/cwl"
488             runtimeContext.tmpdir = "/tmp"
489             runtimeContext.docker_tmpdir = "/tmp"
490         elif self.work_api == "jobs":
491             if runtimeContext.priority != DEFAULT_PRIORITY:
492                 raise Exception("--priority not implemented for jobs API.")
493             runtimeContext.outdir = "$(task.outdir)"
494             runtimeContext.docker_outdir = "$(task.outdir)"
495             runtimeContext.tmpdir = "$(task.tmpdir)"
496
497         if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
498             raise Exception("--priority must be in the range 1..1000.")
499
500         runnerjob = None
501         if runtimeContext.submit:
502             # Submit a runner job to run the workflow for us.
503             if self.work_api == "containers":
504                 if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait:
505                     runtimeContext.runnerjob = tool.tool["id"]
506                     runnerjob = tool.job(job_order,
507                                          self.output_callback,
508                                          runtimeContext).next()
509                 else:
510                     runnerjob = RunnerContainer(self, tool, job_order, runtimeContext.enable_reuse,
511                                                 self.output_name,
512                                                 self.output_tags,
513                                                 submit_runner_ram=runtimeContext.submit_runner_ram,
514                                                 name=runtimeContext.name,
515                                                 on_error=runtimeContext.on_error,
516                                                 submit_runner_image=runtimeContext.submit_runner_image,
517                                                 intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
518                                                 merged_map=merged_map,
519                                                 priority=runtimeContext.priority,
520                                                 secret_store=self.secret_store)
521             elif self.work_api == "jobs":
522                 runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse,
523                                       self.output_name,
524                                       self.output_tags,
525                                       submit_runner_ram=runtimeContext.submit_runner_ram,
526                                       name=runtimeContext.name,
527                                       on_error=runtimeContext.on_error,
528                                       submit_runner_image=runtimeContext.submit_runner_image,
529                                       merged_map=merged_map)
530         elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
531             # Create pipeline for local run
532             self.pipeline = self.api.pipeline_instances().create(
533                 body={
534                     "owner_uuid": self.project_uuid,
535                     "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
536                     "components": {},
537                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
538             logger.info("Pipeline instance %s", self.pipeline["uuid"])
539
540         if runnerjob and not runtimeContext.wait:
541             submitargs = runtimeContext.copy()
542             submitargs.submit = False
543             runnerjob.run(submitargs)
544             return (runnerjob.uuid, "success")
545
546         self.poll_api = arvados.api('v1', timeout=kwargs["http_timeout"])
547         self.polling_thread = threading.Thread(target=self.poll_states)
548         self.polling_thread.start()
549
550         self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
551
552         if runnerjob:
553             jobiter = iter((runnerjob,))
554         else:
555             if runtimeContext.cwl_runner_job is not None:
556                 self.uuid = runtimeContext.cwl_runner_job.get('uuid')
557             jobiter = tool.job(job_order,
558                                self.output_callback,
559                                runtimeContext)
560
561         try:
562             self.workflow_eval_lock.acquire()
563             # Holds the lock while this code runs and releases it when
564             # it is safe to do so in self.workflow_eval_lock.wait(),
565             # at which point on_message can update job state and
566             # process output callbacks.
567
568             loopperf = Perf(metrics, "jobiter")
569             loopperf.__enter__()
570             for runnable in jobiter:
571                 loopperf.__exit__()
572
573                 if self.stop_polling.is_set():
574                     break
575
576                 if self.task_queue.error is not None:
577                     raise self.task_queue.error
578
579                 if runnable:
580                     with Perf(metrics, "run"):
581                         self.start_run(runnable, runtimeContext)
582                 else:
583                     if (self.task_queue.in_flight + len(self.processes)) > 0:
584                         self.workflow_eval_lock.wait(3)
585                     else:
586                         logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
587                         break
588                 loopperf.__enter__()
589             loopperf.__exit__()
590
591             while (self.task_queue.in_flight + len(self.processes)) > 0:
592                 if self.task_queue.error is not None:
593                     raise self.task_queue.error
594                 self.workflow_eval_lock.wait(3)
595
596         except UnsupportedRequirement:
597             raise
598         except:
599             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
600                 logger.error("Interrupted, workflow will be cancelled")
601             else:
602                 logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
603             if self.pipeline:
604                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
605                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
606             if runnerjob and runnerjob.uuid and self.work_api == "containers":
607                 self.api.container_requests().update(uuid=runnerjob.uuid,
608                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
609         finally:
610             self.workflow_eval_lock.release()
611             self.task_queue.drain()
612             self.stop_polling.set()
613             self.polling_thread.join()
614             self.task_queue.join()
615
616         if self.final_status == "UnsupportedRequirement":
617             raise UnsupportedRequirement("Check log for details.")
618
619         if self.final_output is None:
620             raise WorkflowException("Workflow did not return a result.")
621
622         if runtimeContext.submit and isinstance(runnerjob, Runner):
623             logger.info("Final output collection %s", runnerjob.final_output)
624         else:
625             if self.output_name is None:
626                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
627             if self.output_tags is None:
628                 self.output_tags = ""
629
630             storage_classes = runtimeContext.storage_classes.strip().split(",")
631             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
632             self.set_crunch_output()
633
634         if runtimeContext.compute_checksum:
635             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
636             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
637
638         if self.trash_intermediate and self.final_status == "success":
639             self.trash_intermediate_output()
640
641         return (self.final_output, self.final_status)
642
643
def versionstring():
    """Return a version string of key packages for provenance and debugging.

    The string has the form
    "<argv[0]> <runner version>, arvados-python-client <version>, cwltool <version>".
    """

    def installed_version(package):
        # pkg_resources.require() returns a list; the first entry is used.
        return pkg_resources.require(package)[0].version

    runner_version = installed_version("arvados-cwl-runner")
    sdk_version = installed_version("arvados-python-client")
    cwltool_version = installed_version("cwltool")

    parts = (
        sys.argv[0], runner_version,
        "arvados-python-client", sdk_version,
        "cwltool", cwltool_version,
    )
    return "%s %s, %s %s, %s %s" % parts
654
655
def arg_parser():  # type: () -> argparse.ArgumentParser
    """Build the command line parser for the Arvados CWL executor.

    Returns an argparse.ArgumentParser that knows every option of the
    runner plus the two positionals: the workflow and the remaining
    arguments, which are collected as the job order.
    """

    def add_toggle(group, dest, default, on_flag, on_help, off_flag, off_help):
        # Register an on/off flag pair storing True/False into one dest.
        group.add_argument(on_flag, action="store_true",
                           default=default, dest=dest, help=on_help)
        group.add_argument(off_flag, action="store_false",
                           default=default, dest=dest, help=off_help)

    parser = argparse.ArgumentParser(
        description='Arvados executor for Common Workflow Language')

    # --- Directories and expression evaluation ---
    parser.add_argument(
        "--basedir",
        type=str,
        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument(
        "--outdir",
        type=str,
        default=os.path.abspath('.'),
        help="Output directory, default current directory")
    parser.add_argument(
        "--eval-timeout",
        type=float,
        default=20,
        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.")

    # --- Alternate actions (mutually exclusive) ---
    action_group = parser.add_mutually_exclusive_group()
    action_group.add_argument(
        "--print-dot",
        action="store_true",
        help="Print workflow visualization in graphviz format and exit")
    action_group.add_argument(
        "--version",
        action="version",
        version=versionstring(),
        help="Print version and exit")
    action_group.add_argument(
        "--validate",
        action="store_true",
        help="Validate CWL document only.")

    # --- Logging verbosity (mutually exclusive) ---
    verbosity_group = parser.add_mutually_exclusive_group()
    verbosity_group.add_argument(
        "--verbose", action="store_true", help="Default logging")
    verbosity_group.add_argument(
        "--quiet", action="store_true", help="Only print warnings and errors.")
    verbosity_group.add_argument(
        "--debug", action="store_true", help="Print even more logging")

    parser.add_argument(
        "--metrics", action="store_true", help="Print timing metrics")

    parser.add_argument(
        "--tool-help", action="store_true",
        help="Print command line help for tool")

    # --- Job/container reuse ---
    add_toggle(parser.add_mutually_exclusive_group(),
               dest="enable_reuse", default=True,
               on_flag="--enable-reuse",
               on_help="Enable job or container reuse (default)",
               off_flag="--disable-reuse",
               off_help="Disable job or container reuse")

    # --- Ownership and naming of results ---
    parser.add_argument(
        "--project-uuid",
        type=str,
        metavar="UUID",
        help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument(
        "--output-name",
        type=str,
        default=None,
        help="Name to use for collection that stores the final output.")
    parser.add_argument(
        "--output-tags",
        type=str,
        default=None,
        help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.")
    parser.add_argument(
        "--ignore-docker-for-reuse",
        action="store_true",
        default=False,
        help="Ignore Docker image version when deciding whether to reuse past jobs.")

    # --- Run mode: submit, run locally, or register the workflow ---
    mode_group = parser.add_mutually_exclusive_group()
    add_toggle(mode_group,
               dest="submit", default=True,
               on_flag="--submit",
               on_help="Submit workflow to run on Arvados.",
               off_flag="--local",
               off_help="Run workflow on local host (submits jobs to Arvados).")
    mode_group.add_argument(
        "--create-template",
        action="store_true",
        dest="create_workflow",
        help="(Deprecated) synonym for --create-workflow.")
    mode_group.add_argument(
        "--create-workflow",
        action="store_true",
        help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
    mode_group.add_argument(
        "--update-workflow",
        type=str,
        metavar="UUID",
        help="Update an existing Arvados workflow or pipeline template with the given UUID.")

    # --- Wait for the submitted runner or return immediately ---
    add_toggle(parser.add_mutually_exclusive_group(),
               dest="wait", default=True,
               on_flag="--wait",
               on_help="After submitting workflow runner job, wait for completion.",
               off_flag="--no-wait",
               off_help="Submit workflow runner job and exit.")

    # --- Timestamps on log lines ---
    add_toggle(parser.add_mutually_exclusive_group(),
               dest="log_timestamps", default=True,
               on_flag="--log-timestamps",
               on_help="Prefix logging lines with timestamp",
               off_flag="--no-log-timestamps",
               off_help="No timestamp on logging lines")

    # --- Submission details ---
    parser.add_argument(
        "--api",
        type=str,
        dest="work_api",
        default=None,
        choices=("jobs", "containers"),
        help="Select work submission API.  Default is 'jobs' if that API is available, otherwise 'containers'.")

    parser.add_argument(
        "--compute-checksum",
        action="store_true",
        dest="compute_checksum",
        default=False,
        help="Compute checksum of contents while collecting outputs")

    parser.add_argument(
        "--submit-runner-ram",
        type=int,
        default=None,
        help="RAM (in MiB) required for the workflow runner job (default 1024)")

    runner_image_help = "Docker image for workflow runner job, default arvados/jobs:%s" % __version__
    parser.add_argument(
        "--submit-runner-image",
        type=str,
        default=None,
        help=runner_image_help)

    parser.add_argument(
        "--submit-request-uuid",
        type=str,
        default=None,
        help="Update and commit supplied container request instead of creating a new one (containers API only).")

    parser.add_argument(
        "--name",
        type=str,
        default=None,
        help="Name to use for workflow execution instance.")

    parser.add_argument(
        "--on-error",
        type=str,
        default="continue",
        choices=("stop", "continue"),
        help="Desired workflow behavior when a step fails.  One of 'stop' or 'continue'. Default is 'continue'.")

    parser.add_argument(
        "--enable-dev",
        action="store_true",
        default=False,
        help="Enable loading and running development versions of CWL spec.")
    parser.add_argument(
        '--storage-classes',
        type=str,
        default="default",
        help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.")

    parser.add_argument(
        "--intermediate-output-ttl",
        type=int,
        metavar="N",
        default=0,
        help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).")

    parser.add_argument(
        "--priority",
        type=int,
        default=DEFAULT_PRIORITY,
        help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)")

    # --- Options hidden from --help output (help is argparse.SUPPRESS) ---
    parser.add_argument(
        "--disable-validate",
        action="store_false",
        dest="do_validate",
        default=True,
        help=argparse.SUPPRESS)

    parser.add_argument(
        "--disable-js-validation",
        action="store_true",
        default=False,
        help=argparse.SUPPRESS)

    # --- Concurrency and network tuning ---
    parser.add_argument(
        "--thread-count",
        type=int,
        default=4,
        help="Number of threads to use for job submit and output collection.")

    parser.add_argument(
        "--http-timeout",
        type=int,
        dest="http_timeout",
        default=5*60,
        help="Http timeout. Default is 5 minutes.")

    # --- Handling of intermediate outputs ---
    add_toggle(parser.add_mutually_exclusive_group(),
               dest="trash_intermediate", default=False,
               on_flag="--trash-intermediate",
               on_help="Immediately trash intermediate outputs on workflow success.",
               off_flag="--no-trash-intermediate",
               off_help="Do not trash intermediate outputs (default).")

    # --- Positionals: job_order swallows everything after the workflow ---
    parser.add_argument(
        "workflow", type=str, default=None, help="The workflow to execute")
    parser.add_argument(
        "job_order", nargs=argparse.REMAINDER,
        help="The input object to the workflow.")

    return parser
790
def add_arv_hints():
    """Register the Arvados CWL extensions with cwltool.

    Performs three module-level updates on cwltool:

    * replaces both file name accept-list patterns in
      cwltool.command_line_tool with a pattern that matches anything;
    * loads the packaged 'arv-cwl-schema.yml' and registers it as a custom
      schema for CWL "v1.0" under the http://arvados.org/cwl namespace;
    * adds the Arvados-specific requirement URIs to
      cwltool.process.supportedProcessRequirements.

    main() calls this function directly and also hands it to cwltool as
    custom_schema_callback, so it may run more than once per process.  It
    is therefore written to be repeatable: requirement URIs that are
    already registered are not appended again.
    """
    # NOTE(review): a match-everything pattern presumably disables cwltool's
    # file name character checks -- confirm against cwltool.
    relaxed_re = re.compile(r".*")
    cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = relaxed_re
    cwltool.command_line_tool.ACCEPTLIST_RE = relaxed_re

    res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
    try:
        use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
    finally:
        # Release the stream even when reading or registering fails.
        res.close()

    arv_requirements = (
        "http://arvados.org/cwl#RunInSingleContainer",
        "http://arvados.org/cwl#OutputDirType",
        "http://arvados.org/cwl#RuntimeConstraints",
        "http://arvados.org/cwl#PartitionRequirement",
        "http://arvados.org/cwl#APIRequirement",
        "http://commonwl.org/cwltool#LoadListingRequirement",
        "http://arvados.org/cwl#IntermediateOutput",
        "http://arvados.org/cwl#ReuseRequirement",
    )
    supported = cwltool.process.supportedProcessRequirements
    # Compute the missing entries before mutating the list so repeated
    # calls do not grow it with duplicates.
    missing = [req for req in arv_requirements if req not in supported]
    supported.extend(missing)
807
def exit_signal_handler(sigcode, frame):
    """Signal handler that logs the signal number and exits the process.

    sigcode is the received signal number; frame is accepted to match the
    signal handler calling convention and is not used.  The exit status
    passed to the interpreter is the negated signal number.
    """
    message = "Caught signal {}, exiting.".format(sigcode)
    logger.error(message)
    exit_status = -sigcode
    sys.exit(exit_status)
811
def main(args, stdout, stderr, api_client=None, keep_client=None,
         install_sig_handlers=True):
    """Parse the command line, set up Arvados clients and run cwltool.

    Parameters:
      args -- list of command line arguments handed to the parser built
          by arg_parser().
      stdout, stderr -- streams passed through to cwltool.main.main().
      api_client -- Arvados API client to use.  When None, a
          ThreadSafeApiCache is created using the --http-timeout value,
          and keep_client is replaced by that cache's Keep client
          regardless of the value supplied by the caller.
      keep_client -- Keep client to use.  When still None after the API
          client setup, a KeepClient bound to api_client is created.
      install_sig_handlers -- when true, install the signal handlers
          provided by arvados.commands._util.

    Returns 1 when argument checks or client/runner setup fail; otherwise
    returns whatever cwltool.main.main() returns.
    """
    parser = arg_parser()

    job_order_object = None
    arvargs = parser.parse_args(args)

    if len(arvargs.storage_classes.strip().split(',')) > 1:
        logger.error("Multiple storage classes are not supported currently.")
        return 1

    # Attributes cwltool expects on the argument namespace but that are not
    # exposed as command line options here.
    arvargs.use_container = True
    arvargs.relax_path_checks = True
    arvargs.print_supported_versions = False

    if install_sig_handlers:
        arv_cmd.install_signal_handlers()

    if arvargs.update_workflow:
        # Infer the work API from the infix that follows the five character
        # prefix of the UUID being updated.
        if arvargs.update_workflow.find('-7fd4e-') == 5:
            want_api = 'containers'
        elif arvargs.update_workflow.find('-p5p6p-') == 5:
            want_api = 'jobs'
        else:
            want_api = None
        if want_api and arvargs.work_api and want_api != arvargs.work_api:
            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
                arvargs.update_workflow, want_api, arvargs.work_api))
            return 1
        # Only override the selection when the UUID told us something;
        # otherwise keep an explicit --api choice instead of resetting it
        # to None.
        if want_api:
            arvargs.work_api = want_api

    # Creating or updating a workflow does not need an input object, so
    # supply an empty one when none was given.
    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
        job_order_object = ({}, "")

    add_arv_hints()

    try:
        if api_client is None:
            api_client = arvados.safeapi.ThreadSafeApiCache(
                api_params={"model": OrderedJsonModel(), "timeout": arvargs.http_timeout},
                keep_params={"num_retries": 4})
            keep_client = api_client.keep
            # Make an API object now so errors are reported early.
            api_client.users().current().execute()
        if keep_client is None:
            keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
        runner = ArvCwlRunner(api_client, arvargs, keep_client=keep_client, num_retries=4)
    except Exception as e:
        # Top-level boundary: report the failure and exit with an error code.
        logger.error(e)
        return 1

    if arvargs.debug:
        logger.setLevel(logging.DEBUG)
        logging.getLogger('arvados').setLevel(logging.DEBUG)

    if arvargs.quiet:
        logger.setLevel(logging.WARN)
        logging.getLogger('arvados').setLevel(logging.WARN)
        logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

    if arvargs.metrics:
        metrics.setLevel(logging.DEBUG)
        logging.getLogger("cwltool.metrics").setLevel(logging.DEBUG)

    if arvargs.log_timestamps:
        arvados.log_handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s: %(message)s',
            '%Y-%m-%d %H:%M:%S'))
    else:
        arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    # Fill in every cwltool default that our own parser does not define so
    # cwltool finds all the attributes it expects.
    for key, val in cwltool.argparser.get_default_args().items():
        if not hasattr(arvargs, key):
            setattr(arvargs, key, val)

    runtimeContext = ArvRuntimeContext(vars(arvargs))
    runtimeContext.make_fs_access = partial(CollectionFsAccess,
                             collection_cache=runner.collection_cache)

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arv_executor,
                             versionfunc=versionstring,
                             job_order_object=job_order_object,
                             logger_handler=arvados.log_handler,
                             custom_schema_callback=add_arv_hints,
                             loadingContext=runner.loadingContext,
                             runtimeContext=runtimeContext)