1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import division
6 from builtins import next
7 from builtins import object
8 from builtins import str
9 from future.utils import viewvalues, viewitems
10
11 import argparse
12 import logging
13 import os
14 import sys
15 import threading
16 import copy
17 import json
18 import re
19 from functools import partial
20 import time
21
22 from cwltool.errors import WorkflowException
23 import cwltool.workflow
24 from schema_salad.sourceline import SourceLine
25 import schema_salad.validate as validate
26
27 import arvados
28 import arvados.config
29 from arvados.keep import KeepClient
30 from arvados.errors import ApiError
31
32 import arvados_cwl.util
33 from .arvcontainer import RunnerContainer
34 from .arvjob import RunnerJob, RunnerTemplate
35 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
36 from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
37 from .arvworkflow import ArvadosWorkflow, upload_workflow
38 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
39 from .perf import Perf
40 from .pathmapper import NoFollowPathMapper
41 from .task_queue import TaskQueue
42 from .context import ArvLoadingContext, ArvRuntimeContext
43 from ._version import __version__
44
45 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
46 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing, visit_class
47 from cwltool.command_line_tool import compute_checksums
48 from cwltool.load_tool import load_tool
49
50 logger = logging.getLogger('arvados.cwl-runner')
51 metrics = logging.getLogger('arvados.cwl-runner.metrics')
52
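# Default value of the --priority option; used below to detect whether the user
# set a priority explicitly.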
53 DEFAULT_PRIORITY = 500
54
55 class RuntimeStatusLoggingHandler(logging.Handler):
56     """
57     Intercepts logging calls and reports them as runtime statuses on the
58     runner container.
59     """
60     def __init__(self, runtime_status_update_func):
61         super(RuntimeStatusLoggingHandler, self).__init__()
62         self.runtime_status_update = runtime_status_update_func
63         self.updatingRuntimeStatus = False
64
65     def emit(self, record):
66         kind = None
67         if record.levelno >= logging.ERROR:
68             kind = 'error'
69         elif record.levelno >= logging.WARNING:
70             kind = 'warning'
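        # updatingRuntimeStatus guards against recursion: runtime_status_update()
        # may itself emit log records (for example when the API update fails).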
71         if kind is not None and not self.updatingRuntimeStatus:
72             self.updatingRuntimeStatus = True
73             try:
74                 log_msg = record.getMessage()
75                 if '\n' in log_msg:
76                     # If the logged message is multi-line, use its first line as status
77                     # and the rest as detail.
78                     status, detail = log_msg.split('\n', 1)
79                     self.runtime_status_update(
80                         kind,
81                         "%s: %s" % (record.name, status),
82                         detail
83                     )
84                 else:
85                     self.runtime_status_update(
86                         kind,
87                         "%s: %s" % (record.name, log_msg)
88                     )
89             finally:
90                 self.updatingRuntimeStatus = False
91
92
93 class ArvCwlExecutor(object):
94     """Execute a CWL tool or workflow, submit work (using either jobs or
95     containers API), wait for them to complete, and report output.
96
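    A minimal usage sketch (hypothetical variable names; assumes `parsed_args`
    is an argparse.Namespace of arv-cwl-runner options and that `tool`,
    `job_order` and `runtime_context` were prepared with the usual
    cwltool/arvados_cwl helpers):

        api = arvados.api('v1')
        executor = ArvCwlExecutor(api, arvargs=parsed_args)
        (output, status) = executor.arv_executor(tool, job_order, runtime_context)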
97     """
98
99     def __init__(self, api_client,
100                  arvargs=None,
101                  keep_client=None,
102                  num_retries=4,
103                  thread_count=4):
104
105         if arvargs is None:
106             arvargs = argparse.Namespace()
107             arvargs.work_api = None
108             arvargs.output_name = None
109             arvargs.output_tags = None
110             arvargs.thread_count = 1
111             arvargs.collection_cache_size = None
112
113         self.api = api_client
114         self.processes = {}
115         self.workflow_eval_lock = threading.Condition(threading.RLock())
116         self.final_output = None
117         self.final_status = None
118         self.num_retries = num_retries
119         self.uuid = None
120         self.stop_polling = threading.Event()
121         self.poll_api = None
122         self.pipeline = None
123         self.final_output_collection = None
124         self.output_name = arvargs.output_name
125         self.output_tags = arvargs.output_tags
126         self.project_uuid = None
127         self.intermediate_output_ttl = 0
128         self.intermediate_output_collections = []
129         self.trash_intermediate = False
130         self.thread_count = arvargs.thread_count
131         self.poll_interval = 12
132         self.loadingContext = None
133         self.should_estimate_cache_size = True
134         self.fs_access = None
135         self.secret_store = None
136
137         if keep_client is not None:
138             self.keep_client = keep_client
139         else:
140             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
141
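        # Honor --collection-cache-size (given in MiB) if set; otherwise start with
        # a 256 MiB cache and estimate a better size later from the job order.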
142         if arvargs.collection_cache_size:
143             collection_cache_size = arvargs.collection_cache_size*1024*1024
144             self.should_estimate_cache_size = False
145         else:
146             collection_cache_size = 256*1024*1024
147
148         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
149                                                 cap=collection_cache_size)
150
151         self.fetcher_constructor = partial(CollectionFetcher,
152                                            api_client=self.api,
153                                            fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
154                                            num_retries=self.num_retries)
155
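        # Pick the work API by probing the server's discovery document for the
        # "containers" and "jobs" resources, keeping the first one that exists and
        # is compatible with the --api option (if any).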
156         self.work_api = None
157         expected_api = ["containers", "jobs"]
158         for api in expected_api:
159             try:
160                 methods = self.api._rootDesc.get('resources')[api]['methods']
161                 if ('httpMethod' in methods['create'] and
162                     (arvargs.work_api == api or arvargs.work_api is None)):
163                     self.work_api = api
164                     break
165             except KeyError:
166                 pass
167
168         if not self.work_api:
169             if arvargs.work_api is None:
170                 raise Exception("No supported APIs")
171             else:
172                 raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
173
174         if self.work_api == "jobs":
175             logger.warning("""
176 *******************************
177 Using the deprecated 'jobs' API.
178
179 To get rid of this warning:
180
181 Users: read about migrating at
182 http://doc.arvados.org/user/cwl/cwl-style.html#migrate
183 and use the option --api=containers
184
185 Admins: configure the cluster to disable the 'jobs' API as described at:
186 http://doc.arvados.org/install/install-api-server.html#disable_api_methods
187 *******************************""")
188
189         self.loadingContext = ArvLoadingContext(vars(arvargs))
190         self.loadingContext.fetcher_constructor = self.fetcher_constructor
191         self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
192         self.loadingContext.construct_tool_object = self.arv_make_tool
193
194         # Add a custom logging handler to the root logger for runtime status reporting
195         # if running inside a container
196         if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
197             root_logger = logging.getLogger('')
198
199             # Remove any existing RuntimeStatusLoggingHandlers so we don't add a duplicate
200             handlers = [h for h in root_logger.handlers if not isinstance(h, RuntimeStatusLoggingHandler)]
201             root_logger.handlers = handlers
202
203             handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
204             root_logger.addHandler(handler)
205
206         self.runtimeContext = ArvRuntimeContext(vars(arvargs))
207         self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
208                                                      collection_cache=self.collection_cache)
209
210         validate_cluster_target(self, self.runtimeContext)
211
212
213     def arv_make_tool(self, toolpath_object, loadingContext):
214         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
215             return ArvadosCommandTool(self, toolpath_object, loadingContext)
216         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
217             return ArvadosWorkflow(self, toolpath_object, loadingContext)
218         elif "class" in toolpath_object and toolpath_object["class"] == "ExpressionTool":
219             return ArvadosExpressionTool(self, toolpath_object, loadingContext)
220         else:
221             raise Exception("Unknown tool %s" % toolpath_object.get("class"))
222
223     def output_callback(self, out, processStatus):
224         with self.workflow_eval_lock:
225             if processStatus == "success":
226                 logger.info("Overall process status is %s", processStatus)
227                 state = "Complete"
228             else:
229                 logger.error("Overall process status is %s", processStatus)
230                 state = "Failed"
231             if self.pipeline:
232                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
233                                                         body={"state": state}).execute(num_retries=self.num_retries)
234             self.final_status = processStatus
235             self.final_output = out
236             self.workflow_eval_lock.notifyAll()
237
238
239     def start_run(self, runnable, runtimeContext):
240         self.task_queue.add(partial(runnable.run, runtimeContext),
241                             self.workflow_eval_lock, self.stop_polling)
242
243     def process_submitted(self, container):
244         with self.workflow_eval_lock:
245             self.processes[container.uuid] = container
246
247     def process_done(self, uuid, record):
248         with self.workflow_eval_lock:
249             j = self.processes[uuid]
250             logger.info("%s %s is %s", self.label(j), uuid, record["state"])
251             self.task_queue.add(partial(j.done, record),
252                                 self.workflow_eval_lock, self.stop_polling)
253             del self.processes[uuid]
254
255     def runtime_status_update(self, kind, message, detail=None):
256         """
257         Updates the runtime_status field on the runner container.
258         Called when there is a need to report errors, warnings, or activity
259         statuses, for example by the RuntimeStatusLoggingHandler.
260         """
261         with self.workflow_eval_lock:
262             current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
263             if current is None:
264                 return
265             runtime_status = current.get('runtime_status', {})
266             # For errors, report only the first one in full.
267             if kind == 'error':
268                 if not runtime_status.get('error'):
269                     runtime_status.update({
270                         'error': message
271                     })
272                     if detail is not None:
273                         runtime_status.update({
274                             'errorDetail': detail
275                         })
276                 # Further errors are only mentioned as a count.
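                # The stored message then looks like "<first error> (and N more)".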
277                 else:
278                     # Get anything before an optional 'and N more' string.
279                     try:
280                         error_msg = re.match(
281                             r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
282                         more_failures = re.match(
283                             r'.*\(and (\d+) more\)', runtime_status.get('error'))
284                     except TypeError:
285                         # Ignore the TypeError raised when tests stub runtime_status with non-string values.
286                         return
287                     if more_failures:
288                         failure_qty = int(more_failures.groups()[0])
289                         runtime_status.update({
290                             'error': "%s (and %d more)" % (error_msg, failure_qty+1)
291                         })
292                     else:
293                         runtime_status.update({
294                             'error': "%s (and 1 more)" % error_msg
295                         })
296             elif kind in ['warning', 'activity']:
297                 # Record the latest warning/activity status, regardless of
298                 # previous occurrences.
299                 runtime_status.update({
300                     kind: message
301                 })
302                 if detail is not None:
303                     runtime_status.update({
304                         kind+"Detail": detail
305                     })
306             else:
307                 # Ignore any other status kind
308                 return
309             try:
310                 self.api.containers().update(uuid=current['uuid'],
311                                             body={
312                                                 'runtime_status': runtime_status,
313                                             }).execute(num_retries=self.num_retries)
314             except Exception as e:
315                 logger.info("Couldn't update runtime_status: %s", e)
316
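    # Wrap cwltool output callbacks so they run while holding workflow_eval_lock
    # and wake up any thread waiting on it.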
317     def wrapped_callback(self, cb, obj, st):
318         with self.workflow_eval_lock:
319             cb(obj, st)
320             self.workflow_eval_lock.notifyAll()
321
322     def get_wrapped_callback(self, cb):
323         return partial(self.wrapped_callback, cb)
324
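    # React to a state-change event for a tracked process; poll_states() feeds
    # API list results through here in event form.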
325     def on_message(self, event):
326         if event.get("object_uuid") in self.processes and event["event_type"] == "update":
327             uuid = event["object_uuid"]
328             if event["properties"]["new_attributes"]["state"] == "Running":
329                 with self.workflow_eval_lock:
330                     j = self.processes[uuid]
331                     if j.running is False:
332                         j.running = True
333                         j.update_pipeline_component(event["properties"]["new_attributes"])
334                         logger.info("%s %s is Running", self.label(j), uuid)
335             elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
336                 self.process_done(uuid, event["properties"]["new_attributes"])
337
338     def label(self, obj):
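        # e.g. "[container foo]" or "[job foo]": work_api minus its trailing "s".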
339         return "[%s %s]" % (self.work_api[0:-1], obj.name)
340
341     def poll_states(self):
342         """Poll status of jobs or containers listed in the processes dict.
343
344         Runs in a separate thread.
345         """
346
347         try:
348             remain_wait = self.poll_interval
349             while True:
350                 if remain_wait > 0:
351                     self.stop_polling.wait(remain_wait)
352                 if self.stop_polling.is_set():
353                     break
354                 with self.workflow_eval_lock:
355                     keys = list(self.processes)
356                 if not keys:
357                     remain_wait = self.poll_interval
358                     continue
359
360                 begin_poll = time.time()
361                 if self.work_api == "containers":
362                     table = self.poll_api.container_requests()
363                 elif self.work_api == "jobs":
364                     table = self.poll_api.jobs()
365
366                 pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)
367
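                # Fetch process states in pages no larger than the server's
                # maxItemsPerResponse and feed each record through on_message()
                # as an "update" event.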
368                 while keys:
369                     page = keys[:pageSize]
370                     try:
371                         proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
372                     except Exception:
373                         logger.exception("Error checking states on API server")
374                         remain_wait = self.poll_interval
375                         continue
376
377                     for p in proc_states["items"]:
378                         self.on_message({
379                             "object_uuid": p["uuid"],
380                             "event_type": "update",
381                             "properties": {
382                                 "new_attributes": p
383                             }
384                         })
385                     keys = keys[pageSize:]
386
387                 finish_poll = time.time()
388                 remain_wait = self.poll_interval - (finish_poll - begin_poll)
389         except:
390             logger.exception("Fatal error in state polling thread.")
391             with self.workflow_eval_lock:
392                 self.processes.clear()
393                 self.workflow_eval_lock.notifyAll()
394         finally:
395             self.stop_polling.set()
396
397     def add_intermediate_output(self, uuid):
398         if uuid:
399             self.intermediate_output_collections.append(uuid)
400
401     def trash_intermediate_output(self):
402         logger.info("Cleaning up intermediate output collections")
403         for i in self.intermediate_output_collections:
404             try:
405                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
406             except Exception:
407                 logger.warning("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
408             except (KeyboardInterrupt, SystemExit):
409                 break
410
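    # Recursively walk a tool document and reject features that the selected
    # work API (jobs vs. containers) does not support.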
411     def check_features(self, obj, parentfield=""):
412         if isinstance(obj, dict):
413             if obj.get("writable") and self.work_api != "containers":
414                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
415             if obj.get("class") == "DockerRequirement":
416                 if obj.get("dockerOutputDirectory"):
417                     if self.work_api != "containers":
418                         raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
419                             "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
420                     if not obj.get("dockerOutputDirectory").startswith('/'):
421                         raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
422                             "Option 'dockerOutputDirectory' must be an absolute path.")
423             if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
424                 raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
425             if obj.get("class") == "InplaceUpdateRequirement":
426                 if obj["inplaceUpdate"] and parentfield == "requirements":
427                     raise SourceLine(obj, "class", UnsupportedRequirement).makeError("InplaceUpdateRequirement not supported for keep collections.")
428             for k,v in viewitems(obj):
429                 self.check_features(v, parentfield=k)
430         elif isinstance(obj, list):
431             for i,v in enumerate(obj):
432                 with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
433                     self.check_features(v, parentfield=parentfield)
434
435     def make_output_collection(self, name, storage_classes, tagsString, outputObj):
436         outputObj = copy.deepcopy(outputObj)
437
438         files = []
439         def capture(fileobj):
440             files.append(fileobj)
441
442         adjustDirObjs(outputObj, capture)
443         adjustFileObjs(outputObj, capture)
444
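        # Map every File/Directory in the output object to a target path inside
        # the new output collection.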
445         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
446
447         final = arvados.collection.Collection(api_client=self.api,
448                                               keep_client=self.keep_client,
449                                               num_retries=self.num_retries)
450
451         for k,v in generatemapper.items():
452             if v.type == "Directory" and v.resolved.startswith("_:"):
453                 continue
454             if v.type == "CreateFile" and (k.startswith("_:") or v.resolved.startswith("_:")):
455                 with final.open(v.target, "wb") as f:
456                     f.write(v.resolved.encode("utf-8"))
457                 continue
458
459             if not v.resolved.startswith("keep:"):
460                 raise Exception("Output source is not in keep or a literal")
461             sp = v.resolved.split("/")
462             srccollection = sp[0][5:]
463             try:
464                 reader = self.collection_cache.get(srccollection)
465                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
466                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
467             except arvados.errors.ArgumentError as e:
468                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
469                 raise
470             except IOError as e:
471                 logger.error("While preparing output collection: %s", e)
472                 raise
473
474         def rewrite(fileobj):
475             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
476             for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
477                 if k in fileobj:
478                     del fileobj[k]
479
480         adjustDirObjs(outputObj, rewrite)
481         adjustFileObjs(outputObj, rewrite)
482
483         with final.open("cwl.output.json", "w") as f:
484             res = str(json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False))
485             f.write(res)
486
487         final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)
488
489         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
490                     final.api_response()["name"],
491                     final.manifest_locator())
492
493         final_uuid = final.manifest_locator()
494         tags = tagsString.split(',')
495         for tag in tags:
496             self.api.links().create(body={
497                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
498                 }).execute(num_retries=self.num_retries)
499
500         def finalcollection(fileobj):
501             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
502
503         adjustDirObjs(outputObj, finalcollection)
504         adjustFileObjs(outputObj, finalcollection)
505
506         return (outputObj, final)
507
508     def set_crunch_output(self):
509         if self.work_api == "containers":
510             current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
511             if current is None:
512                 return
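            # Record the output collection's portable data hash on the current
            # container, then mark the separately saved output collection as trash
            # (the output remains referenced through the container record).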
513             try:
514                 self.api.containers().update(uuid=current['uuid'],
515                                              body={
516                                                  'output': self.final_output_collection.portable_data_hash(),
517                                              }).execute(num_retries=self.num_retries)
518                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
519                                               body={
520                                                   'is_trashed': True
521                                               }).execute(num_retries=self.num_retries)
522             except Exception:
523                 logger.exception("Setting container output")
524                 return
525         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
526             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
527                                    body={
528                                        'output': self.final_output_collection.portable_data_hash(),
529                                        'success': self.final_status == "success",
530                                        'progress':1.0
531                                    }).execute(num_retries=self.num_retries)
532
533     def apply_reqs(self, job_order_object, tool):
534         if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
535             if tool.metadata.get("http://commonwl.org/cwltool#original_cwlVersion") == 'v1.0':
536                 raise WorkflowException(
537                     "`cwl:requirements` in the input object is not part of CWL "
538                     "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
539                     "can set the cwlVersion to v1.1 or greater and re-run with "
540                     "--enable-dev.")
541             job_reqs = job_order_object["https://w3id.org/cwl/cwl#requirements"]
542             for req in job_reqs:
543                 tool.requirements.append(req)
544
545     def arv_executor(self, tool, job_order, runtimeContext, logger=None):
546         self.debug = runtimeContext.debug
547
548         tool.visit(self.check_features)
549
550         self.project_uuid = runtimeContext.project_uuid
551         self.pipeline = None
552         self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
553         self.secret_store = runtimeContext.secret_store
554
555         self.trash_intermediate = runtimeContext.trash_intermediate
556         if self.trash_intermediate and self.work_api != "containers":
557             raise Exception("--trash-intermediate is only supported with --api=containers.")
558
559         self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
560         if self.intermediate_output_ttl and self.work_api != "containers":
561             raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
562         if self.intermediate_output_ttl < 0:
563             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
564
565         if runtimeContext.submit_request_uuid and self.work_api != "containers":
566             raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
567
568         if not runtimeContext.name:
569             runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
570
571         # Upload local file references in the job order.
572         job_order = upload_job_order(self, "%s input" % runtimeContext.name,
573                                      tool, job_order)
574
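        # "Submitting" means a workflow record is being created/updated, or --submit
        # is in effect and this is not the special case of a single CommandLineTool
        # run with --wait (and without --always-submit-runner). When submitting, the
        # original, non auto-updated document is reloaded below.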
575         submitting = (runtimeContext.update_workflow or
576                       runtimeContext.create_workflow or
577                       (runtimeContext.submit and not
578                        (tool.tool["class"] == "CommandLineTool" and
579                         runtimeContext.wait and
580                         not runtimeContext.always_submit_runner)))
581
582         loadingContext = self.loadingContext.copy()
583         loadingContext.do_validate = False
584         loadingContext.do_update = False
585         if submitting:
586             # Document may have been auto-updated. Reload the original
587             # document with updating disabled because we want to
588             # submit the original document, not the auto-updated one.
589             tool = load_tool(tool.tool["id"], loadingContext)
590
591         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
592         # Also uploads docker images.
593         merged_map = upload_workflow_deps(self, tool)
594
595         # Recreate process object (ArvadosWorkflow or
596         # ArvadosCommandTool) because tool document may have been
597         # updated by upload_workflow_deps in ways that modify
598         # inheritance of hints or requirements.
599         loadingContext.loader = tool.doc_loader
600         loadingContext.avsc_names = tool.doc_schema
601         loadingContext.metadata = tool.metadata
602         tool = load_tool(tool.tool, loadingContext)
603
604         existing_uuid = runtimeContext.update_workflow
605         if existing_uuid or runtimeContext.create_workflow:
606             # Create a pipeline template or workflow record and exit.
607             if self.work_api == "jobs":
608                 tmpl = RunnerTemplate(self, tool, job_order,
609                                       runtimeContext.enable_reuse,
610                                       uuid=existing_uuid,
611                                       submit_runner_ram=runtimeContext.submit_runner_ram,
612                                       name=runtimeContext.name,
613                                       merged_map=merged_map,
614                                       loadingContext=loadingContext)
615                 tmpl.save()
616                 # cwltool.main will write our return value to stdout.
617                 return (tmpl.uuid, "success")
618             elif self.work_api == "containers":
619                 return (upload_workflow(self, tool, job_order,
620                                         self.project_uuid,
621                                         uuid=existing_uuid,
622                                         submit_runner_ram=runtimeContext.submit_runner_ram,
623                                         name=runtimeContext.name,
624                                         merged_map=merged_map),
625                         "success")
626
627         self.apply_reqs(job_order, tool)
628
629         self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
630         self.eval_timeout = runtimeContext.eval_timeout
631
632         runtimeContext = runtimeContext.copy()
633         runtimeContext.use_container = True
634         runtimeContext.tmpdir_prefix = "tmp"
635         runtimeContext.work_api = self.work_api
636
637         if self.work_api == "containers":
638             if self.ignore_docker_for_reuse:
639                 raise Exception("--ignore-docker-for-reuse not supported with containers API.")
640             runtimeContext.outdir = "/var/spool/cwl"
641             runtimeContext.docker_outdir = "/var/spool/cwl"
642             runtimeContext.tmpdir = "/tmp"
643             runtimeContext.docker_tmpdir = "/tmp"
644         elif self.work_api == "jobs":
645             if runtimeContext.priority != DEFAULT_PRIORITY:
646                 raise Exception("--priority not implemented for jobs API.")
647             runtimeContext.outdir = "$(task.outdir)"
648             runtimeContext.docker_outdir = "$(task.outdir)"
649             runtimeContext.tmpdir = "$(task.tmpdir)"
650
651         if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
652             raise Exception("--priority must be in the range 1..1000.")
653
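        # Estimate a collection cache size from the job order: sum the manifest
        # sizes (the "+size" part of each distinct portable data hash) of referenced
        # collections, scale by a heuristic factor of 192, and use at least 256 MiB.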
654         if self.should_estimate_cache_size:
655             visited = set()
656             estimated_size = [0]
657             def estimate_collection_cache(obj):
658                 if obj.get("location", "").startswith("keep:"):
659                     m = pdh_size.match(obj["location"][5:])
660                     if m and m.group(1) not in visited:
661                         visited.add(m.group(1))
662                         estimated_size[0] += int(m.group(2))
663             visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
664             runtimeContext.collection_cache_size = max(((estimated_size[0]*192) // (1024*1024))+1, 256)
665             self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)
666
667         logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)
668
669         runnerjob = None
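        # When submitting, wrap the tool in a Runner (RunnerContainer or RunnerJob)
        # whose job() yields a single runnable that submits the whole workflow to
        # run on the cluster.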
670         if runtimeContext.submit:
671             # Submit a runner job to run the workflow for us.
672             if self.work_api == "containers":
673                 if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner):
674                     runtimeContext.runnerjob = tool.tool["id"]
675                 else:
676                     tool = RunnerContainer(self, tool, loadingContext, runtimeContext.enable_reuse,
677                                                 self.output_name,
678                                                 self.output_tags,
679                                                 submit_runner_ram=runtimeContext.submit_runner_ram,
680                                                 name=runtimeContext.name,
681                                                 on_error=runtimeContext.on_error,
682                                                 submit_runner_image=runtimeContext.submit_runner_image,
683                                                 intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
684                                                 merged_map=merged_map,
685                                                 priority=runtimeContext.priority,
686                                                 secret_store=self.secret_store,
687                                                 collection_cache_size=runtimeContext.collection_cache_size,
688                                                 collection_cache_is_default=self.should_estimate_cache_size)
689             elif self.work_api == "jobs":
690                 tool = RunnerJob(self, tool, loadingContext, runtimeContext.enable_reuse,
691                                       self.output_name,
692                                       self.output_tags,
693                                       submit_runner_ram=runtimeContext.submit_runner_ram,
694                                       name=runtimeContext.name,
695                                       on_error=runtimeContext.on_error,
696                                       submit_runner_image=runtimeContext.submit_runner_image,
697                                       merged_map=merged_map)
698         elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
699             # Create pipeline for local run
700             self.pipeline = self.api.pipeline_instances().create(
701                 body={
702                     "owner_uuid": self.project_uuid,
703                     "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
704                     "components": {},
705                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
706             logger.info("Pipeline instance %s", self.pipeline["uuid"])
707
708         if runtimeContext.cwl_runner_job is not None:
709             self.uuid = runtimeContext.cwl_runner_job.get('uuid')
710
711         jobiter = tool.job(job_order,
712                            self.output_callback,
713                            runtimeContext)
714
715         if runtimeContext.submit and not runtimeContext.wait:
716             runnerjob = next(jobiter)
717             runnerjob.run(runtimeContext)
718             return (runnerjob.uuid, "success")
719
720         current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
721         if current_container:
722             logger.info("Running inside container %s", current_container.get("uuid"))
723
724         self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
725         self.polling_thread = threading.Thread(target=self.poll_states)
726         self.polling_thread.start()
727
728         self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
729
730         try:
731             self.workflow_eval_lock.acquire()
732
733             # The lock is held while this code runs and is released when it
734             # is safe to do so, in self.workflow_eval_lock.wait(); during the
735             # wait, on_message can update job state and output callbacks can
736             # run.
737
738             loopperf = Perf(metrics, "jobiter")
739             loopperf.__enter__()
740             for runnable in jobiter:
741                 loopperf.__exit__()
742
743                 if self.stop_polling.is_set():
744                     break
745
746                 if self.task_queue.error is not None:
747                     raise self.task_queue.error
748
749                 if runnable:
750                     with Perf(metrics, "run"):
751                         self.start_run(runnable, runtimeContext)
752                 else:
753                     if (self.task_queue.in_flight + len(self.processes)) > 0:
754                         self.workflow_eval_lock.wait(3)
755                     else:
756                         logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
757                         break
758
759                 if self.stop_polling.is_set():
760                     break
761
762                 loopperf.__enter__()
763             loopperf.__exit__()
764
765             while (self.task_queue.in_flight + len(self.processes)) > 0:
766                 if self.task_queue.error is not None:
767                     raise self.task_queue.error
768                 self.workflow_eval_lock.wait(3)
769
770         except UnsupportedRequirement:
771             raise
772         except:
773             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
774                 logger.error("Interrupted, workflow will be cancelled")
775             elif isinstance(sys.exc_info()[1], WorkflowException):
776                 logger.error("Workflow execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
777             else:
778                 logger.exception("Workflow execution failed")
779
780             if self.pipeline:
781                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
782                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
783
784             if self.work_api == "containers" and not current_container:
785                 # Not running in a crunch container, so cancel any outstanding processes.
786                 for p in self.processes:
787                     try:
788                         self.api.container_requests().update(uuid=p,
789                                                              body={"priority": "0"}
790                         ).execute(num_retries=self.num_retries)
791                     except Exception:
792                         pass
793         finally:
794             self.workflow_eval_lock.release()
795             self.task_queue.drain()
796             self.stop_polling.set()
797             self.polling_thread.join()
798             self.task_queue.join()
799
800         if self.final_status == "UnsupportedRequirement":
801             raise UnsupportedRequirement("Check log for details.")
802
803         if self.final_output is None:
804             raise WorkflowException("Workflow did not return a result.")
805
806         if runtimeContext.submit and isinstance(tool, Runner):
807             logger.info("Final output collection %s", tool.final_output)
808         else:
809             if self.output_name is None:
810                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
811             if self.output_tags is None:
812                 self.output_tags = ""
813
814             storage_classes = runtimeContext.storage_classes.strip().split(",")
815             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
816             self.set_crunch_output()
817
818         if runtimeContext.compute_checksum:
819             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
820             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
821
822         if self.trash_intermediate and self.final_status == "success":
823             self.trash_intermediate_output()
824
825         return (self.final_output, self.final_status)