# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import division
from builtins import next
from builtins import object
from future.utils import viewvalues

import argparse
import logging
import os
import sys
import threading
import copy
import json
import re
from functools import partial
import time

from cwltool.errors import WorkflowException
import cwltool.workflow
from schema_salad.sourceline import SourceLine
import schema_salad.validate as validate

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError

import arvados_cwl.util
from .arvcontainer import RunnerContainer
from .arvjob import RunnerJob, RunnerTemplate
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from .task_queue import TaskQueue
from .context import ArvLoadingContext, ArvRuntimeContext
from ._version import __version__

from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing, visit_class
from cwltool.command_line_tool import compute_checksums

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')

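# Default priority for submitted work; the containers API accepts priorities in
# the range 1..1000 (see the --priority check in arv_executor), and the jobs API
# only supports this default value.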
DEFAULT_PRIORITY = 500

class RuntimeStatusLoggingHandler(logging.Handler):
    """
    Intercepts logging calls and reports them as runtime statuses on runner
    containers.
    """
    def __init__(self, runtime_status_update_func):
        super(RuntimeStatusLoggingHandler, self).__init__()
        self.runtime_status_update = runtime_status_update_func

    def emit(self, record):
        kind = None
        if record.levelno >= logging.ERROR:
            kind = 'error'
        elif record.levelno >= logging.WARNING:
            kind = 'warning'
        if kind is not None:
            log_msg = record.getMessage()
            if '\n' in log_msg:
                # If the logged message is multi-line, use its first line as status
                # and the rest as detail.
                status, detail = log_msg.split('\n', 1)
                self.runtime_status_update(
                    kind,
                    "%s: %s" % (record.name, status),
                    detail
                )
            else:
                self.runtime_status_update(
                    kind,
                    "%s: %s" % (record.name, record.getMessage())
                )

class ArvCwlExecutor(object):
    """Execute a CWL tool or workflow, submit work (using either the jobs or
    containers API), wait for it to complete, and report output.

    """

    def __init__(self, api_client,
                 arvargs=None,
                 keep_client=None,
                 num_retries=4,
                 thread_count=4):

        if arvargs is None:
            arvargs = argparse.Namespace()
            arvargs.work_api = None
            arvargs.output_name = None
            arvargs.output_tags = None
            arvargs.thread_count = 1
            arvargs.collection_cache_size = None

        self.api = api_client
        self.processes = {}
        self.workflow_eval_lock = threading.Condition(threading.RLock())
        self.final_output = None
        self.final_status = None
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = arvargs.output_name
        self.output_tags = arvargs.output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False
        self.thread_count = arvargs.thread_count
        self.poll_interval = 12
        self.loadingContext = None
        self.should_estimate_cache_size = True

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

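        # Size the Keep collection cache: honor --collection-cache-size (given
        # in MiB) if set, otherwise start with 256 MiB and estimate a better
        # cap later from the job order (see arv_executor).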
        if arvargs.collection_cache_size:
            collection_cache_size = arvargs.collection_cache_size*1024*1024
            self.should_estimate_cache_size = False
        else:
            collection_cache_size = 256*1024*1024

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
                                                cap=collection_cache_size)

        self.fetcher_constructor = partial(CollectionFetcher,
                                           api_client=self.api,
                                           fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                           num_retries=self.num_retries)

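        # Determine which work API to use ("containers" or the legacy "jobs")
        # by checking which resources the API server's discovery document
        # advertises, honoring an explicit --api selection when one was given.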
        self.work_api = None
        expected_api = ["jobs", "containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (arvargs.work_api == api or arvargs.work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if arvargs.work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))

        if self.work_api == "jobs":
            logger.warning("""
*******************************
Using the deprecated 'jobs' API.

To get rid of this warning:

Users: read about migrating at
http://doc.arvados.org/user/cwl/cwl-style.html#migrate
and use the option --api=containers

Admins: configure the cluster to disable the 'jobs' API as described at:
http://doc.arvados.org/install/install-api-server.html#disable_api_methods
*******************************""")

        self.loadingContext = ArvLoadingContext(vars(arvargs))
        self.loadingContext.fetcher_constructor = self.fetcher_constructor
        self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
        self.loadingContext.construct_tool_object = self.arv_make_tool

        # Add a custom logging handler to the root logger for runtime status reporting
        # if running inside a container
        if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
            root_logger = logging.getLogger('')

            # Remove existing RuntimeStatusLoggingHandlers if they exist
            handlers = [h for h in root_logger.handlers if not isinstance(h, RuntimeStatusLoggingHandler)]
            root_logger.handlers = handlers

            handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
            root_logger.addHandler(handler)

        self.runtimeContext = ArvRuntimeContext(vars(arvargs))
        self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
                                                     collection_cache=self.collection_cache)

        validate_cluster_target(self, self.runtimeContext)


    def arv_make_tool(self, toolpath_object, loadingContext):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, loadingContext)
        elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, loadingContext)
        elif "class" in toolpath_object and toolpath_object["class"] == "ExpressionTool":
            return ArvadosExpressionTool(self, toolpath_object, loadingContext)
        else:
            raise Exception("Unknown tool %s" % toolpath_object.get("class"))

    def output_callback(self, out, processStatus):
        with self.workflow_eval_lock:
            if processStatus == "success":
                logger.info("Overall process status is %s", processStatus)
                state = "Complete"
            else:
                logger.error("Overall process status is %s", processStatus)
                state = "Failed"
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": state}).execute(num_retries=self.num_retries)
            self.final_status = processStatus
            self.final_output = out
            self.workflow_eval_lock.notifyAll()


    def start_run(self, runnable, runtimeContext):
        self.task_queue.add(partial(runnable.run, runtimeContext),
                            self.workflow_eval_lock, self.stop_polling)

    def process_submitted(self, container):
        with self.workflow_eval_lock:
            self.processes[container.uuid] = container

    def process_done(self, uuid, record):
        with self.workflow_eval_lock:
            j = self.processes[uuid]
            logger.info("%s %s is %s", self.label(j), uuid, record["state"])
            self.task_queue.add(partial(j.done, record),
                                self.workflow_eval_lock, self.stop_polling)
            del self.processes[uuid]

    def runtime_status_update(self, kind, message, detail=None):
        """
        Updates the runtime_status field on the runner container.
        Called to report errors, warnings, or activity statuses, for example
        from the RuntimeStatusLoggingHandler.
        """
        with self.workflow_eval_lock:
            current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            if current is None:
                return
            runtime_status = current.get('runtime_status', {})
            # If the status is an error, only report the first one in full.
            if kind == 'error':
                if not runtime_status.get('error'):
                    runtime_status.update({
                        'error': message
                    })
                    if detail is not None:
                        runtime_status.update({
                            'errorDetail': detail
                        })
                # Further errors are only mentioned as a count.
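                # e.g. "step1 failed" -> "step1 failed (and 1 more)" -> "step1 failed (and 2 more)"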
                else:
                    # Get anything before an optional 'and N more' string.
                    try:
                        error_msg = re.match(
                            r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
                        more_failures = re.match(
                            r'.*\(and (\d+) more\)', runtime_status.get('error'))
                    except TypeError:
                        # Ignore tests stubbing errors
                        return
                    if more_failures:
                        failure_qty = int(more_failures.groups()[0])
                        runtime_status.update({
                            'error': "%s (and %d more)" % (error_msg, failure_qty+1)
                        })
                    else:
                        runtime_status.update({
                            'error': "%s (and 1 more)" % error_msg
                        })
            elif kind in ['warning', 'activity']:
                # Record the latest warning/activity status, regardless of
                # previous occurrences.
                runtime_status.update({
                    kind: message
                })
                if detail is not None:
                    runtime_status.update({
                        kind+"Detail": detail
                    })
            else:
                # Ignore any other status kind
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'runtime_status': runtime_status,
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Couldn't update runtime_status: %s", e)

    def wrapped_callback(self, cb, obj, st):
        with self.workflow_eval_lock:
            cb(obj, st)
            self.workflow_eval_lock.notifyAll()

    def get_wrapped_callback(self, cb):
        return partial(self.wrapped_callback, cb)

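    # Handle a state-change event for one of the submitted processes.  Events
    # are fed in as synthetic "update" events by poll_states() below.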
    def on_message(self, event):
        if event.get("object_uuid") in self.processes and event["event_type"] == "update":
            uuid = event["object_uuid"]
            if event["properties"]["new_attributes"]["state"] == "Running":
                with self.workflow_eval_lock:
                    j = self.processes[uuid]
                    if j.running is False:
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                        logger.info("%s %s is Running", self.label(j), uuid)
            elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                self.process_done(uuid, event["properties"]["new_attributes"])

    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of jobs or containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            remain_wait = self.poll_interval
            while True:
                if remain_wait > 0:
                    self.stop_polling.wait(remain_wait)
                if self.stop_polling.is_set():
                    break
                with self.workflow_eval_lock:
                    keys = list(self.processes)
                if not keys:
                    remain_wait = self.poll_interval
                    continue

                begin_poll = time.time()
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()
                elif self.work_api == "jobs":
                    table = self.poll_api.jobs()

                pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)

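                # Query the API server in pages of at most maxItemsPerResponse
                # UUIDs and feed each returned record back through on_message.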
                while keys:
                    page = keys[:pageSize]
                    keys = keys[pageSize:]
                    try:
                        proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
                    except Exception as e:
                        logger.warning("Error checking states on API server: %s", e)
                        remain_wait = self.poll_interval
                        continue

                    for p in proc_states["items"]:
                        self.on_message({
                            "object_uuid": p["uuid"],
                            "event_type": "update",
                            "properties": {
                                "new_attributes": p
                            }
                        })
                finish_poll = time.time()
                remain_wait = self.poll_interval - (finish_poll - begin_poll)
        except:
            logger.exception("Fatal error in state polling thread.")
            with self.workflow_eval_lock:
                self.processes.clear()
                self.workflow_eval_lock.notifyAll()
        finally:
            self.stop_polling.set()

    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except (KeyboardInterrupt, SystemExit):
                break
            except Exception as e:
                logger.warning("Failed to delete intermediate output: %s", e, exc_info=(e if self.debug else False))

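    # Walk a tool or job document and reject CWL features that the selected
    # work API cannot support (most of these require --api=containers).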
    def check_features(self, obj):
        if isinstance(obj, dict):
            if obj.get("writable") and self.work_api != "containers":
                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    if self.work_api != "containers":
                        raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                            "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
                raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
            for v in viewvalues(obj):
                self.check_features(v)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v)

    def make_output_collection(self, name, storage_classes, tagsString, outputObj):
        outputObj = copy.deepcopy(outputObj)

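        # Gather every File and Directory object from the output, map their
        # locations, and copy the referenced data into a single new collection.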
        files = []
        def capture(fileobj):
            files.append(fileobj)

        adjustDirObjs(outputObj, capture)
        adjustFileObjs(outputObj, capture)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

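        # Mapper keys starting with "_:" are literals created during the run:
        # file literals are written into the output collection, bare directory
        # literals are skipped.  Everything else must already be in Keep.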
        for k,v in generatemapper.items():
            if k.startswith("_:"):
                if v.type == "Directory":
                    continue
                if v.type == "CreateFile":
                    with final.open(v.target, "wb") as f:
                        f.write(v.resolved.encode("utf-8"))
                    continue

            if not k.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = k.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.warning("While preparing output collection: %s", e)

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            res = json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False).encode('utf-8').decode()
            f.write(res)

        final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

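    # Record the final output collection on the runner's own container record
    # (containers API) or job task (jobs API).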
    def set_crunch_output(self):
        if self.work_api == "containers":
            current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            if current is None:
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                             }).execute(num_retries=self.num_retries)
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Setting container output: %s", e)
        elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
            self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
                                        body={
                                            'output': self.final_output_collection.portable_data_hash(),
                                            'success': self.final_status == "success",
                                            'progress': 1.0
                                        }).execute(num_retries=self.num_retries)

    def arv_executor(self, tool, job_order, runtimeContext, logger=None):
        self.debug = runtimeContext.debug

        tool.visit(self.check_features)

        self.project_uuid = runtimeContext.project_uuid
        self.pipeline = None
        self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
        self.secret_store = runtimeContext.secret_store

        self.trash_intermediate = runtimeContext.trash_intermediate
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if runtimeContext.submit_request_uuid and self.work_api != "containers":
            raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))

        if not runtimeContext.name:
            runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])

        # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
        # Also uploads docker images.
        merged_map = upload_workflow_deps(self, tool)

        # Reload tool object which may have been updated by
        # upload_workflow_deps
        # Don't validate this time because it will just print redundant errors.
        loadingContext = self.loadingContext.copy()
        loadingContext.loader = tool.doc_loader
        loadingContext.avsc_names = tool.doc_schema
        loadingContext.metadata = tool.metadata
        loadingContext.do_validate = False

        tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
                                  loadingContext)

        # Upload local file references in the job order.
        job_order = upload_job_order(self, "%s input" % runtimeContext.name,
                                     tool, job_order)

        existing_uuid = runtimeContext.update_workflow
        if existing_uuid or runtimeContext.create_workflow:
            # Create a pipeline template or workflow record and exit.
            if self.work_api == "jobs":
                tmpl = RunnerTemplate(self, tool, job_order,
                                      runtimeContext.enable_reuse,
                                      uuid=existing_uuid,
                                      submit_runner_ram=runtimeContext.submit_runner_ram,
                                      name=runtimeContext.name,
                                      merged_map=merged_map,
                                      loadingContext=loadingContext)
                tmpl.save()
                # cwltool.main will write our return value to stdout.
                return (tmpl.uuid, "success")
            elif self.work_api == "containers":
                return (upload_workflow(self, tool, job_order,
                                        self.project_uuid,
                                        uuid=existing_uuid,
                                        submit_runner_ram=runtimeContext.submit_runner_ram,
                                        name=runtimeContext.name,
                                        merged_map=merged_map),
                        "success")

        self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
        self.eval_timeout = runtimeContext.eval_timeout

        runtimeContext = runtimeContext.copy()
        runtimeContext.use_container = True
        runtimeContext.tmpdir_prefix = "tmp"
        runtimeContext.work_api = self.work_api

        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
            runtimeContext.outdir = "/var/spool/cwl"
            runtimeContext.docker_outdir = "/var/spool/cwl"
            runtimeContext.tmpdir = "/tmp"
            runtimeContext.docker_tmpdir = "/tmp"
        elif self.work_api == "jobs":
            if runtimeContext.priority != DEFAULT_PRIORITY:
                raise Exception("--priority not implemented for jobs API.")
            runtimeContext.outdir = "$(task.outdir)"
            runtimeContext.docker_outdir = "$(task.outdir)"
            runtimeContext.tmpdir = "$(task.tmpdir)"

        if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
            raise Exception("--priority must be in the range 1..1000.")

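        # No explicit --collection-cache-size: estimate a cache cap from the
        # collections referenced by the job order, summing the data sizes
        # encoded in each distinct portable data hash and applying a scaling
        # heuristic, with a floor of 256 MiB.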
        if self.should_estimate_cache_size:
            visited = set()
            estimated_size = [0]
            def estimate_collection_cache(obj):
                if obj.get("location", "").startswith("keep:"):
                    m = pdh_size.match(obj["location"][5:])
                    if m and m.group(1) not in visited:
                        visited.add(m.group(1))
                        estimated_size[0] += int(m.group(2))
            visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
            runtimeContext.collection_cache_size = max(((estimated_size[0]*192) // (1024*1024))+1, 256)
            self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)

        logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)

        runnerjob = None
        if runtimeContext.submit:
            # Submit a runner job to run the workflow for us.
            if self.work_api == "containers":
                if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner):
                    runtimeContext.runnerjob = tool.tool["id"]
                else:
                    tool = RunnerContainer(self, tool, loadingContext, runtimeContext.enable_reuse,
                                           self.output_name,
                                           self.output_tags,
                                           submit_runner_ram=runtimeContext.submit_runner_ram,
                                           name=runtimeContext.name,
                                           on_error=runtimeContext.on_error,
                                           submit_runner_image=runtimeContext.submit_runner_image,
                                           intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
                                           merged_map=merged_map,
                                           priority=runtimeContext.priority,
                                           secret_store=self.secret_store,
                                           collection_cache_size=runtimeContext.collection_cache_size,
                                           collection_cache_is_default=self.should_estimate_cache_size)
            elif self.work_api == "jobs":
                tool = RunnerJob(self, tool, loadingContext, runtimeContext.enable_reuse,
                                 self.output_name,
                                 self.output_tags,
                                 submit_runner_ram=runtimeContext.submit_runner_ram,
                                 name=runtimeContext.name,
                                 on_error=runtimeContext.on_error,
                                 submit_runner_image=runtimeContext.submit_runner_image,
                                 merged_map=merged_map)
        elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
            # Create pipeline for local run
            self.pipeline = self.api.pipeline_instances().create(
                body={
                    "owner_uuid": self.project_uuid,
                    "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
                    "components": {},
                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
            logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if runtimeContext.cwl_runner_job is not None:
            self.uuid = runtimeContext.cwl_runner_job.get('uuid')

        jobiter = tool.job(job_order,
                           self.output_callback,
                           runtimeContext)

        if runtimeContext.submit and not runtimeContext.wait:
            runnerjob = next(jobiter)
            runnerjob.run(runtimeContext)
            return (runnerjob.uuid, "success")

        current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
        if current_container:
            logger.info("Running inside container %s", current_container.get("uuid"))

        self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)

        try:
            self.workflow_eval_lock.acquire()

            # Holds the lock while this code runs and releases it when
            # it is safe to do so in self.workflow_eval_lock.wait(),
            # at which point on_message can update job state and
            # process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if self.task_queue.error is not None:
                    raise self.task_queue.error

                if runnable:
                    with Perf(metrics, "run"):
                        self.start_run(runnable, runtimeContext)
                else:
                    if (self.task_queue.in_flight + len(self.processes)) > 0:
                        self.workflow_eval_lock.wait(3)
                    else:
                        logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
                        break

                if self.stop_polling.is_set():
                    break

                loopperf.__enter__()
            loopperf.__exit__()

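            # All runnable steps have been issued; wait for queued tasks and
            # submitted processes to finish (or fail).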
            while (self.task_queue.in_flight + len(self.processes)) > 0:
                if self.task_queue.error is not None:
                    raise self.task_queue.error
                self.workflow_eval_lock.wait(3)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                logger.error("Interrupted, workflow will be cancelled")
            else:
                logger.error("Execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
            if runtimeContext.submit and isinstance(tool, Runner):
                runnerjob = tool
                if runnerjob.uuid and self.work_api == "containers":
                    self.api.container_requests().update(uuid=runnerjob.uuid,
                                                         body={"priority": "0"}).execute(num_retries=self.num_retries)
        finally:
            self.workflow_eval_lock.release()
            self.task_queue.drain()
            self.stop_polling.set()
            self.polling_thread.join()
            self.task_queue.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if runtimeContext.submit and isinstance(tool, Runner):
            logger.info("Final output collection %s", tool.final_output)
        else:
            if self.output_name is None:
                self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
            if self.output_tags is None:
                self.output_tags = ""

            storage_classes = runtimeContext.storage_classes.strip().split(",")
            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
            self.set_crunch_output()

        if runtimeContext.compute_checksum:
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)