Merge branch '21934-python36-compat'
[arvados.git] / sdk / cwl / arvados_cwl / executor.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import argparse
6 import logging
7 import os
8 import sys
9 import threading
10 import copy
11 import json
12 import re
13 from functools import partial
14 import subprocess
15 import time
16 import urllib
17
18 from cwltool.errors import WorkflowException
19 import cwltool.workflow
20 from schema_salad.sourceline import SourceLine, cmap
21 import schema_salad.validate as validate
22 from schema_salad.ref_resolver import file_uri, uri_file_path
23
24 import arvados
25 import arvados.config
26 from arvados.keep import KeepClient
27 from arvados.errors import ApiError
28
29 import arvados_cwl.util
30 from .arvcontainer import RunnerContainer, cleanup_name_for_collection
31 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, make_builder, update_from_merged_map, print_keep_deps, ArvSecretStore
32 from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
33 from .arvworkflow import ArvadosWorkflow, upload_workflow, make_workflow_record
34 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
35 from .perf import Perf
36 from .pathmapper import NoFollowPathMapper
37 from cwltool.task_queue import TaskQueue
38 from .context import ArvLoadingContext, ArvRuntimeContext
39 from ._version import __version__
40
41 from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
42 from cwltool.utils import adjustFileObjs, adjustDirObjs, get_listing, visit_class, aslist
43 from cwltool.command_line_tool import compute_checksums
44 from cwltool.load_tool import load_tool
45
46 logger = logging.getLogger('arvados.cwl-runner')
47 metrics = logging.getLogger('arvados.cwl-runner.metrics')
48
49 DEFAULT_PRIORITY = 500
50
class RuntimeStatusLoggingHandler(logging.Handler):
    """
    Logging handler that forwards warning and error records to a runtime
    status update callback, so they can be reported on runner containers.
    """
    def __init__(self, runtime_status_update_func):
        super(RuntimeStatusLoggingHandler, self).__init__()
        self.runtime_status_update = runtime_status_update_func
        # Guard flag: True while a status update is in flight, so that
        # records emitted during the update are not forwarded again.
        self.updatingRuntimeStatus = False

    def emit(self, record):
        """Report ``record`` through the callback when it is a warning or error."""
        if record.levelno >= logging.ERROR:
            kind = 'error'
        elif record.levelno >= logging.WARNING:
            kind = 'warning'
        else:
            # Anything below WARNING is not reported.
            return

        if kind == 'warning' and record.name in ("salad", "crunchstat_summary"):
            # Don't send validation warnings to runtime status,
            # they're noisy and unhelpful.
            return

        if self.updatingRuntimeStatus is True:
            return

        self.updatingRuntimeStatus = True
        try:
            log_msg = record.getMessage()
            # For a multi-line message the first line is the status and
            # everything after the first newline is the detail.
            headline, newline, remainder = log_msg.partition('\n')
            summary = "%s: %s" % (record.name, headline)
            if newline:
                self.runtime_status_update(kind, summary, remainder)
            else:
                self.runtime_status_update(kind, summary)
        finally:
            self.updatingRuntimeStatus = False
