21943: Fix bug and add integration test
[arvados.git] / sdk / cwl / arvados_cwl / executor.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import argparse
6 import logging
7 import os
8 import sys
9 import threading
10 import copy
11 import json
12 import re
13 from functools import partial
14 import subprocess
15 import time
16 import urllib
17
18 from cwltool.errors import WorkflowException
19 import cwltool.workflow
20 from schema_salad.sourceline import SourceLine, cmap
21 import schema_salad.validate as validate
22 from schema_salad.ref_resolver import file_uri, uri_file_path
23
24 import arvados
25 import arvados.config
26 from arvados.keep import KeepClient
27 from arvados.errors import ApiError
28
29 import arvados_cwl.util
30 from .arvcontainer import RunnerContainer, cleanup_name_for_collection
31 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, make_builder, update_from_merged_map, print_keep_deps, ArvSecretStore
32 from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
33 from .arvworkflow import ArvadosWorkflow, upload_workflow, make_workflow_record
34 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
35 from .perf import Perf
36 from .pathmapper import NoFollowPathMapper
37 from cwltool.task_queue import TaskQueue
38 from .context import ArvLoadingContext, ArvRuntimeContext
39 from ._version import __version__
40
41 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
42 from cwltool.utils import adjustFileObjs, adjustDirObjs, get_listing, visit_class, aslist
43 from cwltool.command_line_tool import compute_checksums
44 from cwltool.load_tool import load_tool
45
46 logger = logging.getLogger('arvados.cwl-runner')
47 metrics = logging.getLogger('arvados.cwl-runner.metrics')
48
49 DEFAULT_PRIORITY = 500
50
51 class RuntimeStatusLoggingHandler(logging.Handler):
52     """
53     Intercepts logging calls and report them as runtime statuses on runner
54     containers.
55     """
56     def __init__(self, runtime_status_update_func):
57         super(RuntimeStatusLoggingHandler, self).__init__()
58         self.runtime_status_update = runtime_status_update_func
59         self.updatingRuntimeStatus = False
60
61     def emit(self, record):
62         kind = None
63         if record.levelno >= logging.ERROR:
64             kind = 'error'
65         elif record.levelno >= logging.WARNING:
66             kind = 'warning'
67         if kind == 'warning' and record.name in ("salad", "crunchstat_summary"):
68             # Don't send validation warnings to runtime status,
69             # they're noisy and unhelpful.
70             return
71         if kind is not None and self.updatingRuntimeStatus is not True:
72             self.updatingRuntimeStatus = True
73             try:
74                 log_msg = record.getMessage()
75                 if '\n' in log_msg:
76                     # If the logged message is multi-line, use its first line as status
77                     # and the rest as detail.
78                     status, detail = log_msg.split('\n', 1)
79                     self.runtime_status_update(
80                         kind,
81                         "%s: %s" % (record.name, status),
82                         detail
83                     )
84                 else:
85                     self.runtime_status_update(
86                         kind,
87                         "%s: %s" % (record.name, record.getMessage())
88                     )
89             finally:
90                 self.updatingRuntimeStatus = False
91
92
93 class ArvCwlExecutor(object):
94     """Execute a CWL tool or workflow, submit work (using containers API),
95     wait for them to complete, and report output.
96
97     """
98
99     def __init__(self, api_client,
100                  arvargs=None,
101                  keep_client=None,
102                  num_retries=4,
103                  thread_count=4,
104                  stdout=sys.stdout):
105
106         if arvargs is None:
107             arvargs = argparse.Namespace()
108             arvargs.work_api = None
109             arvargs.output_name = None
110             arvargs.output_tags = None
111             arvargs.thread_count = 1
112             arvargs.collection_cache_size = None
113             arvargs.git_info = True
114             arvargs.submit = False
115             arvargs.defer_downloads = False
116
117         self.api = api_client
118         self.processes = {}
119         self.workflow_eval_lock = threading.Condition(threading.RLock())
120         self.final_output = None
121         self.final_status = None
122         self.num_retries = num_retries
123         self.uuid = None
124         self.stop_polling = threading.Event()
125         self.poll_api = None
126         self.pipeline = None
127         self.final_output_collection = None
128         self.output_name = arvargs.output_name
129         self.output_tags = arvargs.output_tags
130         self.project_uuid = None
131         self.intermediate_output_ttl = 0
132         self.intermediate_output_collections = []
133         self.trash_intermediate = False
134         self.thread_count = arvargs.thread_count
135         self.poll_interval = 12
136         self.loadingContext = None
137         self.should_estimate_cache_size = True
138         self.fs_access = None
139         self.secret_store = None
140         self.stdout = stdout
141         self.fast_submit = False
142         self.git_info = arvargs.git_info
143         self.debug = False
144
145         if keep_client is not None:
146             self.keep_client = keep_client
147         else:
148             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
149
150         if arvargs.collection_cache_size:
151             collection_cache_size = arvargs.collection_cache_size*1024*1024
152             self.should_estimate_cache_size = False
153         else:
154             collection_cache_size = 256*1024*1024
155
156         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
157                                                 cap=collection_cache_size)
158
159         self.fetcher_constructor = partial(CollectionFetcher,
160                                            api_client=self.api,
161                                            fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
162                                            num_retries=self.num_retries)
163
164         self.work_api = None
165         expected_api = ["containers"]
166         for api in expected_api:
167             try:
168                 methods = self.api._rootDesc.get('resources')[api]['methods']
169                 if ('httpMethod' in methods['create'] and
170                     (arvargs.work_api == api or arvargs.work_api is None)):
171                     self.work_api = api
172                     break
173             except KeyError:
174                 pass
175
176         if not self.work_api:
177             if arvargs.work_api is None:
178                 raise Exception("No supported APIs")
179             else:
180                 raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
181
182         if self.work_api == "jobs":
183             logger.error("""
184 *******************************
185 The 'jobs' API is no longer supported.
186 *******************************""")
187             exit(1)
188
189         self.loadingContext = ArvLoadingContext(vars(arvargs))
190         self.loadingContext.fetcher_constructor = self.fetcher_constructor
191         self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
192         self.loadingContext.construct_tool_object = self.arv_make_tool
193
194         # Add a custom logging handler to the root logger for runtime status reporting
195         # if running inside a container
196         if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
197             root_logger = logging.getLogger('')
198
199             # Remove existing RuntimeStatusLoggingHandlers if they exist
200             handlers = [h for h in root_logger.handlers if not isinstance(h, RuntimeStatusLoggingHandler)]
201             root_logger.handlers = handlers
202
203             handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
204             root_logger.addHandler(handler)
205
206         self.toplevel_runtimeContext = ArvRuntimeContext(vars(arvargs))
207         self.toplevel_runtimeContext.make_fs_access = partial(CollectionFsAccess,
208                                                      collection_cache=self.collection_cache)
209         self.toplevel_runtimeContext.secret_store = ArvSecretStore()
210
211         self.defer_downloads = arvargs.submit and arvargs.defer_downloads
212
213         validate_cluster_target(self, self.toplevel_runtimeContext)
214
215
216     def arv_make_tool(self, toolpath_object, loadingContext):
217         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
218             return ArvadosCommandTool(self, toolpath_object, loadingContext)
219         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
220             return ArvadosWorkflow(self, toolpath_object, loadingContext)
221         elif "class" in toolpath_object and toolpath_object["class"] == "ExpressionTool":
222             return ArvadosExpressionTool(self, toolpath_object, loadingContext)
223         else:
224             raise Exception("Unknown tool %s" % toolpath_object.get("class"))
225
226     def output_callback(self, out, processStatus):
227         with self.workflow_eval_lock:
228             if processStatus == "success":
229                 logger.info("Overall process status is %s", processStatus)
230                 state = "Complete"
231             else:
232                 logger.error("Overall process status is %s", processStatus)
233                 state = "Failed"
234             if self.pipeline:
235                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
236                                                         body={"state": state}).execute(num_retries=self.num_retries)
237             self.final_status = processStatus
238             self.final_output = out
239             self.workflow_eval_lock.notifyAll()
240
241
242     def start_run(self, runnable, runtimeContext):
243         self.task_queue.add(partial(runnable.run, runtimeContext),
244                             self.workflow_eval_lock, self.stop_polling)
245
246     def process_submitted(self, container):
247         with self.workflow_eval_lock:
248             self.processes[container.uuid] = container
249
250     def process_done(self, uuid, record):
251         with self.workflow_eval_lock:
252             j = self.processes[uuid]
253             logger.info("%s %s is %s", self.label(j), uuid, record["state"])
254             self.task_queue.add(partial(j.done, record),
255                                 self.workflow_eval_lock, self.stop_polling)
256             del self.processes[uuid]
257
258     def runtime_status_update(self, kind, message, detail=None):
259         """
260         Updates the runtime_status field on the runner container.
261         Called when there's a need to report errors, warnings or just
262         activity statuses, for example in the RuntimeStatusLoggingHandler.
263         """
264
265         if kind not in ('error', 'warning', 'activity'):
266             # Ignore any other status kind
267             return
268
269         with self.workflow_eval_lock:
270             current = None
271             try:
272                 current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
273             except Exception as e:
274                 logger.info("Couldn't get current container: %s", e)
275             if current is None:
276                 return
277             runtime_status = current.get('runtime_status', {})
278
279             original_updatemessage = updatemessage = runtime_status.get(kind, "")
280             if kind == "activity" or not updatemessage:
281                 updatemessage = message
282
283             # Subsequent messages tacked on in detail
284             original_updatedetail = updatedetail = runtime_status.get(kind+'Detail', "")
285             maxlines = 40
286             if updatedetail.count("\n") < maxlines:
287                 if updatedetail:
288                     updatedetail += "\n"
289                 updatedetail += message + "\n"
290
291                 if detail:
292                     updatedetail += detail + "\n"
293
294                 if updatedetail.count("\n") >= maxlines:
295                     updatedetail += "\nSome messages may have been omitted.  Check the full log."
296
297             if updatemessage == original_updatemessage and updatedetail == original_updatedetail:
298                 # don't waste time doing an update if nothing changed
299                 # (usually because we exceeded the max lines)
300                 return
301
302             runtime_status.update({
303                 kind: updatemessage,
304                 kind+'Detail': updatedetail,
305             })
306
307             try:
308                 self.api.containers().update(uuid=current['uuid'],
309                                             body={
310                                                 'runtime_status': runtime_status,
311                                             }).execute(num_retries=self.num_retries)
312             except Exception as e:
313                 logger.info("Couldn't update runtime_status: %s", e)
314
315     def wrapped_callback(self, cb, obj, st):
316         with self.workflow_eval_lock:
317             cb(obj, st)
318             self.workflow_eval_lock.notifyAll()
319
320     def get_wrapped_callback(self, cb):
321         return partial(self.wrapped_callback, cb)
322
323     def on_message(self, event):
324         if event.get("object_uuid") in self.processes and event["event_type"] == "update":
325             uuid = event["object_uuid"]
326             if event["properties"]["new_attributes"]["state"] == "Running":
327                 with self.workflow_eval_lock:
328                     j = self.processes[uuid]
329                     if j.running is False:
330                         j.running = True
331                         j.update_pipeline_component(event["properties"]["new_attributes"])
332                         logger.info("%s %s is Running", self.label(j), uuid)
333             elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
334                 self.process_done(uuid, event["properties"]["new_attributes"])
335
336     def label(self, obj):
337         return "[%s %s]" % (self.work_api[0:-1], obj.name)
338
339     def poll_states(self):
340         """Poll status of containers listed in the processes dict.
341
342         Runs in a separate thread.
343         """
344
345         try:
346             remain_wait = self.poll_interval
347             while True:
348                 if remain_wait > 0:
349                     self.stop_polling.wait(remain_wait)
350                 if self.stop_polling.is_set():
351                     break
352                 with self.workflow_eval_lock:
353                     keys = list(self.processes)
354                 if not keys:
355                     remain_wait = self.poll_interval
356                     continue
357
358                 begin_poll = time.time()
359                 if self.work_api == "containers":
360                     table = self.poll_api.container_requests()
361
362                 pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)
363
364                 while keys:
365                     page = keys[:pageSize]
366                     try:
367                         proc_states = table.list(filters=[["uuid", "in", page]], select=["uuid", "container_uuid", "state", "log_uuid",
368                                                                                          "output_uuid", "modified_at", "properties",
369                                                                                          "runtime_constraints"]).execute(num_retries=self.num_retries)
370                     except Exception as e:
371                         logger.warning("Temporary error checking states on API server: %s", e)
372                         remain_wait = self.poll_interval
373                         continue
374
375                     for p in proc_states["items"]:
376                         self.on_message({
377                             "object_uuid": p["uuid"],
378                             "event_type": "update",
379                             "properties": {
380                                 "new_attributes": p
381                             }
382                         })
383                     keys = keys[pageSize:]
384
385                 finish_poll = time.time()
386                 remain_wait = self.poll_interval - (finish_poll - begin_poll)
387         except:
388             logger.exception("Fatal error in state polling thread.")
389             with self.workflow_eval_lock:
390                 self.processes.clear()
391                 self.workflow_eval_lock.notifyAll()
392         finally:
393             self.stop_polling.set()
394
395     def add_intermediate_output(self, uuid):
396         if uuid:
397             self.intermediate_output_collections.append(uuid)
398
399     def trash_intermediate_output(self):
400         logger.info("Cleaning up intermediate output collections")
401         for i in self.intermediate_output_collections:
402             try:
403                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
404             except Exception:
405                 logger.warning("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
406             except (KeyboardInterrupt, SystemExit):
407                 break
408
409     def check_features(self, obj, parentfield=""):
410         if isinstance(obj, dict):
411             if obj.get("class") == "DockerRequirement":
412                 if obj.get("dockerOutputDirectory"):
413                     if not obj.get("dockerOutputDirectory").startswith('/'):
414                         raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
415                             "Option 'dockerOutputDirectory' must be an absolute path.")
416             if obj.get("class") == "InplaceUpdateRequirement":
417                 if obj["inplaceUpdate"] and parentfield == "requirements":
418                     raise SourceLine(obj, "class", UnsupportedRequirement).makeError("InplaceUpdateRequirement not supported for keep collections.")
419             for k,v in obj.items():
420                 self.check_features(v, parentfield=k)
421         elif isinstance(obj, list):
422             for i,v in enumerate(obj):
423                 with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
424                     self.check_features(v, parentfield=parentfield)
425
426     def make_output_collection(self, name, storage_classes, tagsString, output_properties, outputObj):
427         outputObj = copy.deepcopy(outputObj)
428
429         files = []
430         def captureFile(fileobj):
431             files.append(fileobj)
432
433         def captureDir(dirobj):
434             if dirobj["location"].startswith("keep:") and 'listing' in dirobj:
435                 del dirobj['listing']
436             files.append(dirobj)
437
438         adjustDirObjs(outputObj, captureDir)
439         adjustFileObjs(outputObj, captureFile)
440
441         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
442
443         final = arvados.collection.Collection(api_client=self.api,
444                                               keep_client=self.keep_client,
445                                               num_retries=self.num_retries)
446
447         for k,v in generatemapper.items():
448             if v.type == "Directory" and v.resolved.startswith("_:"):
449                     continue
450             if v.type == "CreateFile" and (k.startswith("_:") or v.resolved.startswith("_:")):
451                 with final.open(v.target, "wb") as f:
452                     f.write(v.resolved.encode("utf-8"))
453                     continue
454
455             if not v.resolved.startswith("keep:"):
456                 raise Exception("Output source is not in keep or a literal")
457             sp = v.resolved.split("/")
458             srccollection = sp[0][5:]
459             try:
460                 reader = self.collection_cache.get(srccollection)
461                 srcpath = urllib.parse.unquote("/".join(sp[1:]) if len(sp) > 1 else ".")
462                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
463             except arvados.errors.ArgumentError as e:
464                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
465                 raise
466             except IOError as e:
467                 logger.error("While preparing output collection: %s", e)
468                 raise
469
470         def rewrite(fileobj):
471             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
472             for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
473                 if k in fileobj:
474                     del fileobj[k]
475
476         adjustDirObjs(outputObj, rewrite)
477         adjustFileObjs(outputObj, rewrite)
478
479         with final.open("cwl.output.json", "w") as f:
480             res = str(json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False))
481             f.write(res)
482
483
484         final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes,
485                        ensure_unique_name=True, properties=output_properties)
486
487         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
488                     final.api_response()["name"],
489                     final.manifest_locator())
490
491         final_uuid = final.manifest_locator()
492         tags = tagsString.split(',')
493         for tag in tags:
494              self.api.links().create(body={
495                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
496                 }).execute(num_retries=self.num_retries)
497
498         def finalcollection(fileobj):
499             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
500
501         adjustDirObjs(outputObj, finalcollection)
502         adjustFileObjs(outputObj, finalcollection)
503
504         return (outputObj, final)
505
506     def set_crunch_output(self):
507         if self.work_api == "containers":
508             current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
509             if current is None:
510                 return
511             try:
512                 self.api.containers().update(uuid=current['uuid'],
513                                              body={
514                                                  'output': self.final_output_collection.portable_data_hash(),
515                                                  'output_properties': self.final_output_collection.get_properties(),
516                                              }).execute(num_retries=self.num_retries)
517                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
518                                               body={
519                                                   'is_trashed': True
520                                               }).execute(num_retries=self.num_retries)
521             except Exception:
522                 logger.exception("Setting container output")
523                 raise
524
525     def apply_reqs(self, job_order_object, tool):
526         if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
527             if tool.metadata.get("http://commonwl.org/cwltool#original_cwlVersion") == 'v1.0':
528                 raise WorkflowException(
529                     "`cwl:requirements` in the input object is not part of CWL "
530                     "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
531                     "can set the cwlVersion to v1.1 or greater and re-run with "
532                     "--enable-dev.")
533             job_reqs = job_order_object["https://w3id.org/cwl/cwl#requirements"]
534             for req in job_reqs:
535                 tool.requirements.append(req)
536
537     @staticmethod
538     def get_git_info(tool):
539         in_a_git_repo = False
540         cwd = None
541         filepath = None
542
543         if tool.tool["id"].startswith("file://"):
544             # check if git is installed
545             try:
546                 filepath = uri_file_path(tool.tool["id"])
547                 cwd = os.path.dirname(filepath)
548                 subprocess.run(
549                     ["git", "log", "--format=%H", "-n1", "HEAD"],
550                     cwd=cwd,
551                     check=True,
552                     stdout=subprocess.DEVNULL,
553                 )
554                 in_a_git_repo = True
555             except Exception as e:
556                 pass
557
558         gitproperties = {}
559
560         if in_a_git_repo:
561             def git_output(cmd):
562                 return subprocess.run(
563                     cmd,
564                     cwd=cwd,
565                     stdout=subprocess.PIPE,
566                     universal_newlines=True,
567                 ).stdout.strip()
568             git_commit = git_output(["git", "log", "--format=%H", "-n1", "HEAD"])
569             git_date = git_output(["git", "log", "--format=%cD", "-n1", "HEAD"])
570             git_committer = git_output(["git", "log", "--format=%cn <%ce>", "-n1", "HEAD"])
571             git_branch = git_output(["git", "rev-parse", "--abbrev-ref", "HEAD"])
572             git_origin = git_output(["git", "remote", "get-url", "origin"])
573             git_status = git_output(["git", "status", "--untracked-files=no", "--porcelain"])
574             git_describe = git_output(["git", "describe", "--always", "--tags"])
575             git_toplevel = git_output(["git", "rev-parse", "--show-toplevel"])
576             git_path = filepath[len(git_toplevel):]
577
578             gitproperties = {
579                 "http://arvados.org/cwl#gitCommit": git_commit,
580                 "http://arvados.org/cwl#gitDate": git_date,
581                 "http://arvados.org/cwl#gitCommitter": git_committer,
582                 "http://arvados.org/cwl#gitBranch": git_branch,
583                 "http://arvados.org/cwl#gitOrigin": git_origin,
584                 "http://arvados.org/cwl#gitStatus": git_status,
585                 "http://arvados.org/cwl#gitDescribe": git_describe,
586                 "http://arvados.org/cwl#gitPath": git_path,
587             }
588         else:
589             for g in ("http://arvados.org/cwl#gitCommit",
590                       "http://arvados.org/cwl#gitDate",
591                       "http://arvados.org/cwl#gitCommitter",
592                       "http://arvados.org/cwl#gitBranch",
593                       "http://arvados.org/cwl#gitOrigin",
594                       "http://arvados.org/cwl#gitStatus",
595                       "http://arvados.org/cwl#gitDescribe",
596                       "http://arvados.org/cwl#gitPath"):
597                 if g in tool.metadata:
598                     gitproperties[g] = tool.metadata[g]
599
600         return gitproperties
601
602     def set_container_request_properties(self, container, properties):
603         resp = self.api.container_requests().list(filters=[["container_uuid", "=", container["uuid"]]], select=["uuid", "properties"]).execute(num_retries=self.num_retries)
604         for cr in resp["items"]:
605             cr["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in properties.items()})
606             self.api.container_requests().update(uuid=cr["uuid"], body={"container_request": {"properties": cr["properties"]}}).execute(num_retries=self.num_retries)
607
608     def arv_executor(self, updated_tool, job_order, runtimeContext, logger=None):
609         self.debug = runtimeContext.debug
610
611         self.runtime_status_update("activity", "initialization")
612
613         git_info = self.get_git_info(updated_tool) if self.git_info else {}
614         if git_info:
615             logger.info("Git provenance")
616             for g in git_info:
617                 if git_info[g]:
618                     logger.info("  %s: %s", g.split("#", 1)[1], git_info[g])
619
620         runtimeContext.git_info = git_info
621
622         workbench1 = self.api.config()["Services"]["Workbench1"]["ExternalURL"]
623         workbench2 = self.api.config()["Services"]["Workbench2"]["ExternalURL"]
624         controller = self.api.config()["Services"]["Controller"]["ExternalURL"]
625         logger.info("Using cluster %s (%s)", self.api.config()["ClusterID"], workbench2 or workbench1 or controller)
626
627         if not self.fast_submit:
628             updated_tool.visit(self.check_features)
629
630         self.pipeline = None
631         self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
632         self.secret_store = runtimeContext.secret_store
633
634         self.trash_intermediate = runtimeContext.trash_intermediate
635         if self.trash_intermediate and self.work_api != "containers":
636             raise Exception("--trash-intermediate is only supported with --api=containers.")
637
638         self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
639         if self.intermediate_output_ttl and self.work_api != "containers":
640             raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
641         if self.intermediate_output_ttl < 0:
642             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
643
644         if runtimeContext.submit_request_uuid and self.work_api != "containers":
645             raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
646
647         runtimeContext = runtimeContext.copy()
648
649         default_storage_classes = ",".join([k for k,v in self.api.config().get("StorageClasses", {"default": {"Default": True}}).items() if v.get("Default") is True])
650         if runtimeContext.storage_classes == "default":
651             runtimeContext.storage_classes = default_storage_classes
652         if runtimeContext.intermediate_storage_classes == "default":
653             runtimeContext.intermediate_storage_classes = default_storage_classes
654
655         if not runtimeContext.name:
656             self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"])
657             if git_info.get("http://arvados.org/cwl#gitDescribe"):
658                 self.name = "%s (%s)" % (self.name, git_info.get("http://arvados.org/cwl#gitDescribe"))
659             runtimeContext.name = self.name
660
661         if runtimeContext.copy_deps is None and (runtimeContext.create_workflow or runtimeContext.update_workflow):
662             # When creating or updating workflow record, by default
663             # always copy dependencies and ensure Docker images are up
664             # to date.
665             runtimeContext.copy_deps = True
666             runtimeContext.match_local_docker = True
667
668         if runtimeContext.print_keep_deps:
669             runtimeContext.copy_deps = False
670             runtimeContext.match_local_docker = False
671
672         if runtimeContext.update_workflow and self.project_uuid is None:
673             # If we are updating a workflow, make sure anything that
674             # gets uploaded goes into the same parent project, unless
675             # an alternate --project-uuid was provided.
676             existing_wf = self.api.workflows().get(uuid=runtimeContext.update_workflow).execute()
677             runtimeContext.project_uuid = existing_wf["owner_uuid"]
678
679         self.project_uuid = runtimeContext.project_uuid
680
681         self.runtime_status_update("activity", "data transfer")
682
683         # Upload local file references in the job order.
684         with Perf(metrics, "upload_job_order"):
685             job_order, jobmapper = upload_job_order(self, "%s input" % runtimeContext.name,
686                                          updated_tool, job_order, runtimeContext)
687
688         # determine if we are submitting or directly executing the workflow.
689         #
690         # the last clause means: if it is a command line tool, and we
691         # are going to wait for the result, and always_submit_runner
692         # is false, then we don't submit a runner process.
693
694         submitting = (runtimeContext.submit and not
695                        (updated_tool.tool["class"] == "CommandLineTool" and
696                         runtimeContext.wait and
697                         not runtimeContext.always_submit_runner))
698
699         loadingContext = self.loadingContext.copy()
700         loadingContext.do_validate = False
701         loadingContext.disable_js_validation = True
702         tool = updated_tool
703
704         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
705         # Also uploads docker images.
706         if not self.fast_submit:
707             logger.info("Uploading workflow dependencies")
708             with Perf(metrics, "upload_workflow_deps"):
709                 merged_map = upload_workflow_deps(self, tool, runtimeContext)
710         else:
711             # in the fast submit case, we are running a workflow that
712             # has already been uploaded to Arvados, so we assume all
713             # the dependencies have been pinned to keep references and
714             # there is nothing to do.
715             merged_map = {}
716
717         loadingContext.loader = tool.doc_loader
718         loadingContext.avsc_names = tool.doc_schema
719         loadingContext.metadata = tool.metadata
720         loadingContext.skip_resolve_all = True
721
722         workflow_wrapper = None
723         if (submitting and not self.fast_submit) or runtimeContext.update_workflow or runtimeContext.create_workflow or runtimeContext.print_keep_deps:
724             # upload workflow and get back the workflow wrapper
725
726             workflow_wrapper = upload_workflow(self, tool, job_order,
727                                                runtimeContext.project_uuid,
728                                                runtimeContext,
729                                                uuid=runtimeContext.update_workflow,
730                                                submit_runner_ram=runtimeContext.submit_runner_ram,
731                                                name=runtimeContext.name,
732                                                merged_map=merged_map,
733                                                submit_runner_image=runtimeContext.submit_runner_image,
734                                                git_info=git_info,
735                                                set_defaults=(runtimeContext.update_workflow or runtimeContext.create_workflow),
736                                                jobmapper=jobmapper)
737
738             if runtimeContext.update_workflow or runtimeContext.create_workflow:
739                 # We're registering the workflow, so create or update
740                 # the workflow record and then exit.
741                 uuid = make_workflow_record(self, workflow_wrapper, runtimeContext.name, tool,
742                                             runtimeContext.project_uuid, runtimeContext.update_workflow)
743                 self.stdout.write(uuid + "\n")
744                 return (None, "success")
745
746             if runtimeContext.print_keep_deps:
747                 # Just find and print out all the collection dependencies and exit
748                 print_keep_deps(self, runtimeContext, merged_map, tool)
749                 return (None, "success")
750
751             # Did not register a workflow, we're going to submit
752             # it instead.
753             loadingContext.loader.idx.clear()
754             loadingContext.loader.idx["_:main"] = workflow_wrapper
755             workflow_wrapper["id"] = "_:main"
756
757             # Reload the minimal wrapper workflow.
758             self.fast_submit = True
759             tool = load_tool(workflow_wrapper, loadingContext)
760             loadingContext.loader.idx["_:main"] = workflow_wrapper
761
762         if not submitting:
763             # If we are going to run the workflow now (rather than
764             # submit it), we need to update the workflow document
765             # replacing file references with keep references.  If we
766             # are just going to construct a run submission, we don't
767             # need to do this.
768             update_from_merged_map(tool, merged_map)
769
770         self.apply_reqs(job_order, tool)
771
772         self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
773         self.eval_timeout = runtimeContext.eval_timeout
774
775         runtimeContext.use_container = True
776         runtimeContext.tmpdir_prefix = "tmp"
777         runtimeContext.work_api = self.work_api
778
779         if not self.output_name:
780              self.output_name = "Output from workflow %s" % runtimeContext.name
781
782         self.output_name  = cleanup_name_for_collection(self.output_name)
783
784         if self.work_api == "containers":
785             if self.ignore_docker_for_reuse:
786                 raise Exception("--ignore-docker-for-reuse not supported with containers API.")
787             runtimeContext.outdir = "/var/spool/cwl"
788             runtimeContext.docker_outdir = "/var/spool/cwl"
789             runtimeContext.tmpdir = "/tmp"
790             runtimeContext.docker_tmpdir = "/tmp"
791
792         if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
793             raise Exception("--priority must be in the range 1..1000.")
794
795         if self.should_estimate_cache_size:
796             visited = set()
797             estimated_size = [0]
798             def estimate_collection_cache(obj):
799                 if obj.get("location", "").startswith("keep:"):
800                     m = pdh_size.match(obj["location"][5:])
801                     if m and m.group(1) not in visited:
802                         visited.add(m.group(1))
803                         estimated_size[0] += int(m.group(2))
804             visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
805             runtimeContext.collection_cache_size = max(((estimated_size[0]*192) // (1024*1024))+1, 256)
806             self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)
807
808         logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)
809
810         runnerjob = None
811         if runtimeContext.submit:
812             # We are submitting instead of running immediately.
813             #
814             # Create a "Runner job" that when run() is invoked,
815             # creates the container request to run the workflow.
816             if self.work_api == "containers":
817                 if submitting:
818                     loadingContext.metadata = updated_tool.metadata.copy()
819                     tool = RunnerContainer(self, tool, loadingContext, runtimeContext.enable_reuse,
820                                            self.output_name,
821                                            self.output_tags,
822                                            submit_runner_ram=runtimeContext.submit_runner_ram,
823                                            name=runtimeContext.name,
824                                            on_error=runtimeContext.on_error,
825                                            submit_runner_image=runtimeContext.submit_runner_image,
826                                            intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
827                                            merged_map=merged_map,
828                                            priority=runtimeContext.priority,
829                                            secret_store=self.secret_store,
830                                            collection_cache_size=runtimeContext.collection_cache_size,
831                                            collection_cache_is_default=self.should_estimate_cache_size,
832                                            git_info=git_info)
833                 else:
834                     runtimeContext.runnerjob = tool.tool["id"]
835
836         if runtimeContext.cwl_runner_job is not None:
837             self.uuid = runtimeContext.cwl_runner_job.get('uuid')
838
839         jobiter = tool.job(job_order,
840                            self.output_callback,
841                            runtimeContext)
842
843         if runtimeContext.submit and not runtimeContext.wait:
844             # User provided --no-wait so submit the container request,
845             # get the container request uuid, print it out, and exit.
846             runnerjob = next(jobiter)
847             runnerjob.run(runtimeContext)
848             self.stdout.write(runnerjob.uuid+"\n")
849             return (None, "success")
850
851         # We either running the workflow directly, or submitting it
852         # and will wait for a final result.
853
854         self.runtime_status_update("activity", "workflow execution")
855
856         current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
857         if current_container:
858             logger.info("Running inside container %s", current_container.get("uuid"))
859             self.set_container_request_properties(current_container, git_info)
860
861         self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
862         self.polling_thread = threading.Thread(target=self.poll_states)
863         self.polling_thread.start()
864
865         self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
866
867         try:
868             self.workflow_eval_lock.acquire()
869
870             # Holds the lock while this code runs and releases it when
871             # it is safe to do so in self.workflow_eval_lock.wait(),
872             # at which point on_message can update job state and
873             # process output callbacks.
874
875             loopperf = Perf(metrics, "jobiter")
876             loopperf.__enter__()
877             for runnable in jobiter:
878                 loopperf.__exit__()
879
880                 if self.stop_polling.is_set():
881                     break
882
883                 if self.task_queue.error is not None:
884                     raise self.task_queue.error
885
886                 if runnable:
887                     with Perf(metrics, "run"):
888                         self.start_run(runnable, runtimeContext)
889                 else:
890                     if (self.task_queue.in_flight + len(self.processes)) > 0:
891                         self.workflow_eval_lock.wait(3)
892                     else:
893                         if self.final_status is None:
894                             logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
895                         break
896
897                 if self.stop_polling.is_set():
898                     break
899
900                 loopperf.__enter__()
901             loopperf.__exit__()
902
903             while (self.task_queue.in_flight + len(self.processes)) > 0:
904                 if self.task_queue.error is not None:
905                     raise self.task_queue.error
906                 self.workflow_eval_lock.wait(3)
907
908         except UnsupportedRequirement:
909             raise
910         except:
911             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
912                 logger.error("Interrupted, workflow will be cancelled")
913             elif isinstance(sys.exc_info()[1], WorkflowException):
914                 logger.error("Workflow execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
915             else:
916                 logger.exception("Workflow execution failed")
917
918             if self.pipeline:
919                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
920                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
921
922             if self.work_api == "containers" and not current_container:
923                 # Not running in a crunch container, so cancel any outstanding processes.
924                 for p in self.processes:
925                     try:
926                         self.api.container_requests().update(uuid=p,
927                                                              body={"priority": "0"}
928                         ).execute(num_retries=self.num_retries)
929                     except Exception:
930                         pass
931         finally:
932             self.workflow_eval_lock.release()
933             self.task_queue.drain()
934             self.stop_polling.set()
935             self.polling_thread.join()
936             self.task_queue.join()
937
938         if self.final_status == "UnsupportedRequirement":
939             raise UnsupportedRequirement("Check log for details.")
940
941         if self.final_output is None:
942             raise WorkflowException("Workflow did not return a result.")
943
944         if runtimeContext.usage_report_notes:
945             logger.info("Steps with low resource utilization (possible optimization opportunities):")
946             for x in runtimeContext.usage_report_notes:
947                 logger.info("  %s", x)
948
949         if runtimeContext.submit and isinstance(tool, Runner):
950             logger.info("Final output collection %s", tool.final_output)
951             if workbench2 or workbench1:
952                 logger.info("Output at %scollections/%s", workbench2 or workbench1, tool.final_output)
953         else:
954             if self.output_tags is None:
955                 self.output_tags = ""
956
957             storage_classes = ""
958             storage_class_req, _ = tool.get_requirement("http://arvados.org/cwl#OutputStorageClass")
959             if storage_class_req and storage_class_req.get("finalStorageClass"):
960                 storage_classes = aslist(storage_class_req["finalStorageClass"])
961             else:
962                 storage_classes = runtimeContext.storage_classes.strip().split(",")
963
964             output_properties = {}
965             output_properties_req, _ = tool.get_requirement("http://arvados.org/cwl#OutputCollectionProperties")
966             if output_properties_req:
967                 builder = make_builder(job_order, tool.hints, tool.requirements, runtimeContext, tool.metadata)
968                 for pr in output_properties_req["outputProperties"]:
969                     output_properties[pr["propertyName"]] = builder.do_eval(pr["propertyValue"])
970
971             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes,
972                                                                                           self.output_tags, output_properties,
973                                                                                           self.final_output)
974             self.set_crunch_output()
975
976         if runtimeContext.compute_checksum:
977             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
978             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
979
980         if self.trash_intermediate and self.final_status == "success":
981             self.trash_intermediate_output()
982
983         return (self.final_output, self.final_status)
984
985 def blank_secrets(job_order_object, process):
986     secrets_req, _ = process.get_requirement("http://commonwl.org/cwltool#Secrets")
987     pass