Merge branch '9865-cwl-fix-ignored-exceptions'
[arvados.git] / sdk / cwl / arvados_cwl / executor.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import division
6 from builtins import next
7 from builtins import object
8 from builtins import str
9 from future.utils import viewvalues
10
11 import argparse
12 import logging
13 import os
14 import sys
15 import threading
16 import copy
17 import json
18 import re
19 from functools import partial
20 import time
21
22 from cwltool.errors import WorkflowException
23 import cwltool.workflow
24 from schema_salad.sourceline import SourceLine
25 import schema_salad.validate as validate
26
27 import arvados
28 import arvados.config
29 from arvados.keep import KeepClient
30 from arvados.errors import ApiError
31
32 import arvados_cwl.util
33 from .arvcontainer import RunnerContainer
34 from .arvjob import RunnerJob, RunnerTemplate
35 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
36 from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
37 from .arvworkflow import ArvadosWorkflow, upload_workflow
38 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
39 from .perf import Perf
40 from .pathmapper import NoFollowPathMapper
41 from .task_queue import TaskQueue
42 from .context import ArvLoadingContext, ArvRuntimeContext
43 from ._version import __version__
44
45 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
46 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing, visit_class
47 from cwltool.command_line_tool import compute_checksums
48
49 logger = logging.getLogger('arvados.cwl-runner')
50 metrics = logging.getLogger('arvados.cwl-runner.metrics')
51
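# Default priority for submitted work.  Priorities passed on the command line
# must fall in the range 1..1000 (checked in arv_executor below).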
52 DEFAULT_PRIORITY = 500
53
54 class RuntimeStatusLoggingHandler(logging.Handler):
55     """
56     Intercepts logging calls and reports them as runtime status updates on the
57     runner container.
58     """
59     def __init__(self, runtime_status_update_func):
60         super(RuntimeStatusLoggingHandler, self).__init__()
61         self.runtime_status_update = runtime_status_update_func
62
63     def emit(self, record):
64         kind = None
65         if record.levelno >= logging.ERROR:
66             kind = 'error'
67         elif record.levelno >= logging.WARNING:
68             kind = 'warning'
69         if kind is not None:
70             log_msg = record.getMessage()
71             if '\n' in log_msg:
72                 # If the logged message is multi-line, use its first line as status
73                 # and the rest as detail.
74                 status, detail = log_msg.split('\n', 1)
75                 self.runtime_status_update(
76                     kind,
77                     "%s: %s" % (record.name, status),
78                     detail
79                 )
80             else:
81                 self.runtime_status_update(
82                     kind,
83                     "%s: %s" % (record.name, record.getMessage())
84                 )
85
86 class ArvCwlExecutor(object):
87     """Execute a CWL tool or workflow, submit work (using either jobs or
88     containers API), wait for them to complete, and report output.
89
90     """
91
92     def __init__(self, api_client,
93                  arvargs=None,
94                  keep_client=None,
95                  num_retries=4,
96                  thread_count=4):
97
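        # When no argument namespace is supplied, fall back to a minimal
        # argparse.Namespace carrying just the attributes read below.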
98         if arvargs is None:
99             arvargs = argparse.Namespace()
100             arvargs.work_api = None
101             arvargs.output_name = None
102             arvargs.output_tags = None
103             arvargs.thread_count = 1
104             arvargs.collection_cache_size = None
105
106         self.api = api_client
107         self.processes = {}
108         self.workflow_eval_lock = threading.Condition(threading.RLock())
109         self.final_output = None
110         self.final_status = None
111         self.num_retries = num_retries
112         self.uuid = None
113         self.stop_polling = threading.Event()
114         self.poll_api = None
115         self.pipeline = None
116         self.final_output_collection = None
117         self.output_name = arvargs.output_name
118         self.output_tags = arvargs.output_tags
119         self.project_uuid = None
120         self.intermediate_output_ttl = 0
121         self.intermediate_output_collections = []
122         self.trash_intermediate = False
123         self.thread_count = arvargs.thread_count
124         self.poll_interval = 12
125         self.loadingContext = None
126         self.should_estimate_cache_size = True
127
128         if keep_client is not None:
129             self.keep_client = keep_client
130         else:
131             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
132
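        # Honor an explicitly requested collection cache size (given in MiB);
        # otherwise start with a 256 MiB cache, which arv_executor may later
        # re-estimate from the job order.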
133         if arvargs.collection_cache_size:
134             collection_cache_size = arvargs.collection_cache_size*1024*1024
135             self.should_estimate_cache_size = False
136         else:
137             collection_cache_size = 256*1024*1024
138
139         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
140                                                 cap=collection_cache_size)
141
142         self.fetcher_constructor = partial(CollectionFetcher,
143                                            api_client=self.api,
144                                            fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
145                                            num_retries=self.num_retries)
146
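        # Pick the work API: use the API requested on the command line if the
        # server advertises it, otherwise fall back to the first advertised
        # entry in expected_api.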
147         self.work_api = None
148         expected_api = ["jobs", "containers"]
149         for api in expected_api:
150             try:
151                 methods = self.api._rootDesc.get('resources')[api]['methods']
152                 if ('httpMethod' in methods['create'] and
153                     (arvargs.work_api == api or arvargs.work_api is None)):
154                     self.work_api = api
155                     break
156             except KeyError:
157                 pass
158
159         if not self.work_api:
160             if arvargs.work_api is None:
161                 raise Exception("No supported APIs")
162             else:
163                 raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
164
165         if self.work_api == "jobs":
166             logger.warning("""
167 *******************************
168 Using the deprecated 'jobs' API.
169
170 To get rid of this warning:
171
172 Users: read about migrating at
173 http://doc.arvados.org/user/cwl/cwl-style.html#migrate
174 and use the option --api=containers
175
176 Admins: configure the cluster to disable the 'jobs' API as described at:
177 http://doc.arvados.org/install/install-api-server.html#disable_api_methods
178 *******************************""")
179
180         self.loadingContext = ArvLoadingContext(vars(arvargs))
181         self.loadingContext.fetcher_constructor = self.fetcher_constructor
182         self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
183         self.loadingContext.construct_tool_object = self.arv_make_tool
184
185         # Add a custom logging handler to the root logger for runtime status reporting
186         # if running inside a container
187         if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
188             root_logger = logging.getLogger('')
189
190             # Remove any existing RuntimeStatusLoggingHandlers so status updates are not reported twice
191             handlers = [h for h in root_logger.handlers if not isinstance(h, RuntimeStatusLoggingHandler)]
192             root_logger.handlers = handlers
193
194             handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
195             root_logger.addHandler(handler)
196
197         self.runtimeContext = ArvRuntimeContext(vars(arvargs))
198         self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
199                                                      collection_cache=self.collection_cache)
200
201         validate_cluster_target(self, self.runtimeContext)
202
203
204     def arv_make_tool(self, toolpath_object, loadingContext):
205         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
206             return ArvadosCommandTool(self, toolpath_object, loadingContext)
207         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
208             return ArvadosWorkflow(self, toolpath_object, loadingContext)
209         elif "class" in toolpath_object and toolpath_object["class"] == "ExpressionTool":
210             return ArvadosExpressionTool(self, toolpath_object, loadingContext)
211         else:
212             raise Exception("Unknown tool %s" % toolpath_object.get("class"))
213
214     def output_callback(self, out, processStatus):
215         with self.workflow_eval_lock:
216             if processStatus == "success":
217                 logger.info("Overall process status is %s", processStatus)
218                 state = "Complete"
219             else:
220                 logger.error("Overall process status is %s", processStatus)
221                 state = "Failed"
222             if self.pipeline:
223                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
224                                                         body={"state": state}).execute(num_retries=self.num_retries)
225             self.final_status = processStatus
226             self.final_output = out
227             self.workflow_eval_lock.notifyAll()
228
229
230     def start_run(self, runnable, runtimeContext):
231         self.task_queue.add(partial(runnable.run, runtimeContext),
232                             self.workflow_eval_lock, self.stop_polling)
233
234     def process_submitted(self, container):
235         with self.workflow_eval_lock:
236             self.processes[container.uuid] = container
237
238     def process_done(self, uuid, record):
239         with self.workflow_eval_lock:
240             j = self.processes[uuid]
241             logger.info("%s %s is %s", self.label(j), uuid, record["state"])
242             self.task_queue.add(partial(j.done, record),
243                                 self.workflow_eval_lock, self.stop_polling)
244             del self.processes[uuid]
245
246     def runtime_status_update(self, kind, message, detail=None):
247         """
248         Updates the runtime_status field on the runner container.
249         Called when there's a need to report errors, warnings or just
250         activity statuses, for example in the RuntimeStatusLoggingHandler.
251         """
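        # Take the workflow lock so concurrent updates don't clobber each
        # other's read-modify-write of runtime_status.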
252         with self.workflow_eval_lock:
253             current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
254             if current is None:
255                 return
256             runtime_status = current.get('runtime_status', {})
257             # If the status is an error, only report the first one in full; later errors just increment a counter.
258             if kind == 'error':
259                 if not runtime_status.get('error'):
260                     runtime_status.update({
261                         'error': message
262                     })
263                     if detail is not None:
264                         runtime_status.update({
265                             'errorDetail': detail
266                         })
267                 # Further errors are only mentioned as a count.
268                 else:
269                     # Get anything before an optional 'and N more' string.
270                     try:
271                         error_msg = re.match(
272                             r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
273                         more_failures = re.match(
274                             r'.*\(and (\d+) more\)', runtime_status.get('error'))
275                     except TypeError:
276                         # A non-string 'error' value (e.g. stubbed out by tests) raises TypeError; nothing to update.
277                         return
278                     if more_failures:
279                         failure_qty = int(more_failures.groups()[0])
280                         runtime_status.update({
281                             'error': "%s (and %d more)" % (error_msg, failure_qty+1)
282                         })
283                     else:
284                         runtime_status.update({
285                             'error': "%s (and 1 more)" % error_msg
286                         })
287             elif kind in ['warning', 'activity']:
288                 # Record the most recent warning/activity status, regardless of
289                 # previous occurrences.
290                 runtime_status.update({
291                     kind: message
292                 })
293                 if detail is not None:
294                     runtime_status.update({
295                         kind+"Detail": detail
296                     })
297             else:
298                 # Ignore any other status kind
299                 return
300             try:
301                 self.api.containers().update(uuid=current['uuid'],
302                                             body={
303                                                 'runtime_status': runtime_status,
304                                             }).execute(num_retries=self.num_retries)
305             except Exception as e:
306                 logger.info("Couldn't update runtime_status: %s", e)
307
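    # Output callbacks are wrapped so that they run while holding
    # workflow_eval_lock and wake up the main loop afterwards.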
308     def wrapped_callback(self, cb, obj, st):
309         with self.workflow_eval_lock:
310             cb(obj, st)
311             self.workflow_eval_lock.notifyAll()
312
313     def get_wrapped_callback(self, cb):
314         return partial(self.wrapped_callback, cb)
315
316     def on_message(self, event):
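        # Handle an update event for one of the tracked processes; events are
        # delivered by the state-polling thread (poll_states).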
317         if event.get("object_uuid") in self.processes and event["event_type"] == "update":
318             uuid = event["object_uuid"]
319             if event["properties"]["new_attributes"]["state"] == "Running":
320                 with self.workflow_eval_lock:
321                     j = self.processes[uuid]
322                     if j.running is False:
323                         j.running = True
324                         j.update_pipeline_component(event["properties"]["new_attributes"])
325                         logger.info("%s %s is Running", self.label(j), uuid)
326             elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
327                 self.process_done(uuid, event["properties"]["new_attributes"])
328
329     def label(self, obj):
330         return "[%s %s]" % (self.work_api[0:-1], obj.name)
331
332     def poll_states(self):
333         """Poll status of jobs or containers listed in the processes dict.
334
335         Runs in a separate thread.
336         """
337
338         try:
339             remain_wait = self.poll_interval
340             while True:
341                 if remain_wait > 0:
342                     self.stop_polling.wait(remain_wait)
343                 if self.stop_polling.is_set():
344                     break
345                 with self.workflow_eval_lock:
346                     keys = list(self.processes)
347                 if not keys:
348                     remain_wait = self.poll_interval
349                     continue
350
351                 begin_poll = time.time()
352                 if self.work_api == "containers":
353                     table = self.poll_api.container_requests()
354                 elif self.work_api == "jobs":
355                     table = self.poll_api.jobs()
356
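                # Query the API server in pages no larger than its advertised
                # maxItemsPerResponse.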
357                 pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)
358
359                 while keys:
360                     page = keys[:pageSize]
361                     keys = keys[pageSize:]
362                     try:
363                         proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
364                     except Exception:
365                         logger.exception("Error checking states on API server")
366                         remain_wait = self.poll_interval
367                         continue
368
369                     for p in proc_states["items"]:
370                         self.on_message({
371                             "object_uuid": p["uuid"],
372                             "event_type": "update",
373                             "properties": {
374                                 "new_attributes": p
375                             }
376                         })
377                 finish_poll = time.time()
378                 remain_wait = self.poll_interval - (finish_poll - begin_poll)
379         except:
380             logger.exception("Fatal error in state polling thread.")
381             with self.workflow_eval_lock:
382                 self.processes.clear()
383                 self.workflow_eval_lock.notifyAll()
384         finally:
385             self.stop_polling.set()
386
387     def add_intermediate_output(self, uuid):
388         if uuid:
389             self.intermediate_output_collections.append(uuid)
390
391     def trash_intermediate_output(self):
392         logger.info("Cleaning up intermediate output collections")
393         for i in self.intermediate_output_collections:
394             try:
395                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
396             except (KeyboardInterrupt, SystemExit):
397                 break
398             except Exception:
399                 logger.warning("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
400
401     def check_features(self, obj):
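        # Recursively check that the tool only uses features supported by the
        # selected work API, raising UnsupportedRequirement (or a validation
        # error) otherwise.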
402         if isinstance(obj, dict):
403             if obj.get("writable") and self.work_api != "containers":
404                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
405             if obj.get("class") == "DockerRequirement":
406                 if obj.get("dockerOutputDirectory"):
407                     if self.work_api != "containers":
408                         raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
409                             "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
410                     if not obj.get("dockerOutputDirectory").startswith('/'):
411                         raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
412                             "Option 'dockerOutputDirectory' must be an absolute path.")
413             if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
414                 raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
415             for v in viewvalues(obj):
416                 self.check_features(v)
417         elif isinstance(obj, list):
418             for i,v in enumerate(obj):
419                 with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
420                     self.check_features(v)
421
422     def make_output_collection(self, name, storage_classes, tagsString, outputObj):
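        # Build the final output collection: copy every file and directory
        # referenced by the output object into a new collection, rewrite the
        # output object's locations to point at it, then save and tag it.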
423         outputObj = copy.deepcopy(outputObj)
424
425         files = []
426         def capture(fileobj):
427             files.append(fileobj)
428
429         adjustDirObjs(outputObj, capture)
430         adjustFileObjs(outputObj, capture)
431
432         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
433
434         final = arvados.collection.Collection(api_client=self.api,
435                                               keep_client=self.keep_client,
436                                               num_retries=self.num_retries)
437
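        # Mapper entries with a literal ("_:") source are either directories,
        # which need no copying, or CreateFile literals written directly into
        # the new collection.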
438         for k,v in generatemapper.items():
439             if k.startswith("_:"):
440                 if v.type == "Directory":
441                     continue
442                 if v.type == "CreateFile":
443                     with final.open(v.target, "wb") as f:
444                         f.write(v.resolved.encode("utf-8"))
445                     continue
446
447             if not k.startswith("keep:"):
448                 raise Exception("Output source is not in keep or a literal")
449             sp = k.split("/")
450             srccollection = sp[0][5:]
451             try:
452                 reader = self.collection_cache.get(srccollection)
453                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
454                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
455             except arvados.errors.ArgumentError as e:
456                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
457                 raise
458             except IOError as e:
459                 logger.warning("While preparing output collection: %s", e)
460
461         def rewrite(fileobj):
462             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
463             for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
464                 if k in fileobj:
465                     del fileobj[k]
466
467         adjustDirObjs(outputObj, rewrite)
468         adjustFileObjs(outputObj, rewrite)
469
470         with final.open("cwl.output.json", "w") as f:
471             res = str(json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False))
472             f.write(res)
473
474         final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)
475
476         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
477                     final.api_response()["name"],
478                     final.manifest_locator())
479
480         final_uuid = final.manifest_locator()
481         tags = tagsString.split(',')
482         for tag in tags:
483             self.api.links().create(body={
484                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
485                 }).execute(num_retries=self.num_retries)
486
487         def finalcollection(fileobj):
488             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
489
490         adjustDirObjs(outputObj, finalcollection)
491         adjustFileObjs(outputObj, finalcollection)
492
493         return (outputObj, final)
494
495     def set_crunch_output(self):
496         if self.work_api == "containers":
497             current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
498             if current is None:
499                 return
500             try:
501                 self.api.containers().update(uuid=current['uuid'],
502                                              body={
503                                                  'output': self.final_output_collection.portable_data_hash(),
504                                              }).execute(num_retries=self.num_retries)
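                # The collection record saved by make_output_collection is
                # redundant once the container's output has been set, so trash
                # it; the output data in Keep remains available through the
                # container's recorded output.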
505                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
506                                               body={
507                                                   'is_trashed': True
508                                               }).execute(num_retries=self.num_retries)
509             except Exception:
510                 logger.exception("Setting container output")
511                 return
512         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
513             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
514                                    body={
515                                        'output': self.final_output_collection.portable_data_hash(),
516                                        'success': self.final_status == "success",
517                                        'progress':1.0
518                                    }).execute(num_retries=self.num_retries)
519
520     def arv_executor(self, tool, job_order, runtimeContext, logger=None):
521         self.debug = runtimeContext.debug
522
523         tool.visit(self.check_features)
524
525         self.project_uuid = runtimeContext.project_uuid
526         self.pipeline = None
527         self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
528         self.secret_store = runtimeContext.secret_store
529
530         self.trash_intermediate = runtimeContext.trash_intermediate
531         if self.trash_intermediate and self.work_api != "containers":
532             raise Exception("--trash-intermediate is only supported with --api=containers.")
533
534         self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
535         if self.intermediate_output_ttl and self.work_api != "containers":
536             raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
537         if self.intermediate_output_ttl < 0:
538             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
539
540         if runtimeContext.submit_request_uuid and self.work_api != "containers":
541             raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
542
543         if not runtimeContext.name:
544             runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
545
546         # Upload the direct dependencies of workflow steps and get back a mapping
547         # of local files to Keep references.  This also uploads Docker images.
548         merged_map = upload_workflow_deps(self, tool)
549
550         # Reload the tool object, which may have been updated by
551         # upload_workflow_deps.
552         # Don't validate this time because it would just print redundant errors.
553         loadingContext = self.loadingContext.copy()
554         loadingContext.loader = tool.doc_loader
555         loadingContext.avsc_names = tool.doc_schema
556         loadingContext.metadata = tool.metadata
557         loadingContext.do_validate = False
558
559         tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
560                                   loadingContext)
561
562         # Upload local file references in the job order.
563         job_order = upload_job_order(self, "%s input" % runtimeContext.name,
564                                      tool, job_order)
565
566         existing_uuid = runtimeContext.update_workflow
567         if existing_uuid or runtimeContext.create_workflow:
568             # Create a pipeline template or workflow record and exit.
569             if self.work_api == "jobs":
570                 tmpl = RunnerTemplate(self, tool, job_order,
571                                       runtimeContext.enable_reuse,
572                                       uuid=existing_uuid,
573                                       submit_runner_ram=runtimeContext.submit_runner_ram,
574                                       name=runtimeContext.name,
575                                       merged_map=merged_map,
576                                       loadingContext=loadingContext)
577                 tmpl.save()
578                 # cwltool.main will write our return value to stdout.
579                 return (tmpl.uuid, "success")
580             elif self.work_api == "containers":
581                 return (upload_workflow(self, tool, job_order,
582                                         self.project_uuid,
583                                         uuid=existing_uuid,
584                                         submit_runner_ram=runtimeContext.submit_runner_ram,
585                                         name=runtimeContext.name,
586                                         merged_map=merged_map),
587                         "success")
588
589         self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
590         self.eval_timeout = runtimeContext.eval_timeout
591
592         runtimeContext = runtimeContext.copy()
593         runtimeContext.use_container = True
594         runtimeContext.tmpdir_prefix = "tmp"
595         runtimeContext.work_api = self.work_api
596
597         if self.work_api == "containers":
598             if self.ignore_docker_for_reuse:
599                 raise Exception("--ignore-docker-for-reuse not supported with containers API.")
600             runtimeContext.outdir = "/var/spool/cwl"
601             runtimeContext.docker_outdir = "/var/spool/cwl"
602             runtimeContext.tmpdir = "/tmp"
603             runtimeContext.docker_tmpdir = "/tmp"
604         elif self.work_api == "jobs":
605             if runtimeContext.priority != DEFAULT_PRIORITY:
606                 raise Exception("--priority not implemented for jobs API.")
607             runtimeContext.outdir = "$(task.outdir)"
608             runtimeContext.docker_outdir = "$(task.outdir)"
609             runtimeContext.tmpdir = "$(task.tmpdir)"
610
611         if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
612             raise Exception("--priority must be in the range 1..1000.")
613
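        # Estimate a collection cache size from the job order: the size
        # component of each distinct portable data hash gives the manifest
        # length, and the cache is sized at roughly 192 bytes per byte of
        # manifest text, with a 256 MiB floor.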
614         if self.should_estimate_cache_size:
615             visited = set()
616             estimated_size = [0]
617             def estimate_collection_cache(obj):
618                 if obj.get("location", "").startswith("keep:"):
619                     m = pdh_size.match(obj["location"][5:])
620                     if m and m.group(1) not in visited:
621                         visited.add(m.group(1))
622                         estimated_size[0] += int(m.group(2))
623             visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
624             runtimeContext.collection_cache_size = max(((estimated_size[0]*192) // (1024*1024))+1, 256)
625             self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)
626
627         logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)
628
629         runnerjob = None
630         if runtimeContext.submit:
631             # Submit a runner job to run the workflow for us.
632             if self.work_api == "containers":
633                 if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner):
634                     runtimeContext.runnerjob = tool.tool["id"]
635                 else:
636                     tool = RunnerContainer(self, tool, loadingContext, runtimeContext.enable_reuse,
637                                                 self.output_name,
638                                                 self.output_tags,
639                                                 submit_runner_ram=runtimeContext.submit_runner_ram,
640                                                 name=runtimeContext.name,
641                                                 on_error=runtimeContext.on_error,
642                                                 submit_runner_image=runtimeContext.submit_runner_image,
643                                                 intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
644                                                 merged_map=merged_map,
645                                                 priority=runtimeContext.priority,
646                                                 secret_store=self.secret_store,
647                                                 collection_cache_size=runtimeContext.collection_cache_size,
648                                                 collection_cache_is_default=self.should_estimate_cache_size)
649             elif self.work_api == "jobs":
650                 tool = RunnerJob(self, tool, loadingContext, runtimeContext.enable_reuse,
651                                       self.output_name,
652                                       self.output_tags,
653                                       submit_runner_ram=runtimeContext.submit_runner_ram,
654                                       name=runtimeContext.name,
655                                       on_error=runtimeContext.on_error,
656                                       submit_runner_image=runtimeContext.submit_runner_image,
657                                       merged_map=merged_map)
658         elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
659             # Create pipeline for local run
660             self.pipeline = self.api.pipeline_instances().create(
661                 body={
662                     "owner_uuid": self.project_uuid,
663                     "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
664                     "components": {},
665                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
666             logger.info("Pipeline instance %s", self.pipeline["uuid"])
667
668         if runtimeContext.cwl_runner_job is not None:
669             self.uuid = runtimeContext.cwl_runner_job.get('uuid')
670
671         jobiter = tool.job(job_order,
672                            self.output_callback,
673                            runtimeContext)
674
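        # When submitting without waiting, the first item from the iterator is
        # the runner itself: kick it off and return its UUID immediately.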
675         if runtimeContext.submit and not runtimeContext.wait:
676             runnerjob = next(jobiter)
677             runnerjob.run(runtimeContext)
678             return (runnerjob.uuid, "success")
679
680         current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
681         if current_container:
682             logger.info("Running inside container %s", current_container.get("uuid"))
683
684         self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
685         self.polling_thread = threading.Thread(target=self.poll_states)
686         self.polling_thread.start()
687
688         self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
689
690         try:
691             self.workflow_eval_lock.acquire()
692
693             # Hold the lock while this code runs; it is released while
694             # waiting in self.workflow_eval_lock.wait(), at which point
695             # on_message can update job state and process output
696             # callbacks.
697
698             loopperf = Perf(metrics, "jobiter")
699             loopperf.__enter__()
700             for runnable in jobiter:
701                 loopperf.__exit__()
702
703                 if self.stop_polling.is_set():
704                     break
705
706                 if self.task_queue.error is not None:
707                     raise self.task_queue.error
708
709                 if runnable:
710                     with Perf(metrics, "run"):
711                         self.start_run(runnable, runtimeContext)
712                 else:
713                     if (self.task_queue.in_flight + len(self.processes)) > 0:
714                         self.workflow_eval_lock.wait(3)
715                     else:
716                         logger.error("Workflow is deadlocked: no runnable processes and no pending processes to wait for.")
717                         break
718
719                 if self.stop_polling.is_set():
720                     break
721
722                 loopperf.__enter__()
723             loopperf.__exit__()
724
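            # All runnable work has been submitted; wait for queued tasks and
            # outstanding processes to finish.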
725             while (self.task_queue.in_flight + len(self.processes)) > 0:
726                 if self.task_queue.error is not None:
727                     raise self.task_queue.error
728                 self.workflow_eval_lock.wait(3)
729
730         except UnsupportedRequirement:
731             raise
732         except:
733             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
734                 logger.error("Interrupted, workflow will be cancelled")
735             elif isinstance(sys.exc_info()[1], WorkflowException):
736                 logger.error("Workflow execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
737             else:
738                 logger.exception("Workflow execution failed")
739
740             if self.pipeline:
741                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
742                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
743             if runtimeContext.submit and isinstance(tool, Runner):
744                 runnerjob = tool
745                 if runnerjob.uuid and self.work_api == "containers":
746                     self.api.container_requests().update(uuid=runnerjob.uuid,
747                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
748         finally:
749             self.workflow_eval_lock.release()
750             self.task_queue.drain()
751             self.stop_polling.set()
752             self.polling_thread.join()
753             self.task_queue.join()
754
755         if self.final_status == "UnsupportedRequirement":
756             raise UnsupportedRequirement("Check log for details.")
757
758         if self.final_output is None:
759             raise WorkflowException("Workflow did not return a result.")
760
761         if runtimeContext.submit and isinstance(tool, Runner):
762             logger.info("Final output collection %s", tool.final_output)
763         else:
764             if self.output_name is None:
765                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
766             if self.output_tags is None:
767                 self.output_tags = ""
768
769             storage_classes = runtimeContext.storage_classes.strip().split(",")
770             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
771             self.set_crunch_output()
772
773         if runtimeContext.compute_checksum:
774             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
775             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
776
777         if self.trash_intermediate and self.final_status == "success":
778             self.trash_intermediate_output()
779
780         return (self.final_output, self.final_status)