91
92
93 class ArvCwlExecutor(object):
94     """Execute a CWL tool or workflow, submit work (using containers API),
95     wait for them to complete, and report output.
96
97     """
98
99     def __init__(self, api_client,
100                  arvargs=None,
101                  keep_client=None,
102                  num_retries=4,
103                  thread_count=4,
104                  stdout=sys.stdout):
105
106         if arvargs is None:
107             arvargs = argparse.Namespace()
108             arvargs.work_api = None
109             arvargs.output_name = None
110             arvargs.output_tags = None
111             arvargs.thread_count = 1
112             arvargs.collection_cache_size = None
113             arvargs.git_info = True
114             arvargs.submit = False
115             arvargs.defer_downloads = False
116
117         self.api = api_client
118         self.processes = {}
119         self.workflow_eval_lock = threading.Condition(threading.RLock())
120         self.final_output = None
121         self.final_status = None
122         self.num_retries = num_retries
123         self.uuid = None
124         self.stop_polling = threading.Event()
125         self.poll_api = None
126         self.pipeline = None
127         self.final_output_collection = None
128         self.output_name = arvargs.output_name
129         self.output_tags = arvargs.output_tags
130         self.project_uuid = None
131         self.intermediate_output_ttl = 0
132         self.intermediate_output_collections = []
133         self.trash_intermediate = False
134         self.thread_count = arvargs.thread_count
135         self.poll_interval = 12
136         self.loadingContext = None
137         self.should_estimate_cache_size = True
138         self.fs_access = None
139         self.secret_store = None
140         self.stdout = stdout
141         self.fast_submit = False
142         self.git_info = arvargs.git_info
143         self.debug = False
144
145         if keep_client is not None:
146             self.keep_client = keep_client
147         else:
148             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
149
150         if arvargs.collection_cache_size:
151             collection_cache_size = arvargs.collection_cache_size*1024*1024
152             self.should_estimate_cache_size = False
153         else:
154             collection_cache_size = 256*1024*1024
155
156         self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
157                                                 cap=collection_cache_size)
158
159         self.fetcher_constructor = partial(CollectionFetcher,
160                                            api_client=self.api,
161                                            fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
162                                            num_retries=self.num_retries)
163
164         self.work_api = None
165         expected_api = ["containers"]
166         for api in expected_api:
167             try:
168                 methods = self.api._rootDesc.get('resources')[api]['methods']
169                 if ('httpMethod' in methods['create'] and
170                     (arvargs.work_api == api or arvargs.work_api is None)):
171                     self.work_api = api
172                     break
173             except KeyError:
174                 pass
175
176         if not self.work_api:
177             if arvargs.work_api is None:
178                 raise Exception("No supported APIs")
179             else:
180                 raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))
181
182         if self.work_api == "jobs":
183             logger.error("""
184 *******************************
185 The 'jobs' API is no longer supported.
186 *******************************""")
187             exit(1)
188
189         self.loadingContext = ArvLoadingContext(vars(arvargs))
190         self.loadingContext.fetcher_constructor = self.fetcher_constructor
191         self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
192         self.loadingContext.construct_tool_object = self.arv_make_tool
193
194         # Add a custom logging handler to the root logger for runtime status reporting
195         # if running inside a container
196         if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
197             root_logger = logging.getLogger('')
198
199             # Remove existing RuntimeStatusLoggingHandlers if they exist
200             handlers = [h for h in root_logger.handlers if not isinstance(h, RuntimeStatusLoggingHandler)]
201             root_logger.handlers = handlers
202
203             handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
204             root_logger.addHandler(handler)
205
206         self.toplevel_runtimeContext = ArvRuntimeContext(vars(arvargs))
207         self.toplevel_runtimeContext.make_fs_access = partial(CollectionFsAccess,
208                                                      collection_cache=self.collection_cache)
209         self.toplevel_runtimeContext.secret_store = ArvSecretStore()
210
211         self.defer_downloads = arvargs.submit and arvargs.defer_downloads
212
213         validate_cluster_target(self, self.toplevel_runtimeContext)
214
215
216     def arv_make_tool(self, toolpath_object, loadingContext):
217         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
218             return ArvadosCommandTool(self, toolpath_object, loadingContext)
219         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
220             return ArvadosWorkflow(self, toolpath_object, loadingContext)
221         elif "class" in toolpath_object and toolpath_object["class"] == "ExpressionTool":
222             return ArvadosExpressionTool(self, toolpath_object, loadingContext)
223         else:
224             raise Exception("Unknown tool %s" % toolpath_object.get("class"))
225
226     def output_callback(self, out, processStatus):
227         with self.workflow_eval_lock:
228             if processStatus == "success":
229                 logger.info("Overall process status is %s", processStatus)
230                 state = "Complete"
231             else:
232                 logger.error("Overall process status is %s", processStatus)
233                 state = "Failed"
234             if self.pipeline:
235                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
236                                                         body={"state": state}).execute(num_retries=self.num_retries)
237             self.final_status = processStatus
238             self.final_output = out
239             self.workflow_eval_lock.notifyAll()
240
241
242     def start_run(self, runnable, runtimeContext):
243         self.task_queue.add(partial(runnable.run, runtimeContext),
244                             self.workflow_eval_lock, self.stop_polling)
245
246     def process_submitted(self, container):
247         with self.workflow_eval_lock:
248             self.processes[container.uuid] = container
249
250     def process_done(self, uuid, record):
251         with self.workflow_eval_lock:
252             j = self.processes[uuid]
253             logger.info("%s %s is %s", self.label(j), uuid, record["state"])
254             self.task_queue.add(partial(j.done, record),
255                                 self.workflow_eval_lock, self.stop_polling)
256             del self.processes[uuid]
257
258     def runtime_status_update(self, kind, message, detail=None):
259         """
260         Updates the runtime_status field on the runner container.
261         Called when there's a need to report errors, warnings or just
262         activity statuses, for example in the RuntimeStatusLoggingHandler.
263         """
264
265         if kind not in ('error', 'warning', 'activity'):
266             # Ignore any other status kind
267             return
268
269         with self.workflow_eval_lock:
270             current = None
271             try:
272                 current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
273             except Exception as e:
274                 logger.info("Couldn't get current container: %s", e)
275             if current is None:
276                 return
277             runtime_status = current.get('runtime_status', {})
278
279             original_updatemessage = updatemessage = runtime_status.get(kind, "")
280             if kind == "activity" or not updatemessage:
281                 updatemessage = message
282
283             # Subsequent messages tacked on in detail
284             original_updatedetail = updatedetail = runtime_status.get(kind+'Detail', "")
285             maxlines = 40
286             if updatedetail.count("\n") < maxlines:
287                 if updatedetail:
288                     updatedetail += "\n"
289                 updatedetail += message + "\n"
290
291                 if detail:
292                     updatedetail += detail + "\n"
293
294                 if updatedetail.count("\n") >= maxlines:
295                     updatedetail += "\nSome messages may have been omitted.  Check the full log."
296
297             if updatemessage == original_updatemessage and updatedetail == original_updatedetail:
298                 # don't waste time doing an update if nothing changed
299                 # (usually because we exceeded the max lines)
300                 return
301
302             runtime_status.update({
303                 kind: updatemessage,
304                 kind+'Detail': updatedetail,
305             })
306
307             try:
308                 self.api.containers().update(uuid=current['uuid'],
309                                             body={
310                                                 'runtime_status': runtime_status,
311                                             }).execute(num_retries=self.num_retries)
312             except Exception as e:
313                 logger.info("Couldn't update runtime_status: %s", e)
314
315     def wrapped_callback(self, cb, obj, st):
316         with self.workflow_eval_lock:
317             cb(obj, st)
318             self.workflow_eval_lock.notifyAll()
319
320     def get_wrapped_callback(self, cb):
321         return partial(self.wrapped_callback, cb)
322
323     def on_message(self, event):
324         if event.get("object_uuid") in self.processes and event["event_type"] == "update":
325             uuid = event["object_uuid"]
326             if event["properties"]["new_attributes"]["state"] == "Running":
327                 with self.workflow_eval_lock:
328                     j = self.processes[uuid]
329                     if j.running is False:
330                         j.running = True
331                         j.update_pipeline_component(event["properties"]["new_attributes"])
332                         logger.info("%s %s is Running", self.label(j), uuid)
333             elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
334                 self.process_done(uuid, event["properties"]["new_attributes"])
335
336     def label(self, obj):
337         return "[%s %s]" % (self.work_api[0:-1], obj.name)
338
339     def poll_states(self):
340         """Poll status of containers listed in the processes dict.
341
342         Runs in a separate thread.
343         """
344
345         try:
346             remain_wait = self.poll_interval
347             while True:
348                 if remain_wait > 0:
349                     self.stop_polling.wait(remain_wait)
350                 if self.stop_polling.is_set():
351                     break
352                 with self.workflow_eval_lock:
353                     keys = list(self.processes)
354                 if not keys:
355                     remain_wait = self.poll_interval
356                     continue
357
358                 begin_poll = time.time()
359                 if self.work_api == "containers":
360                     table = self.poll_api.container_requests()
361
362                 pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)
363
364                 while keys:
365                     page = keys[:pageSize]
366                     try:
367                         proc_states = table.list(filters=[["uuid", "in", page]], select=["uuid", "container_uuid", "state", "log_uuid",
368                                                                                          "output_uuid", "modified_at", "properties",
369                                                                                          "runtime_constraints"]).execute(num_retries=self.num_retries)
370                     except Exception as e:
371                         logger.warning("Temporary error checking states on API server: %s", e)
372                         remain_wait = self.poll_interval
373                         continue
374
375                     for p in proc_states["items"]:
376                         self.on_message({
377                             "object_uuid": p["uuid"],
378                             "event_type": "update",
379                             "properties": {
380                                 "new_attributes": p
381                             }
382                         })
383                     keys = keys[pageSize:]
384
385                 finish_poll = time.time()
386                 remain_wait = self.poll_interval - (finish_poll - begin_poll)
387         except:
388             logger.exception("Fatal error in state polling thread.")
389             with self.workflow_eval_lock:
390                 self.processes.clear()
391                 self.workflow_eval_lock.notifyAll()
392         finally:
393             self.stop_polling.set()
394
395     def add_intermediate_output(self, uuid):
396         if uuid:
397             self.intermediate_output_collections.append(uuid)
398
399     def trash_intermediate_output(self):
400         logger.info("Cleaning up intermediate output collections")
401         for i in self.intermediate_output_collections:
402             try:
403                 self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
404             except Exception:
405                 logger.warning("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
406             except (KeyboardInterrupt, SystemExit):
407                 break
408
409     def check_features(self, obj, parentfield=""):
410         if isinstance(obj, dict):
411             if obj.get("class") == "DockerRequirement":
412                 if obj.get("dockerOutputDirectory"):
413                     if not obj.get("dockerOutputDirectory").startswith('/'):
414                         raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
415                             "Option 'dockerOutputDirectory' must be an absolute path.")
416             if obj.get("class") == "InplaceUpdateRequirement":
417                 if obj["inplaceUpdate"] and parentfield == "requirements":
418                     raise SourceLine(obj, "class", UnsupportedRequirement).makeError("InplaceUpdateRequirement not supported for keep collections.")
419             for k,v in obj.items():
420                 self.check_features(v, parentfield=k)
421         elif isinstance(obj, list):
422             for i,v in enumerate(obj):
423                 with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
424                     self.check_features(v, parentfield=parentfield)
425
426     def make_output_collection(self, name, storage_classes, tagsString, output_properties, outputObj):
427         outputObj = copy.deepcopy(outputObj)
428
429         files = []
430         def capture(fileobj):
431             files.append(fileobj)
432
433         adjustDirObjs(outputObj, capture)
434         adjustFileObjs(outputObj, capture)
435
436         generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)
437
438         final = arvados.collection.Collection(api_client=self.api,
439                                               keep_client=self.keep_client,
440                                               num_retries=self.num_retries)
441
442         for k,v in generatemapper.items():
443             if v.type == "Directory" and v.resolved.startswith("_:"):
444                     continue
445             if v.type == "CreateFile" and (k.startswith("_:") or v.resolved.startswith("_:")):
446                 with final.open(v.target, "wb") as f:
447                     f.write(v.resolved.encode("utf-8"))
448                     continue
449
450             if not v.resolved.startswith("keep:"):
451                 raise Exception("Output source is not in keep or a literal")
452             sp = v.resolved.split("/")
453             srccollection = sp[0][5:]
454             try:
455                 reader = self.collection_cache.get(srccollection)
456                 srcpath = urllib.parse.unquote("/".join(sp[1:]) if len(sp) > 1 else ".")
457                 final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
458             except arvados.errors.ArgumentError as e:
459                 logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
460                 raise
461             except IOError as e:
462                 logger.error("While preparing output collection: %s", e)
463                 raise
464
465         def rewrite(fileobj):
466             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
467             for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
468                 if k in fileobj:
469                     del fileobj[k]
470
471         adjustDirObjs(outputObj, rewrite)
472         adjustFileObjs(outputObj, rewrite)
473
474         with final.open("cwl.output.json", "w") as f:
475             res = str(json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False))
476             f.write(res)
477
478
479         final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes,
480                        ensure_unique_name=True, properties=output_properties)
481
482         logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
483                     final.api_response()["name"],
484                     final.manifest_locator())
485
486         final_uuid = final.manifest_locator()
487         tags = tagsString.split(',')
488         for tag in tags:
489              self.api.links().create(body={
490                 "head_uuid": final_uuid, "link_class": "tag", "name": tag
491                 }).execute(num_retries=self.num_retries)
492
493         def finalcollection(fileobj):
494             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
495
496         adjustDirObjs(outputObj, finalcollection)
497         adjustFileObjs(outputObj, finalcollection)
498
499         return (outputObj, final)
500
501     def set_crunch_output(self):
502         if self.work_api == "containers":
503             current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
504             if current is None:
505                 return
506             try:
507                 self.api.containers().update(uuid=current['uuid'],
508                                              body={
509                                                  'output': self.final_output_collection.portable_data_hash(),
510                                                  'output_properties': self.final_output_collection.get_properties(),
511                                              }).execute(num_retries=self.num_retries)
512                 self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
513                                               body={
514                                                   'is_trashed': True
515                                               }).execute(num_retries=self.num_retries)
516             except Exception:
517                 logger.exception("Setting container output")
518                 raise
519
520     def apply_reqs(self, job_order_object, tool):
521         if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
522             if tool.metadata.get("http://commonwl.org/cwltool#original_cwlVersion") == 'v1.0':
523                 raise WorkflowException(
524                     "`cwl:requirements` in the input object is not part of CWL "
525                     "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
526                     "can set the cwlVersion to v1.1 or greater and re-run with "
527                     "--enable-dev.")
528             job_reqs = job_order_object["https://w3id.org/cwl/cwl#requirements"]
529             for req in job_reqs:
530                 tool.requirements.append(req)
531
532     @staticmethod
533     def get_git_info(tool):
534         in_a_git_repo = False
535         cwd = None
536         filepath = None
537
538         if tool.tool["id"].startswith("file://"):
539             # check if git is installed
540             try:
541                 filepath = uri_file_path(tool.tool["id"])
542                 cwd = os.path.dirname(filepath)
543                 subprocess.run(
544                     ["git", "log", "--format=%H", "-n1", "HEAD"],
545                     cwd=cwd,
546                     check=True,
547                     stdout=subprocess.DEVNULL,
548                 )
549                 in_a_git_repo = True
550             except Exception as e:
551                 pass
552
553         gitproperties = {}
554
555         if in_a_git_repo:
556             def git_output(cmd):
557                 return subprocess.run(
558                     cmd,
559                     cwd=cwd,
560                     stdout=subprocess.PIPE,
561                     universal_newlines=True,
562                 ).stdout.strip()
563             git_commit = git_output(["git", "log", "--format=%H", "-n1", "HEAD"])
564             git_date = git_output(["git", "log", "--format=%cD", "-n1", "HEAD"])
565             git_committer = git_output(["git", "log", "--format=%cn <%ce>", "-n1", "HEAD"])
566             git_branch = git_output(["git", "rev-parse", "--abbrev-ref", "HEAD"])
567             git_origin = git_output(["git", "remote", "get-url", "origin"])
568             git_status = git_output(["git", "status", "--untracked-files=no", "--porcelain"])
569             git_describe = git_output(["git", "describe", "--always", "--tags"])
570             git_toplevel = git_output(["git", "rev-parse", "--show-toplevel"])
571             git_path = filepath[len(git_toplevel):]
572
573             gitproperties = {
574                 "http://arvados.org/cwl#gitCommit": git_commit,
575                 "http://arvados.org/cwl#gitDate": git_date,
576                 "http://arvados.org/cwl#gitCommitter": git_committer,
577                 "http://arvados.org/cwl#gitBranch": git_branch,
578                 "http://arvados.org/cwl#gitOrigin": git_origin,
579                 "http://arvados.org/cwl#gitStatus": git_status,
580                 "http://arvados.org/cwl#gitDescribe": git_describe,
581                 "http://arvados.org/cwl#gitPath": git_path,
582             }
583         else:
584             for g in ("http://arvados.org/cwl#gitCommit",
585                       "http://arvados.org/cwl#gitDate",
586                       "http://arvados.org/cwl#gitCommitter",
587                       "http://arvados.org/cwl#gitBranch",
588                       "http://arvados.org/cwl#gitOrigin",
589                       "http://arvados.org/cwl#gitStatus",
590                       "http://arvados.org/cwl#gitDescribe",
591                       "http://arvados.org/cwl#gitPath"):
592                 if g in tool.metadata:
593                     gitproperties[g] = tool.metadata[g]
594
595         return gitproperties
596
597     def set_container_request_properties(self, container, properties):
598         resp = self.api.container_requests().list(filters=[["container_uuid", "=", container["uuid"]]], select=["uuid", "properties"]).execute(num_retries=self.num_retries)
599         for cr in resp["items"]:
600             cr["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in properties.items()})
601             self.api.container_requests().update(uuid=cr["uuid"], body={"container_request": {"properties": cr["properties"]}}).execute(num_retries=self.num_retries)
602
603     def arv_executor(self, updated_tool, job_order, runtimeContext, logger=None):
604         self.debug = runtimeContext.debug
605
606         self.runtime_status_update("activity", "initialization")
607
608         git_info = self.get_git_info(updated_tool) if self.git_info else {}
609         if git_info:
610             logger.info("Git provenance")
611             for g in git_info:
612                 if git_info[g]:
613                     logger.info("  %s: %s", g.split("#", 1)[1], git_info[g])
614
615         runtimeContext.git_info = git_info
616
617         workbench1 = self.api.config()["Services"]["Workbench1"]["ExternalURL"]
618         workbench2 = self.api.config()["Services"]["Workbench2"]["ExternalURL"]
619         controller = self.api.config()["Services"]["Controller"]["ExternalURL"]
620         logger.info("Using cluster %s (%s)", self.api.config()["ClusterID"], workbench2 or workbench1 or controller)
621
622         if not self.fast_submit:
623             updated_tool.visit(self.check_features)
624
625         self.pipeline = None
626         self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
627         self.secret_store = runtimeContext.secret_store
628
629         self.trash_intermediate = runtimeContext.trash_intermediate
630         if self.trash_intermediate and self.work_api != "containers":
631             raise Exception("--trash-intermediate is only supported with --api=containers.")
632
633         self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
634         if self.intermediate_output_ttl and self.work_api != "containers":
635             raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
636         if self.intermediate_output_ttl < 0:
637             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
638
639         if runtimeContext.submit_request_uuid and self.work_api != "containers":
640             raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
641
642         runtimeContext = runtimeContext.copy()
643
644         default_storage_classes = ",".join([k for k,v in self.api.config().get("StorageClasses", {"default": {"Default": True}}).items() if v.get("Default") is True])
645         if runtimeContext.storage_classes == "default":
646             runtimeContext.storage_classes = default_storage_classes
647         if runtimeContext.intermediate_storage_classes == "default":
648             runtimeContext.intermediate_storage_classes = default_storage_classes
649
650         if not runtimeContext.name:
651             self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"])
652             if git_info.get("http://arvados.org/cwl#gitDescribe"):
653                 self.name = "%s (%s)" % (self.name, git_info.get("http://arvados.org/cwl#gitDescribe"))
654             runtimeContext.name = self.name
655
656         if runtimeContext.copy_deps is None and (runtimeContext.create_workflow or runtimeContext.update_workflow):
657             # When creating or updating workflow record, by default
658             # always copy dependencies and ensure Docker images are up
659             # to date.
660             runtimeContext.copy_deps = True
661             runtimeContext.match_local_docker = True
662
663         if runtimeContext.print_keep_deps:
664             runtimeContext.copy_deps = False
665             runtimeContext.match_local_docker = False
666
667         if runtimeContext.update_workflow and self.project_uuid is None:
668             # If we are updating a workflow, make sure anything that
669             # gets uploaded goes into the same parent project, unless
670             # an alternate --project-uuid was provided.
671             existing_wf = self.api.workflows().get(uuid=runtimeContext.update_workflow).execute()
672             runtimeContext.project_uuid = existing_wf["owner_uuid"]
673
674         self.project_uuid = runtimeContext.project_uuid
675
676         self.runtime_status_update("activity", "data transfer")
677
678         # Upload local file references in the job order.
679         with Perf(metrics, "upload_job_order"):
680             job_order, jobmapper = upload_job_order(self, "%s input" % runtimeContext.name,
681                                          updated_tool, job_order, runtimeContext)
682
683         # determine if we are submitting or directly executing the workflow.
684         #
685         # the last clause means: if it is a command line tool, and we
686         # are going to wait for the result, and always_submit_runner
687         # is false, then we don't submit a runner process.
688
689         submitting = (runtimeContext.submit and not
690                        (updated_tool.tool["class"] == "CommandLineTool" and
691                         runtimeContext.wait and
692                         not runtimeContext.always_submit_runner))
693
694         loadingContext = self.loadingContext.copy()
695         loadingContext.do_validate = False
696         loadingContext.disable_js_validation = True
697         tool = updated_tool
698
699         # Upload direct dependencies of workflow steps, get back mapping of files to keep references.
700         # Also uploads docker images.
701         if not self.fast_submit:
702             logger.info("Uploading workflow dependencies")
703             with Perf(metrics, "upload_workflow_deps"):
704                 merged_map = upload_workflow_deps(self, tool, runtimeContext)
705         else:
706             # in the fast submit case, we are running a workflow that
707             # has already been uploaded to Arvados, so we assume all
708             # the dependencies have been pinned to keep references and
709             # there is nothing to do.
710             merged_map = {}
711
712         loadingContext.loader = tool.doc_loader
713         loadingContext.avsc_names = tool.doc_schema
714         loadingContext.metadata = tool.metadata
715         loadingContext.skip_resolve_all = True
716
717         workflow_wrapper = None
718         if (submitting and not self.fast_submit) or runtimeContext.update_workflow or runtimeContext.create_workflow or runtimeContext.print_keep_deps:
719             # upload workflow and get back the workflow wrapper
720
721             workflow_wrapper = upload_workflow(self, tool, job_order,
722                                                runtimeContext.project_uuid,
723                                                runtimeContext,
724                                                uuid=runtimeContext.update_workflow,
725                                                submit_runner_ram=runtimeContext.submit_runner_ram,
726                                                name=runtimeContext.name,
727                                                merged_map=merged_map,
728                                                submit_runner_image=runtimeContext.submit_runner_image,
729                                                git_info=git_info,
730                                                set_defaults=(runtimeContext.update_workflow or runtimeContext.create_workflow),
731                                                jobmapper=jobmapper)
732
733             if runtimeContext.update_workflow or runtimeContext.create_workflow:
734                 # We're registering the workflow, so create or update
735                 # the workflow record and then exit.
736                 uuid = make_workflow_record(self, workflow_wrapper, runtimeContext.name, tool,
737                                             runtimeContext.project_uuid, runtimeContext.update_workflow)
738                 self.stdout.write(uuid + "\n")
739                 return (None, "success")
740
741             if runtimeContext.print_keep_deps:
742                 # Just find and print out all the collection dependencies and exit
743                 print_keep_deps(self, runtimeContext, merged_map, tool)
744                 return (None, "success")
745
746             # Did not register a workflow, we're going to submit
747             # it instead.
748             loadingContext.loader.idx.clear()
749             loadingContext.loader.idx["_:main"] = workflow_wrapper
750             workflow_wrapper["id"] = "_:main"
751
752             # Reload the minimal wrapper workflow.
753             self.fast_submit = True
754             tool = load_tool(workflow_wrapper, loadingContext)
755             loadingContext.loader.idx["_:main"] = workflow_wrapper
756
757         if not submitting:
758             # If we are going to run the workflow now (rather than
759             # submit it), we need to update the workflow document
760             # replacing file references with keep references.  If we
761             # are just going to construct a run submission, we don't
762             # need to do this.
763             update_from_merged_map(tool, merged_map)
764
765         self.apply_reqs(job_order, tool)
766
767         self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
768         self.eval_timeout = runtimeContext.eval_timeout
769
770         runtimeContext.use_container = True
771         runtimeContext.tmpdir_prefix = "tmp"
772         runtimeContext.work_api = self.work_api
773
774         if not self.output_name:
775              self.output_name = "Output from workflow %s" % runtimeContext.name
776
777         self.output_name  = cleanup_name_for_collection(self.output_name)
778
779         if self.work_api == "containers":
780             if self.ignore_docker_for_reuse:
781                 raise Exception("--ignore-docker-for-reuse not supported with containers API.")
782             runtimeContext.outdir = "/var/spool/cwl"
783             runtimeContext.docker_outdir = "/var/spool/cwl"
784             runtimeContext.tmpdir = "/tmp"
785             runtimeContext.docker_tmpdir = "/tmp"
786
787         if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
788             raise Exception("--priority must be in the range 1..1000.")
789
790         if self.should_estimate_cache_size:
791             visited = set()
792             estimated_size = [0]
793             def estimate_collection_cache(obj):
794                 if obj.get("location", "").startswith("keep:"):
795                     m = pdh_size.match(obj["location"][5:])
796                     if m and m.group(1) not in visited:
797                         visited.add(m.group(1))
798                         estimated_size[0] += int(m.group(2))
799             visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
800             runtimeContext.collection_cache_size = max(((estimated_size[0]*192) // (1024*1024))+1, 256)
801             self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)
802
803         logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)
804
805         runnerjob = None
806         if runtimeContext.submit:
807             # We are submitting instead of running immediately.
808             #
809             # Create a "Runner job" that when run() is invoked,
810             # creates the container request to run the workflow.
811             if self.work_api == "containers":
812                 if submitting:
813                     loadingContext.metadata = updated_tool.metadata.copy()
814                     tool = RunnerContainer(self, tool, loadingContext, runtimeContext.enable_reuse,
815                                            self.output_name,
816                                            self.output_tags,
817                                            submit_runner_ram=runtimeContext.submit_runner_ram,
818                                            name=runtimeContext.name,
819                                            on_error=runtimeContext.on_error,
820                                            submit_runner_image=runtimeContext.submit_runner_image,
821                                            intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
822                                            merged_map=merged_map,
823                                            priority=runtimeContext.priority,
824                                            secret_store=self.secret_store,
825                                            collection_cache_size=runtimeContext.collection_cache_size,
826                                            collection_cache_is_default=self.should_estimate_cache_size,
827                                            git_info=git_info)
828                 else:
829                     runtimeContext.runnerjob = tool.tool["id"]
830
831         if runtimeContext.cwl_runner_job is not None:
832             self.uuid = runtimeContext.cwl_runner_job.get('uuid')
833
834         jobiter = tool.job(job_order,
835                            self.output_callback,
836                            runtimeContext)
837
838         if runtimeContext.submit and not runtimeContext.wait:
839             # User provided --no-wait so submit the container request,
840             # get the container request uuid, print it out, and exit.
841             runnerjob = next(jobiter)
842             runnerjob.run(runtimeContext)
843             self.stdout.write(runnerjob.uuid+"\n")
844             return (None, "success")
845
846         # We are either running the workflow directly, or submitting it
847         # and waiting for a final result.
848
849         self.runtime_status_update("activity", "workflow execution")
850
851         current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
852         if current_container:
853             logger.info("Running inside container %s", current_container.get("uuid"))
854             self.set_container_request_properties(current_container, git_info)
855
856         self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
857         self.polling_thread = threading.Thread(target=self.poll_states)
858         self.polling_thread.start()
859
860         self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
861
862         try:
863             self.workflow_eval_lock.acquire()
864
865             # Holds the lock while this code runs and releases it when
866             # it is safe to do so in self.workflow_eval_lock.wait(),
867             # at which point on_message can update job state and
868             # process output callbacks.
869
870             loopperf = Perf(metrics, "jobiter")
871             loopperf.__enter__()
872             for runnable in jobiter:
873                 loopperf.__exit__()
874
875                 if self.stop_polling.is_set():
876                     break
877
878                 if self.task_queue.error is not None:
879                     raise self.task_queue.error
880
881                 if runnable:
882                     with Perf(metrics, "run"):
883                         self.start_run(runnable, runtimeContext)
884                 else:
885                     if (self.task_queue.in_flight + len(self.processes)) > 0:
886                         self.workflow_eval_lock.wait(3)
887                     else:
888                         if self.final_status is None:
889                             logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
890                         break
891
892                 if self.stop_polling.is_set():
893                     break
894
895                 loopperf.__enter__()
896             loopperf.__exit__()
897
898             while (self.task_queue.in_flight + len(self.processes)) > 0:
899                 if self.task_queue.error is not None:
900                     raise self.task_queue.error
901                 self.workflow_eval_lock.wait(3)
902
903         except UnsupportedRequirement:
904             raise
905         except:
906             if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
907                 logger.error("Interrupted, workflow will be cancelled")
908             elif isinstance(sys.exc_info()[1], WorkflowException):
909                 logger.error("Workflow execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
910             else:
911                 logger.exception("Workflow execution failed")
912
913             if self.pipeline:
914                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
915                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
916
917             if self.work_api == "containers" and not current_container:
918                 # Not running in a crunch container, so cancel any outstanding processes.
919                 for p in self.processes:
920                     try:
921                         self.api.container_requests().update(uuid=p,
922                                                              body={"priority": "0"}
923                         ).execute(num_retries=self.num_retries)
924                     except Exception:
925                         pass
926         finally:
927             self.workflow_eval_lock.release()
928             self.task_queue.drain()
929             self.stop_polling.set()
930             self.polling_thread.join()
931             self.task_queue.join()
932
933         if self.final_status == "UnsupportedRequirement":
934             raise UnsupportedRequirement("Check log for details.")
935
936         if self.final_output is None:
937             raise WorkflowException("Workflow did not return a result.")
938
939         if runtimeContext.usage_report_notes:
940             logger.info("Steps with low resource utilization (possible optimization opportunities):")
941             for x in runtimeContext.usage_report_notes:
942                 logger.info("  %s", x)
943
944         if runtimeContext.submit and isinstance(tool, Runner):
945             logger.info("Final output collection %s", tool.final_output)
946             if workbench2 or workbench1:
947                 logger.info("Output at %scollections/%s", workbench2 or workbench1, tool.final_output)
948         else:
949             if self.output_tags is None:
950                 self.output_tags = ""
951
952             storage_classes = ""
953             storage_class_req, _ = tool.get_requirement("http://arvados.org/cwl#OutputStorageClass")
954             if storage_class_req and storage_class_req.get("finalStorageClass"):
955                 storage_classes = aslist(storage_class_req["finalStorageClass"])
956             else:
957                 storage_classes = runtimeContext.storage_classes.strip().split(",")
958
959             output_properties = {}
960             output_properties_req, _ = tool.get_requirement("http://arvados.org/cwl#OutputCollectionProperties")
961             if output_properties_req:
962                 builder = make_builder(job_order, tool.hints, tool.requirements, runtimeContext, tool.metadata)
963                 for pr in output_properties_req["outputProperties"]:
964                     output_properties[pr["propertyName"]] = builder.do_eval(pr["propertyValue"])
965
966             self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes,
967                                                                                           self.output_tags, output_properties,
968                                                                                           self.final_output)
969             self.set_crunch_output()
970
971         if runtimeContext.compute_checksum:
972             adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
973             adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
974
975         if self.trash_intermediate and self.final_status == "success":
976             self.trash_intermediate_output()
977
978         return (self.final_output, self.final_status)
979
def blank_secrets(job_order_object, process):
    """Look up the cwltool Secrets requirement declared on ``process``.

    This is currently a placeholder: the requirement is fetched through
    ``process.get_requirement`` but nothing is done with the result, and
    ``job_order_object`` is neither read nor modified.

    Always returns ``None``.
    """
    secrets_uri = "http://commonwl.org/cwltool#Secrets"
    # The lookup result is unpacked as two items; neither is used yet.
    requirement, _unused = process.get_requirement(secrets_uri)
    return None