15028: More conformance fixes
[arvados.git] / sdk / cwl / arvados_cwl / executor.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import division
6 from builtins import next
7 from builtins import object
8 from builtins import str
9 from future.utils import viewvalues, viewitems
10
11 import argparse
12 import logging
13 import os
14 import sys
15 import threading
16 import copy
17 import json
18 import re
19 from functools import partial
20 import time
21
22 from cwltool.errors import WorkflowException
23 import cwltool.workflow
24 from schema_salad.sourceline import SourceLine
25 import schema_salad.validate as validate
26
27 import arvados
28 import arvados.config
29 from arvados.keep import KeepClient
30 from arvados.errors import ApiError
31
32 import arvados_cwl.util
33 from .arvcontainer import RunnerContainer
34 from .arvjob import RunnerJob, RunnerTemplate
35 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
36 from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
37 from .arvworkflow import ArvadosWorkflow, upload_workflow
38 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
39 from .perf import Perf
40 from .pathmapper import NoFollowPathMapper
41 from .task_queue import TaskQueue
42 from .context import ArvLoadingContext, ArvRuntimeContext
43 from ._version import __version__
44
45 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
46 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing, visit_class
47 from cwltool.command_line_tool import compute_checksums
48 from cwltool.load_tool import load_tool
49
50 logger = logging.getLogger('arvados.cwl-runner')
51 metrics = logging.getLogger('arvados.cwl-runner.metrics')
52
53 DEFAULT_PRIORITY = 500
54
55 class RuntimeStatusLoggingHandler(logging.Handler):
56     """
57     Intercepts logging calls and report them as runtime statuses on runner
58     containers.
59     """
60     def __init__(self, runtime_status_update_func):
61         super(RuntimeStatusLoggingHandler, self).__init__()
62         self.runtime_status_update = runtime_status_update_func
63         self.updatingRuntimeStatus = False
64
65     def emit(self, record):
66         kind = None
67         if record.levelno >= logging.ERROR:
68             kind = 'error'
69         elif record.levelno >= logging.WARNING:
70             kind = 'warning'
71         if kind is not None and self.updatingRuntimeStatus is not True:
72             self.updatingRuntimeStatus = True
73             try:
74                 log_msg = record.getMessage()
75                 if '\n' in log_msg:
76                     # If the logged message is multi-line, use its first line as status
77                     # and the rest as detail.
78                     status, detail = log_msg.split('\n', 1)
79                     self.runtime_status_update(
80                         kind,
81                         "%s: %s" % (record.name, status),
82                         detail
83                     )
84                 else:
85                     self.runtime_status_update(
86                         kind,
87                         "%s: %s" % (record.name, record.getMessage())
88                     )
89             finally:
90                 self.updatingRuntimeStatus = False
91
92
93 class ArvCwlExecutor(object):
94     """Execute a CWL tool or workflow, submit work (using either jobs or
95     containers API), wait for them to complete, and report output.
96
97     """
98
99     def __init__(self, api_client,
100                  arvargs=None,
101                  keep_client=None,
102                  num_retries=4,
103                  thread_count=4):
104
105         if arvargs is None:
106             arvargs = argparse.Namespace()
107             arvargs.work_api = None
108             arvargs.output_name = None
109             arvargs.output_tags = None
110             arvargs.thread_count = 1
111             arvargs.collection_cache_size = None
112
113         self.api = api_client
114         self.processes = {}
115         self.workflow_eval_lock = threading.Condition(threading.RLock())
116         self.final_output = None
117         self.final_status = None
118         self.num_retries = num_retries
119         self.uuid = None
120         self.stop_polling = threading.Event()
121         self.poll_api = None
122         self.pipeline = None
123         self.final_output_collection = None
124         self.output_name = arvargs.output_name
125         self.output_tags = arvargs.output_tags
126         self.project_uuid = None
127         self.intermediate_output_ttl = 0
128         self.intermediate_output_collections = []
129         self.trash_intermediate = False
130         self.thread_count = arvargs.thread_count
131         self.poll_interval = 12
132         self.loadingContext = None
133         self.should_estimate_cache_size = True
134
135         if keep_client is not None:
136             self.keep_client = keep_client
137         else:
138             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
139
140         if arvargs.collection_cache_size:
141             collection_cache_size = arvargs.collection_cache_size*1024*1024
142             self.should_estimate_cache_size = False
143         else:
144             collection_cache_size = 256*1024*1024
145
146         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
147                                                 cap=collection_cache_size)
148
149         self.fetcher_constructor = partial(CollectionFetcher,
150                                            api_client=self.api,
151                                            fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
152                                            num_retries=self.num_retries)
153
154         self.work_api = None
155         expected_api = ["jobs", "containers"]
156         for api in expected_api:
157             try:
158                 methods = self.api._rootDesc.get('resources')[api]['methods']
159                 if ('httpMethod' in methods['create'] and
160                     (arvargs.work_api == api or arvargs.work_api is None)):
161                     self.work_api = api
162                     break
163             except KeyError:
164                 pass
165
166         if not self.work_api:
167             if arvargs.work_api is None:
168                 raise Exception("No supported APIs")
169             else:
170                 raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
171
172         if self.work_api == "jobs":
173             logger.warning("""
174 *******************************
175 Using the deprecated 'jobs' API.
176
177 To get rid of this warning:
178
179 Users: read about migrating at
180 http://doc.arvados.org/user/cwl/cwl-style.html#migrate
181 and use the option --api=containers
182
183 Admins: configure the cluster to disable the 'jobs' API as described at:
184 http://doc.arvados.org/install/install-api-server.html#disable_api_methods
185 *******************************""")
186
187         self.loadingContext = ArvLoadingContext(vars(arvargs))
188         self.loadingContext.fetcher_constructor = self.fetcher_constructor
189         self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
190         self.loadingContext.construct_tool_object = self.arv_make_tool
191
192         # Add a custom logging handler to the root logger for runtime status reporting
193         # if running inside a container
194         if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
195             root_logger = logging.getLogger('')
196
197             # Remove existing RuntimeStatusLoggingHandlers if they exist
198             handlers = [h for h in root_logger.handlers if not isinstance(h, RuntimeStatusLoggingHandler)]
199             root_logger.handlers = handlers
200
201             handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
202             root_logger.addHandler(handler)
203
204         self.runtimeContext = ArvRuntimeContext(vars(arvargs))
205         self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
206                                                      collection_cache=self.collection_cache)
207
208         validate_cluster_target(self, self.runtimeContext)
209
210
211     def arv_make_tool(self, toolpath_object, loadingContext):
212         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
213             return ArvadosCommandTool(self, toolpath_object, loadingContext)
214         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
215             return ArvadosWorkflow(self, toolpath_object, loadingContext)
216         elif "class" in toolpath_object and toolpath_object["class"] == "ExpressionTool":
217             return ArvadosExpressionTool(self, toolpath_object, loadingContext)
218         else:
219             raise Exception("Unknown tool %s" % toolpath_object.get("class"))
220
221     def output_callback(self, out, processStatus):
222         with self.workflow_eval_lock:
223             if processStatus == "success":
224                 logger.info("Overall process status is %s", processStatus)
225                 state = "Complete"
226             else:
227                 logger.error("Overall process status is %s", processStatus)
228                 state = "Failed"
229             if self.pipeline:
230                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
231                                                         body={"state": state}).execute(num_retries=self.num_retries)
232             self.final_status = processStatus
233             self.final_output = out
234             self.workflow_eval_lock.notifyAll()
235
236
237     def start_run(self, runnable, runtimeContext):
238         self.task_queue.add(partial(runnable.run, runtimeContext),
239                             self.workflow_eval_lock, self.stop_polling)
240
241     def process_submitted(self, container):
242         with self.workflow_eval_lock:
243             self.processes[container.uuid] = container
244
245     def process_done(self, uuid, record):
246         with self.workflow_eval_lock:
247             j = self.processes[uuid]
248             logger.info("%s %s is %s", self.label(j), uuid, record["state"])
249             self.task_queue.add(partial(j.done, record),
250                                 self.workflow_eval_lock, self.stop_polling)
251             del self.processes[uuid]
252
253     def runtime_status_update(self, kind, message, detail=None):
254         """
255         Updates the runtime_status field on the runner container.
256         Called when there's a need to report errors, warnings or just
257         activity statuses, for example in the RuntimeStatusLoggingHandler.
258         """
259         with self.workflow_eval_lock:
260             current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
261             if current is None:
262                 return
263             runtime_status = current.get('runtime_status', {})
264             # In case of status being an error, only report the first one.
265             if kind == 'error':
266                 if not runtime_status.get('error'):
267                     runtime_status.update({
268                         'error': message
269                     })
270                     if detail is not None:
271                         runtime_status.update({
272                             'errorDetail': detail
273                         })
274                 # Further errors are only mentioned as a count.
275                 else:
276                     # Get anything before an optional 'and N more' string.
277                     try:
278                         error_msg = re.match(
279                             r'^(.*?)(?=\s*\(and \d+ more\)|$)', runtime_status.get('error')).groups()[0]
280                         more_failures = re.match(
281                             r'.*\(and (\d+) more\)', runtime_status.get('error'))
282                     except TypeError:
283                         # Ignore tests stubbing errors
284                         return
285                     if more_failures:
286                         failure_qty = int(more_failures.groups()[0])
287                         runtime_status.update({
288                             'error': "%s (and %d more)" % (error_msg, failure_qty+1)
289                         })
290                     else:
291                         runtime_status.update({
292                             'error': "%s (and 1 more)" % error_msg
293                         })
294             elif kind in ['warning', 'activity']:
295                 # Record the last warning/activity status without regard of
296                 # previous occurences.
297                 runtime_status.update({
298                     kind: message
299                 })
300                 if detail is not None:
301                     runtime_status.update({
302                         kind+"Detail": detail
303                     })
304             else:
305                 # Ignore any other status kind
306                 return
307             try:
308                 self.api.containers().update(uuid=current['uuid'],
309                                             body={
310                                                 'runtime_status': runtime_status,
311                                             }).execute(num_retries=self.num_retries)
312             except Exception as e:
313                 logger.info("Couldn't update runtime_status: %s", e)
314
315     def wrapped_callback(self, cb, obj, st):
316         with self.workflow_eval_lock:
317             cb(obj, st)
318             self.workflow_eval_lock.notifyAll()
319
320     def get_wrapped_callback(self, cb):
321         return partial(self.wrapped_callback, cb)
322
323     def on_message(self, event):
324         if event.get("object_uuid") in self.processes and event["event_type"] == "update":
325             uuid = event["object_uuid"]
326             if event["properties"]["new_attributes"]["state"] == "Running":
327                 with self.workflow_eval_lock:
328                     j = self.processes[uuid]
329                     if j.running is False:
330                         j.running = True
331                         j.update_pipeline_component(event["properties"]["new_attributes"])
332                         logger.info("%s %s is Running", self.label(j), uuid)
333             elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
334                 self.process_done(uuid, event["properties"]["new_attributes"])
335
336     def label(self, obj):
337         return "[%s %s]" % (self.work_api[0:-1], obj.name)
338
339     def poll_states(self):
340         """Poll status of jobs or containers listed in the processes dict.
341
342         Runs in a separate thread.
343         """
344
345         try:
346             remain_wait = self.poll_interval
347             while True:
348                 if remain_wait > 0:
349                     self.stop_polling.wait(remain_wait)
350                 if self.stop_polling.is_set():
351                     break
352                 with self.workflow_eval_lock:
353                     keys = list(self.processes)
354                 if not keys:
355                     remain_wait = self.poll_interval
356                     continue
357
358                 begin_poll = time.time()
359                 if self.work_api == "containers":
360                     table = self.poll_api.container_requests()
361                 elif self.work_api == "jobs":
362                     table = self.poll_api.jobs()
363
364                 pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)
365
366                 while keys:
367                     page = keys[:pageSize]
368                     try:
369                         proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
370                     except Exception:
371                         logger.exception("Error checking states on API server: %s")
372                         remain_wait = self.poll_interval
373                         continue
374
375                     for p in proc_states["items"]:
376                         self.on_message({
377                             "object_uuid": p["uuid"],
378                             "event_type": "update",
379                             "properties": {
380                                 "new_attributes": p
381                             }
382                         })
383                     keys = keys[pageSize:]
384
385                 finish_poll = time.time()
386                 remain_wait = self.poll_interval - (finish_poll - begin_poll)
387         except:
388             logger.exception("Fatal error in state polling thread.")
389             with self.workflow_eval_lock:
390                 self.processes.clear()
391                 self.workflow_eval_lock.notifyAll()
392         finally:
393             self.stop_polling.set()
394
395     def add_intermediate_output(self, uuid):
396         if uuid:
397             self.intermediate_output_collections.append(uuid)
398
399     def trash_intermediate_output(self):
400         logger.info("Cleaning up intermediate output collections")
401         for i in self.intermediate_output_collections:
402             try:
403                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
404             except Exception:
405                 logger.warning("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
406             except (KeyboardInterrupt, SystemExit):
407                 break
408
409     def check_features(self, obj, parentfield=""):
410         if isinstance(obj, dict):
411             if obj.get("writable") and self.work_api != "containers":
412                 raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
413             if obj.get("class") == "DockerRequirement":
414                 if obj.get("dockerOutputDirectory"):
415                     if self.work_api != "containers":
416                         raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
417                             "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
418                     if not obj.get("dockerOutputDirectory").startswith('/'):
419                         raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
420                             "Option 'dockerOutputDirectory' must be an absolute path.")
421             if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
422                 raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
423             if obj.get("class") == "InplaceUpdateRequirement":
424                 if obj["inplaceUpdate"] and parentfield == "requirements":
425                     raise SourceLine(obj, "class", UnsupportedRequirement).makeError("InplaceUpdateRequirement not supported for keep collections.")
426             for k,v in viewitems(obj):
427                 self.check_features(v, parentfield=k)
428         elif isinstance(obj, list):
429             for i,v in enumerate(obj):
430                 with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
431                     self.check_features(v, parentfield=parentfield)
432
433     def make_output_collection(self, name, storage_classes, tagsString, outputObj):
434         outputObj = copy.deepcopy(outputObj)
435
436         files = []
437         def capture(fileobj):
438             files.append(fileobj)
439
440         adjustDirObjs(outputObj, capture)
441         adjustFileObjs(outputObj, capture)
442
443         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
444
445         final = arvados.collection.Collection(api_client=self.api,
446                                               keep_client=self.keep_client,
447                                               num_retries=self.num_retries)
448
449         for k,v in generatemapper.items():
450             if v.type == "Directory" and v.resolved.startswith("_:"):
451                     continue
452             if v.type == "CreateFile" and (k.startswith("_:") or v.resolved.startswith("_:")):
453                 with final.open(v.target, "wb") as f:
454                     f.write(v.resolved.encode("utf-8"))
455                     continue
456
457             if not v.resolved.startswith("keep:"):
458                 raise Exception("Output source is not in keep or a literal")
459             sp = v.resolved.split("/")
460             srccollection = sp[0][5:]
461             try:
462                 reader = self.collection_cache.get(srccollection)
463                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
464                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
465             except arvados.errors.ArgumentError as e:
466                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
467                 raise
468             except IOError as e:
469                 logger.error("While preparing output collection: %s", e)
470                 raise
471
472         def rewrite(fileobj):
473             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
474             for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
475                 if k in fileobj:
476                     del fileobj[k]
477
478         adjustDirObjs(outputObj, rewrite)
479         adjustFileObjs(outputObj, rewrite)
480
481         with final.open("cwl.output.json", "w") as f:
482             res = str(json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False))
483             f.write(res)
484
485         final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)
486
487         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
488                     final.api_response()["name"],
489                     final.manifest_locator())
490
491         final_uuid = final.manifest_locator()
492         tags = tagsString.split(',')
493         for tag in tags:
494              self.api.links().create(body={
495                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
496                 }).execute(num_retries=self.num_retries)
497
498         def finalcollection(fileobj):
499             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
500
501         adjustDirObjs(outputObj, finalcollection)
502         adjustFileObjs(outputObj, finalcollection)
503
504         return (outputObj, final)
505
506     def set_crunch_output(self):
507         if self.work_api == "containers":
508             current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
509             if current is None:
510                 return
511             try:
512                 self.api.containers().update(uuid=current['uuid'],
513                                              body={
514                                                  'output': self.final_output_collection.portable_data_hash(),
515                                              }).execute(num_retries=self.num_retries)
516                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
517                                               body={
518                                                   'is_trashed': True
519                                               }).execute(num_retries=self.num_retries)
520             except Exception:
521                 logger.exception("Setting container output")
522                 return
523         elif self.work_api == "jobs" and "TASK_UUID" in os.environ:
524             self.api.job_tasks().update(uuid=os.environ["TASK_UUID"],
525                                    body={
526                                        'output': self.final_output_collection.portable_data_hash(),
527                                        'success': self.final_status == "success",
528                                        'progress':1.0
529                                    }).execute(num_retries=self.num_retries)
530
531     def apply_reqs(self, job_order_object, tool):
532         if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
533             if tool.metadata.get("http://commonwl.org/cwltool#original_cwlVersion") == 'v1.0':
534                 raise WorkflowException(
535                     "`cwl:requirements` in the input object is not part of CWL "
536                     "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
537                     "can set the cwlVersion to v1.1.0-dev1 or greater and re-run with "
538                     "--enable-dev.")
539             job_reqs = job_order_object["https://w3id.org/cwl/cwl#requirements"]
540             for req in job_reqs:
541                 tool.requirements.append(req)
542
543     def arv_executor(self, tool, job_order, runtimeContext, logger=None):
544         self.debug = runtimeContext.debug
545
546         tool.visit(self.check_features)
547
548         self.project_uuid = runtimeContext.project_uuid
549         self.pipeline = None
550         self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
551         self.secret_store = runtimeContext.secret_store
552
553         self.trash_intermediate = runtimeContext.trash_intermediate
554         if self.trash_intermediate and self.work_api != "containers":
555             raise Exception("--trash-intermediate is only supported with --api=containers.")
556
557         self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
558         if self.intermediate_output_ttl and self.work_api != "containers":
559             raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
560         if self.intermediate_output_ttl < 0:
561             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
562
563         if runtimeContext.submit_request_uuid and self.work_api != "containers":
564             raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
565
566         if not runtimeContext.name:
567             runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
568
569         # Upload local file references in the job order.
570         job_order = upload_job_order(self, "%s input" % runtimeContext.name,
571                                      tool, job_order)
572
573         submitting = (runtimeContext.update_workflow or
574                       runtimeContext.create_workflow or
575                       (runtimeContext.submit and not
576                        (tool.tool["class"] == "CommandLineTool" and
577                         runtimeContext.wait and
578                         not runtimeContext.always_submit_runner)))
579
580         loadingContext = self.loadingContext.copy()
581         loadingContext.do_validate = False
582         loadingContext.do_update = False
583         if submitting:
584             # Document may have been auto-updated. Reload the original
585             # document with updating disabled because we want to
586             # submit the original document, not the auto-updated one.
587             tool = load_tool(tool.tool["id"], loadingContext)
588
589         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
590         # Also uploads docker images.
591         merged_map = upload_workflow_deps(self, tool)
592
593         # Recreate process object (ArvadosWorkflow or
594         # ArvadosCommandTool) because tool document may have been
595         # updated by upload_workflow_deps in ways that modify
596         # inheritance of hints or requirements.
597         loadingContext.loader = tool.doc_loader
598         loadingContext.avsc_names = tool.doc_schema
599         loadingContext.metadata = tool.metadata
600         tool = load_tool(tool.tool, loadingContext)
601
602         existing_uuid = runtimeContext.update_workflow
603         if existing_uuid or runtimeContext.create_workflow:
604             # Create a pipeline template or workflow record and exit.
605             if self.work_api == "jobs":
606                 tmpl = RunnerTemplate(self, tool, job_order,
607                                       runtimeContext.enable_reuse,
608                                       uuid=existing_uuid,
609                                       submit_runner_ram=runtimeContext.submit_runner_ram,
610                                       name=runtimeContext.name,
611                                       merged_map=merged_map,
612                                       loadingContext=loadingContext)
613                 tmpl.save()
614                 # cwltool.main will write our return value to stdout.
615                 return (tmpl.uuid, "success")
616             elif self.work_api == "containers":
617                 return (upload_workflow(self, tool, job_order,
618                                         self.project_uuid,
619                                         uuid=existing_uuid,
620                                         submit_runner_ram=runtimeContext.submit_runner_ram,
621                                         name=runtimeContext.name,
622                                         merged_map=merged_map),
623                         "success")
624
625         self.apply_reqs(job_order, tool)
626
627         self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
628         self.eval_timeout = runtimeContext.eval_timeout
629
630         runtimeContext = runtimeContext.copy()
631         runtimeContext.use_container = True
632         runtimeContext.tmpdir_prefix = "tmp"
633         runtimeContext.work_api = self.work_api
634
635         if self.work_api == "containers":
636             if self.ignore_docker_for_reuse:
637                 raise Exception("--ignore-docker-for-reuse not supported with containers API.")
638             runtimeContext.outdir = "/var/spool/cwl"
639             runtimeContext.docker_outdir = "/var/spool/cwl"
640             runtimeContext.tmpdir = "/tmp"
641             runtimeContext.docker_tmpdir = "/tmp"
642         elif self.work_api == "jobs":
643             if runtimeContext.priority != DEFAULT_PRIORITY:
644                 raise Exception("--priority not implemented for jobs API.")
645             runtimeContext.outdir = "$(task.outdir)"
646             runtimeContext.docker_outdir = "$(task.outdir)"
647             runtimeContext.tmpdir = "$(task.tmpdir)"
648
649         if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
650             raise Exception("--priority must be in the range 1..1000.")
651
652         if self.should_estimate_cache_size:
653             visited = set()
654             estimated_size = [0]
655             def estimate_collection_cache(obj):
656                 if obj.get("location", "").startswith("keep:"):
657                     m = pdh_size.match(obj["location"][5:])
658                     if m and m.group(1) not in visited:
659                         visited.add(m.group(1))
660                         estimated_size[0] += int(m.group(2))
661             visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
662             runtimeContext.collection_cache_size = max(((estimated_size[0]*192) // (1024*1024))+1, 256)
663             self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)
664
665         logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)
666
667         runnerjob = None
668         if runtimeContext.submit:
669             # Submit a runner job to run the workflow for us.
670             if self.work_api == "containers":
671                 if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner):
672                     runtimeContext.runnerjob = tool.tool["id"]
673                 else:
674                     tool = RunnerContainer(self, tool, loadingContext, runtimeContext.enable_reuse,
675                                                 self.output_name,
676                                                 self.output_tags,
677                                                 submit_runner_ram=runtimeContext.submit_runner_ram,
678                                                 name=runtimeContext.name,
679                                                 on_error=runtimeContext.on_error,
680                                                 submit_runner_image=runtimeContext.submit_runner_image,
681                                                 intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
682                                                 merged_map=merged_map,
683                                                 priority=runtimeContext.priority,
684                                                 secret_store=self.secret_store,
685                                                 collection_cache_size=runtimeContext.collection_cache_size,
686                                                 collection_cache_is_default=self.should_estimate_cache_size)
687             elif self.work_api == "jobs":
688                 tool = RunnerJob(self, tool, loadingContext, runtimeContext.enable_reuse,
689                                       self.output_name,
690                                       self.output_tags,
691                                       submit_runner_ram=runtimeContext.submit_runner_ram,
692                                       name=runtimeContext.name,
693                                       on_error=runtimeContext.on_error,
694                                       submit_runner_image=runtimeContext.submit_runner_image,
695                                       merged_map=merged_map)
696         elif runtimeContext.cwl_runner_job is None and self.work_api == "jobs":
697             # Create pipeline for local run
698             self.pipeline = self.api.pipeline_instances().create(
699                 body={
700                     "owner_uuid": self.project_uuid,
701                     "name": runtimeContext.name if runtimeContext.name else shortname(tool.tool["id"]),
702                     "components": {},
703                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
704             logger.info("Pipeline instance %s", self.pipeline["uuid"])
705
706         if runtimeContext.cwl_runner_job is not None:
707             self.uuid = runtimeContext.cwl_runner_job.get('uuid')
708
709         jobiter = tool.job(job_order,
710                            self.output_callback,
711                            runtimeContext)
712
713         if runtimeContext.submit and not runtimeContext.wait:
714             runnerjob = next(jobiter)
715             runnerjob.run(runtimeContext)
716             return (runnerjob.uuid, "success")
717
718         current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
719         if current_container:
720             logger.info("Running inside container %s", current_container.get("uuid"))
721
722         self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
723         self.polling_thread = threading.Thread(target=self.poll_states)
724         self.polling_thread.start()
725
726         self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
727
728         try:
729             self.workflow_eval_lock.acquire()
730
731             # Holds the lock while this code runs and releases it when
732             # it is safe to do so in self.workflow_eval_lock.wait(),
733             # at which point on_message can update job state and
734             # process output callbacks.
735
736             loopperf = Perf(metrics, "jobiter")
737             loopperf.__enter__()
738             for runnable in jobiter:
739                 loopperf.__exit__()
740
741                 if self.stop_polling.is_set():
742                     break
743
744                 if self.task_queue.error is not None:
745                     raise self.task_queue.error
746
747                 if runnable:
748                     with Perf(metrics, "run"):
749                         self.start_run(runnable, runtimeContext)
750                 else:
751                     if (self.task_queue.in_flight + len(self.processes)) > 0:
752                         self.workflow_eval_lock.wait(3)
753                     else:
754                         logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
755                         break
756
757                 if self.stop_polling.is_set():
758                     break
759
760                 loopperf.__enter__()
761             loopperf.__exit__()
762
763             while (self.task_queue.in_flight + len(self.processes)) > 0:
764                 if self.task_queue.error is not None:
765                     raise self.task_queue.error
766                 self.workflow_eval_lock.wait(3)
767
768         except UnsupportedRequirement:
769             raise
770         except:
771             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
772                 logger.error("Interrupted, workflow will be cancelled")
773             elif isinstance(sys.exc_info()[1], WorkflowException):
774                 logger.error("Workflow execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
775             else:
776                 logger.exception("Workflow execution failed")
777
778             if self.pipeline:
779                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
780                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
781             if runtimeContext.submit and isinstance(tool, Runner):
782                 runnerjob = tool
783                 if runnerjob.uuid and self.work_api == "containers":
784                     self.api.container_requests().update(uuid=runnerjob.uuid,
785                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
786         finally:
787             self.workflow_eval_lock.release()
788             self.task_queue.drain()
789             self.stop_polling.set()
790             self.polling_thread.join()
791             self.task_queue.join()
792
793         if self.final_status == "UnsupportedRequirement":
794             raise UnsupportedRequirement("Check log for details.")
795
796         if self.final_output is None:
797             raise WorkflowException("Workflow did not return a result.")
798
799         if runtimeContext.submit and isinstance(tool, Runner):
800             logger.info("Final output collection %s", tool.final_output)
801         else:
802             if self.output_name is None:
803                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
804             if self.output_tags is None:
805                 self.output_tags = ""
806
807             storage_classes = runtimeContext.storage_classes.strip().split(",")
808             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
809             self.set_crunch_output()
810
811         if runtimeContext.compute_checksum:
812             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
813             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
814
815         if self.trash_intermediate and self.final_status == "success":
816             self.trash_intermediate_output()
817
818         return (self.final_output, self.final_status)