eeb44dbd7f232193b6e7102f85e3ae905b71dfc9
[arvados.git] / sdk / cwl / arvados_cwl / executor.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import division
6 from builtins import next
7 from builtins import object
8 from builtins import str
9 from future.utils import viewvalues
10
11 import argparse
12 import logging
13 import os
14 import sys
15 import threading
16 import copy
17 import json
18 import re
19 from functools import partial
20 import time
21
22 from cwltool.errors import WorkflowException
23 import cwltool.workflow
24 from schema_salad.sourceline import SourceLine
25 import schema_salad.validate as validate
26
27 import arvados
28 import arvados.config
29 from arvados.keep import KeepClient
30 from arvados.errors import ApiError
31
32 import arvados_cwl.util
33 from .arvcontainer import RunnerContainer
34 from .arvjob import RunnerJob, RunnerTemplate
35 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
36 from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
37 from .arvworkflow import ArvadosWorkflow, upload_workflow
38 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
39 from .perf import Perf
40 from .pathmapper import NoFollowPathMapper
41 from .task_queue import TaskQueue
42 from .context import ArvLoadingContext, ArvRuntimeContext
43 from ._version import __version__
44
45 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
46 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing, visit_class
47 from cwltool.command_line_tool import compute_checksums
48
49 logger = logging.getLogger('arvados.cwl-runner')
50 metrics = logging.getLogger('arvados.cwl-runner.metrics')
51
52 DEFAULT_PRIORITY = 500
53
54 class RuntimeStatusLoggingHandler(logging.Handler):
55     """
56     Intercepts logging calls and report them as runtime statuses on runner
57     containers.
58     """
59     def __init__(self, runtime_status_update_func):
60         super(RuntimeStatusLoggingHandler, self).__init__()
61         self.runtime_status_update = runtime_status_update_func
62         self.updatingRuntimeStatus = False
63
64     def emit(self, record):
65         kind = None
66         if record.levelno >= logging.ERROR:
67             kind = 'error'
68         elif record.levelno >= logging.WARNING:
69             kind = 'warning'
70         if kind is not None and self.updatingRuntimeStatus is not True:
71             self.updatingRuntimeStatus = True
72             try:
73                 log_msg = record.getMessage()
74                 if '\n' in log_msg:
75                     # If the logged message is multi-line, use its first line as status
76                     # and the rest as detail.
77                     status, detail = log_msg.split('\n', 1)
78                     self.runtime_status_update(
79                         kind,
80                         "%s: %s" % (record.name, status),
81                         detail
82                     )
83                 else:
84                     self.runtime_status_update(
85                         kind,
86                         "%s: %s" % (record.name, record.getMessage())
87                     )
88             finally:
89                 self.updatingRuntimeStatus = False
90
91
92 class ArvCwlExecutor(object):
93     """Execute a CWL tool or workflow, submit work (using either jobs or
94     containers API), wait for them to complete, and report output.
95
96     """
97
98     def __init__(self, api_client,
99                  arvargs=None,
100                  keep_client=None,
101                  num_retries=4,
102                  thread_count=4):
103
104         if arvargs is None:
105             arvargs = argparse.Namespace()
106             arvargs.work_api = None
107             arvargs.output_name = None
108             arvargs.output_tags = None
109             arvargs.thread_count = 1
110             arvargs.collection_cache_size = None
111
112         self.api = api_client
113         self.processes = {}
114         self.workflow_eval_lock = threading.Condition(threading.RLock())
115         self.final_output = None
116         self.final_status = None
117         self.num_retries = num_retries
118         self.uuid = None
119         self.stop_polling = threading.Event()
120         self.poll_api = None
121         self.pipeline = None
122         self.final_output_collection = None
123         self.output_name = arvargs.output_name
124         self.output_tags = arvargs.output_tags
125         self.project_uuid = None
126         self.intermediate_output_ttl = 0
127         self.intermediate_output_collections = []
128         self.trash_intermediate = False
129         self.thread_count = arvargs.thread_count
130         self.poll_interval = 12
131         self.loadingContext = None
132         self.should_estimate_cache_size = True
133
134         if keep_client is not None:
135             self.keep_client = keep_client
136         else:
137             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
138
139         if arvargs.collection_cache_size:
140             collection_cache_size = arvargs.collection_cache_size*1024*1024
141             self.should_estimate_cache_size = False
142         else:
143             collection_cache_size = 256*1024*1024
144
145         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
146                                                 cap=collection_cache_size)
147
148         self.fetcher_constructor = partial(CollectionFetcher,
149                                            api_client=self.api,
150                                            fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
151                                            num_retries=self.num_retries)
152
153         self.work_api = None
154         expected_api = ["jobs", "containers"]
155         for api in expected_api:
156             try:
157                 methods = self.api._rootDesc.get('resources')[api]['methods']
158                 if ('httpMethod' in methods['create'] and
159                     (arvargs.work_api == api or arvargs.work_api is None)):
160                     self.work_api = api
161                     break
162             except KeyError:
163                 pass
164
165         if not self.work_api:
166             if arvargs.work_api is None:
167                 raise Exception("No supported APIs")
168             else:
169                 raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
170
171         if self.work_api == "jobs":
172             logger.warning("""
173 *******************************
174 Using the deprecated 'jobs' API.
175
176 To get rid of this warning:
177
178 Users: read about migrating at
179 http://doc.arvados.org/user/cwl/cwl-style.html#migrate
180 and use the option --api=containers
181
182 Admins: configure the cluster to disable the 'jobs' API as described at:
183 http://doc.arvados.org/install/install-api-server.html#disable_api_methods
184 *******************************""")
185
186         self.loadingContext = ArvLoadingContext(vars(arvargs))
187         self.loadingContext.fetcher_constructor = self.fetcher_constructor
188         self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
189         self.loadingContext.construct_tool_object = self.arv_make_tool
190
191         # Add a custom logging handler to the root logger for runtime status reporting
192         # if running inside a container
193         if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
194             root_logger = logging.getLogger('')
195
196             # Remove existing RuntimeStatusLoggingHandlers if they exist
197             handlers = [h for h in root_logger.handlers if not isinstance(h, RuntimeStatusLoggingHandler)]
198             root_logger.handlers = handlers
199
200             handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
201             root_logger.addHandler(handler)
202
203         self.runtimeContext = ArvRuntimeContext(vars(arvargs))
204         self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
205                                                      collection_cache=self.collection_cache)
206
207         validate_cluster_target(self, self.runtimeContext)
208
209
210     def arv_make_tool(self, toolpath_object, loadingContext):
211         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
212             return ArvadosCommandTool(self, toolpath_object, loadingContext)
213         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
214             return ArvadosWorkflow(self, toolpath_object, loadingContext)
215         elif "class" in toolpath_object and toolpath_object["class"] == "ExpressionTool":
216             return ArvadosExpressionTool(self, toolpath_object, loadingContext)
217         else:
218             raise Exception("Unknown tool %s" % toolpath_object.get("class"))
219
220     def output_callback(self, out, processStatus):
221         with self.workflow_eval_lock:
222             if processStatus == "success":
223                 logger.info("Overall process status is %s", processStatus)
224                 state = "Complete"
225             else:
226                 logger.error("Overall process status is %s", processStatus)
227                 state = "Failed"
228             if self.pipeline:
229                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
230                                                         body={"state": state}).execute(num_retries=self.num_retries)
231             self.final_status = processStatus
232             self.final_output = out
233             self.workflow_eval_lock.notifyAll()
234
235
236     def start_run(self, runnable, runtimeContext):
237         self.task_queue.add(partial(runnable.run, runtimeContext),
238                             self.workflow_eval_lock, self.stop_polling)
239
240     def process_submitted(self, container):
241         with self.workflow_eval_lock:
242             self.processes[container.uuid] = container
243
244     def process_done(self, uuid, record):
245         with self.workflow_eval_lock:
246             j = self.processes[uuid]
247             logger.info("%s %s is %s", self.label(j), uuid, record["state"])
248             self.task_queue.add(partial(j.done, record),
249                                 self.workflow_eval_lock, self.stop_polling)
250             del self.processes[uuid]
251
252     def runtime_status_update(self, kind, message, detail=None):
253         """
254         Updates the runtime_status field on the runner container.
255         Called when there's a need to report errors, warnings or just
256         activity statuses, for example in the RuntimeStatusLoggingHandler.
257         """
258         with self.workflow_eval_lock:
259             current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
260             if current is None:
261                 return
262             runtime_status = current.get('runtime_status', {})
263             # In case of status being an error, only report the first one.
264             if kind == 'error':
265                 if not runtime_status.get('error'):
266                     runtime_status.update({
267                         'error': message
268                     })
269                     if detail is not None:
270                         runtime_status.update({
271                             'errorDetail': detail
272                         })
273                 # Further errors are only mentioned as a count.
274                 else:
275                     # Get anything before an optional 'and N more' string.
276                     try:
277                         error_msg = re.match(
278                             r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
279                         more_failures = re.match(
280                             r'.*\(and (\d+) more\)', runtime_status.get('error'))
281                     except TypeError:
282                         # Ignore tests stubbing errors
283                         return
284                     if more_failures:
285                         failure_qty = int(more_failures.groups()[0])
286                         runtime_status.update({
287                             'error': "%s (and %d more)" % (error_msg, failure_qty+1)
288                         })
289                     else:
290                         runtime_status.update({
291                             'error': "%s (and 1 more)" % error_msg
292                         })
293             elif kind in ['warning', 'activity']:
294                 # Record the last warning/activity status without regard of
295                 # previous occurences.
296                 runtime_status.update({
297                     kind: message
298                 })
299                 if detail is not None:
300                     runtime_status.update({
301                         kind+"Detail": detail
302                     })
303             else:
304                 # Ignore any other status kind
305                 return
306             try:
307                 self.api.containers().update(uuid=current['uuid'],
308                                             body={
309                                                 'runtime_status': runtime_status,
310                                             }).execute(num_retries=self.num_retries)
311             except Exception as e:
312                 logger.info("Couldn't update runtime_status: %s", e)
313
314     def wrapped_callback(self, cb, obj, st):
315         with self.workflow_eval_lock:
316             cb(obj, st)
317             self.workflow_eval_lock.notifyAll()
318
319     def get_wrapped_callback(self, cb):
320         return partial(self.wrapped_callback, cb)
321
322     def on_message(self, event):
323         if event.get("object_uuid") in self.processes and event["event_type"] == "update":
324             uuid = event["object_uuid"]
325             if event["properties"]["new_attributes"]["state"] == "Running":
326                 with self.workflow_eval_lock:
327                     j = self.processes[uuid]
328                     if j.running is False:
329                         j.running = True
330                         j.update_pipeline_component(event["properties"]["new_attributes"])
331                         logger.info("%s %s is Running", self.label(j), uuid)
332             elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
333                 self.process_done(uuid, event["properties"]["new_attributes"])
334
335     def label(self, obj):
336         return "[%s %s]" % (self.work_api[0:-1], obj.name)
337
338     def poll_states(self):
339         """Poll status of jobs or containers listed in the processes dict.
340
341         Runs in a separate thread.
342         """
343
344         try:
345             remain_wait = self.poll_interval
346             while True:
347                 if remain_wait > 0:
348                     self.stop_polling.wait(remain_wait)
349                 if self.stop_polling.is_set():
350                     break
351                 with self.workflow_eval_lock:
352                     keys = list(self.processes)
353                 if not keys:
354                     remain_wait = self.poll_interval
355                     continue
356
357                 begin_poll = time.time()
358                 if self.work_api == "containers":
359                     table = self.poll_api.container_requests()
360                 elif self.work_api == "jobs":
361                     table = self.poll_api.jobs()
362
363                 pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)
364
365                 while keys:
366                     page = keys[:pageSize]
367                     keys = keys[pageSize:]
368                     try:
369                         proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
370                     except Exception:
371                         logger.exception("Error checking states on API server: %s")
372                         remain_wait = self.poll_interval
373                         continue
374
375                     for p in proc_states["items"]:
376                         self.on_message({
377                             "object_uuid": p["uuid"],
378                             "event_type": "update",
379                             "properties": {
380                                 "new_attributes": p
381                             }
382                         })
383                 finish_poll = time.time()
384                 remain_wait = self.poll_interval - (finish_poll - begin_poll)
385         except:
386             logger.exception("Fatal error in state polling thread.")
387             with self.workflow_eval_lock:
388                 self.processes.clear()
389                 self.workflow_eval_lock.notifyAll()
390         finally:
391             self.stop_polling.set()
392
393     def add_intermediate_output(self, uuid):
394         if uuid:
395             self.intermediate_output_collections.append(uuid)
396
397     def trash_intermediate_output(self):
398         logger.info("Cleaning up intermediate output collections")
399         for i in self.intermediate_output_collections:
400             try:
401                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
402             except Exception:
403                 logger.warning("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
404             except (KeyboardInterrupt, SystemExit):
405                 break
406
407     def check_features(self, obj):
408         if isinstance(obj, dict):
409             if obj.get("writable") and self.work_api != "containers":
410                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
411             if obj.get("class") == "DockerRequirement":
412                 if obj.get("dockerOutputDirectory"):
413                     if self.work_api != "containers":
414                         raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
415                             "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
416                     if not obj.get("dockerOutputDirectory").startswith('/'):
417                         raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
418                             "Option 'dockerOutputDirectory' must be an absolute path.")
419             if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
420                 raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
421             for v in viewvalues(obj):
422                 self.check_features(v)
423         elif isinstance(obj, list):
424             for i,v in enumerate(obj):
425                 with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
426                     self.check_features(v)
427
428     def make_output_collection(self, name, storage_classes, tagsString, outputObj):
429         outputObj = copy.deepcopy(outputObj)
430
431         files = []
432         def capture(fileobj):
433             files.append(fileobj)
434
435         adjustDirObjs(outputObj, capture)
436         adjustFileObjs(outputObj, capture)
437
438         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
439
440         final = arvados.collection.Collection(api_client=self.api,
441                                               keep_client=self.keep_client,
442                                               num_retries=self.num_retries)
443
444         for k,v in generatemapper.items():
445             if v.type == "Directory" and v.resolved.startswith("_:"):
446                     continue
447             if v.type == "CreateFile" and (k.startswith("_:") or v.resolved.startswith("_:")):
448                 with final.open(v.target, "wb") as f:
449                     f.write(v.resolved.encode("utf-8"))
450                     continue
451
452             if not v.resolved.startswith("keep:"):
453                 raise Exception("Output source is not in keep or a literal")
454             sp = v.resolved.split("/")
455             srccollection = sp[0][5:]
456             try:
457                 reader = self.collection_cache.get(srccollection)
458                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
459                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
460             except arvados.errors.ArgumentError as e:
461                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
462                 raise
463             except IOError as e:
464                 logger.error("While preparing output collection: %s", e)
465                 raise
466
467         def rewrite(fileobj):
468             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
469             for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
470                 if k in fileobj:
471                     del fileobj[k]
472
473         adjustDirObjs(outputObj, rewrite)
474         adjustFileObjs(outputObj, rewrite)
475
476         with final.open("cwl.output.json", "w") as f:
477             res = str(json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False))
478             f.write(res)
479
480         final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)
481
482         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
483                     final.api_response()["name"],
484                     final.manifest_locator())
485
486         final_uuid = final.manifest_locator()
487         tags = tagsString.split(',')
488         for tag in tags:
489              self.api.links().create(body={
490                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
491                 }).execute(num_retries=self.num_retries)
492
493         def finalcollection(fileobj):
494             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
495
496         adjustDirObjs(outputObj, finalcollection)
497         adjustFileObjs(outputObj, finalcollection)
498
499         return (outputObj, final)
500
501     def set_crunch_output(self):
502         if self.work_api == "containers":
503             current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
504             if current is None:
505                 return
506             try:
507                 self.api.containers().update(uuid=current['uuid'],
508                                              body={
509                                                  'output': self.final_output_collection.portable_data_hash(),
510                                              }).execute(num_retries=self.num_retries)
511                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
512                                               body={
513                                                   'is_trashed': True
514                                               }).execute(num_retries=self.num_retries)
515             except Exception:
516                 logger.exception("Setting container output")
517                 return
518         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
519             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
520                                    body={
521                                        'output': self.final_output_collection.portable_data_hash(),
522                                        'success': self.final_status == "success",
523                                        'progress':1.0
524                                    }).execute(num_retries=self.num_retries)
525
526     def arv_executor(self, tool, job_order, runtimeContext, logger=None):
527         self.debug = runtimeContext.debug
528
529         tool.visit(self.check_features)
530
531         self.project_uuid = runtimeContext.project_uuid
532         self.pipeline = None
533         self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
534         self.secret_store = runtimeContext.secret_store
535
536         self.trash_intermediate = runtimeContext.trash_intermediate
537         if self.trash_intermediate and self.work_api != "containers":
538             raise Exception("--trash-intermediate is only supported with --api=containers.")
539
540         self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
541         if self.intermediate_output_ttl and self.work_api != "containers":
542             raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
543         if self.intermediate_output_ttl < 0:
544             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
545
546         if runtimeContext.submit_request_uuid and self.work_api != "containers":
547             raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
548
549         if not runtimeContext.name:
550             runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
551
552         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
553         # Also uploads docker images.
554         merged_map = upload_workflow_deps(self, tool)
555
556         # Reload tool object which may have been updated by
557         # upload_workflow_deps
558         # Don't validate this time because it will just print redundant errors.
559         loadingContext = self.loadingContext.copy()
560         loadingContext.loader = tool.doc_loader
561         loadingContext.avsc_names = tool.doc_schema
562         loadingContext.metadata = tool.metadata
563         loadingContext.do_validate = False
564
565         tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
566                                   loadingContext)
567
568         # Upload local file references in the job order.
569         job_order = upload_job_order(self, "%s input" % runtimeContext.name,
570                                      tool, job_order)
571
572         existing_uuid = runtimeContext.update_workflow
573         if existing_uuid or runtimeContext.create_workflow:
574             # Create a pipeline template or workflow record and exit.
575             if self.work_api == "jobs":
576                 tmpl = RunnerTemplate(self, tool, job_order,
577                                       runtimeContext.enable_reuse,
578                                       uuid=existing_uuid,
579                                       submit_runner_ram=runtimeContext.submit_runner_ram,
580                                       name=runtimeContext.name,
581                                       merged_map=merged_map,
582                                       loadingContext=loadingContext)
583                 tmpl.save()
584                 # cwltool.main will write our return value to stdout.
585                 return (tmpl.uuid, "success")
586             elif self.work_api == "containers":
587                 return (upload_workflow(self, tool, job_order,
588                                         self.project_uuid,
589                                         uuid=existing_uuid,
590                                         submit_runner_ram=runtimeContext.submit_runner_ram,
591                                         name=runtimeContext.name,
592                                         merged_map=merged_map),
593                         "success")
594
595         self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
596         self.eval_timeout = runtimeContext.eval_timeout
597
598         runtimeContext = runtimeContext.copy()
599         runtimeContext.use_container = True
600         runtimeContext.tmpdir_prefix = "tmp"
601         runtimeContext.work_api = self.work_api
602
603         if self.work_api == "containers":
604             if self.ignore_docker_for_reuse:
605                 raise Exception("--ignore-docker-for-reuse not supported with containers API.")
606             runtimeContext.outdir = "/var/spool/cwl"
607             runtimeContext.docker_outdir = "/var/spool/cwl"
608             runtimeContext.tmpdir = "/tmp"
609             runtimeContext.docker_tmpdir = "/tmp"
610         elif self.work_api == "jobs":
611             if runtimeContext.priority != DEFAULT_PRIORITY:
612                 raise Exception("--priority not implemented for jobs API.")
613             runtimeContext.outdir = "$(task.outdir)"
614             runtimeContext.docker_outdir = "$(task.outdir)"
615             runtimeContext.tmpdir = "$(task.tmpdir)"
616
617         if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
618             raise Exception("--priority must be in the range 1..1000.")
619
620         if self.should_estimate_cache_size:
621             visited = set()
622             estimated_size = [0]
623             def estimate_collection_cache(obj):
624                 if obj.get("location", "").startswith("keep:"):
625                     m = pdh_size.match(obj["location"][5:])
626                     if m and m.group(1) not in visited:
627                         visited.add(m.group(1))
628                         estimated_size[0] += int(m.group(2))
629             visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
630             runtimeContext.collection_cache_size = max(((estimated_size[0]*192) // (1024*1024))+1, 256)
631             self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)
632
633         logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)
634
635         runnerjob = None
636         if runtimeContext.submit:
637             # Submit a runner job to run the workflow for us.
638             if self.work_api == "containers":
639                 if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner):
640                     runtimeContext.runnerjob = tool.tool["id"]
641                 else:
642                     tool = RunnerContainer(self, tool, loadingContext, runtimeContext.enable_reuse,
643                                                 self.output_name,
644                                                 self.output_tags,
645                                                 submit_runner_ram=runtimeContext.submit_runner_ram,
646                                                 name=runtimeContext.name,
647                                                 on_error=runtimeContext.on_error,
648                                                 submit_runner_image=runtimeContext.submit_runner_image,
649                                                 intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
650                                                 merged_map=merged_map,
651                                                 priority=runtimeContext.priority,
652                                                 secret_store=self.secret_store,
653                                                 collection_cache_size=runtimeContext.collection_cache_size,
654                                                 collection_cache_is_default=self.should_estimate_cache_size)
655             elif self.work_api == "jobs":
656                 tool = RunnerJob(self, tool, loadingContext, runtimeContext.enable_reuse,
657                                       self.output_name,
658                                       self.output_tags,
659                                       submit_runner_ram=runtimeContext.submit_runner_ram,
660                                       name=runtimeContext.name,
661                                       on_error=runtimeContext.on_error,
662                                       submit_runner_image=runtimeContext.submit_runner_image,
663                                       merged_map=merged_map)
664         elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
665             # Create pipeline for local run
666             self.pipeline = self.api.pipeline_instances().create(
667                 body={
668                     "owner_uuid": self.project_uuid,
669                     "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
670                     "components": {},
671                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
672             logger.info("Pipeline instance %s", self.pipeline["uuid"])
673
674         if runtimeContext.cwl_runner_job is not None:
675             self.uuid = runtimeContext.cwl_runner_job.get('uuid')
676
677         jobiter = tool.job(job_order,
678                            self.output_callback,
679                            runtimeContext)
680
681         if runtimeContext.submit and not runtimeContext.wait:
682             runnerjob = next(jobiter)
683             runnerjob.run(runtimeContext)
684             return (runnerjob.uuid, "success")
685
686         current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
687         if current_container:
688             logger.info("Running inside container %s", current_container.get("uuid"))
689
690         self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
691         self.polling_thread = threading.Thread(target=self.poll_states)
692         self.polling_thread.start()
693
694         self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
695
696         try:
697             self.workflow_eval_lock.acquire()
698
699             # Holds the lock while this code runs and releases it when
700             # it is safe to do so in self.workflow_eval_lock.wait(),
701             # at which point on_message can update job state and
702             # process output callbacks.
703
704             loopperf = Perf(metrics, "jobiter")
705             loopperf.__enter__()
706             for runnable in jobiter:
707                 loopperf.__exit__()
708
709                 if self.stop_polling.is_set():
710                     break
711
712                 if self.task_queue.error is not None:
713                     raise self.task_queue.error
714
715                 if runnable:
716                     with Perf(metrics, "run"):
717                         self.start_run(runnable, runtimeContext)
718                 else:
719                     if (self.task_queue.in_flight + len(self.processes)) > 0:
720                         self.workflow_eval_lock.wait(3)
721                     else:
722                         logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
723                         break
724
725                 if self.stop_polling.is_set():
726                     break
727
728                 loopperf.__enter__()
729             loopperf.__exit__()
730
731             while (self.task_queue.in_flight + len(self.processes)) > 0:
732                 if self.task_queue.error is not None:
733                     raise self.task_queue.error
734                 self.workflow_eval_lock.wait(3)
735
736         except UnsupportedRequirement:
737             raise
738         except:
739             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
740                 logger.error("Interrupted, workflow will be cancelled")
741             elif isinstance(sys.exc_info()[1], WorkflowException):
742                 logger.error("Workflow execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
743             else:
744                 logger.exception("Workflow execution failed")
745
746             if self.pipeline:
747                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
748                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
749             if runtimeContext.submit and isinstance(tool, Runner):
750                 runnerjob = tool
751                 if runnerjob.uuid and self.work_api == "containers":
752                     self.api.container_requests().update(uuid=runnerjob.uuid,
753                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
754         finally:
755             self.workflow_eval_lock.release()
756             self.task_queue.drain()
757             self.stop_polling.set()
758             self.polling_thread.join()
759             self.task_queue.join()
760
761         if self.final_status == "UnsupportedRequirement":
762             raise UnsupportedRequirement("Check log for details.")
763
764         if self.final_output is None:
765             raise WorkflowException("Workflow did not return a result.")
766
767         if runtimeContext.submit and isinstance(tool, Runner):
768             logger.info("Final output collection %s", tool.final_output)
769         else:
770             if self.output_name is None:
771                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
772             if self.output_tags is None:
773                 self.output_tags = ""
774
775             storage_classes = runtimeContext.storage_classes.strip().split(",")
776             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
777             self.set_crunch_output()
778
779         if runtimeContext.compute_checksum:
780             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
781             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
782
783         if self.trash_intermediate and self.final_status == "success":
784             self.trash_intermediate_output()
785
786         return (self.final_output, self.final_status)