Merge branch '21904-no-unqueueable-reqs'
[arvados.git] / sdk / cwl / arvados_cwl / executor.py
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

import argparse
import logging
import os
import sys
import threading
import copy
import json
import re
from functools import partial
import subprocess
import time
import urllib

from cwltool.errors import WorkflowException
import cwltool.workflow
from schema_salad.sourceline import SourceLine, cmap
import schema_salad.validate as validate
from schema_salad.ref_resolver import file_uri, uri_file_path

import arvados
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError

import arvados_cwl.util
from .arvcontainer import RunnerContainer, cleanup_name_for_collection
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, make_builder, update_from_merged_map, print_keep_deps, ArvSecretStore
from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
from .arvworkflow import ArvadosWorkflow, upload_workflow, make_workflow_record
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
from .perf import Perf
from .pathmapper import NoFollowPathMapper
from cwltool.task_queue import TaskQueue
from .context import ArvLoadingContext, ArvRuntimeContext
from ._version import __version__

from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.utils import adjustFileObjs, adjustDirObjs, get_listing, visit_class, aslist
from cwltool.command_line_tool import compute_checksums
from cwltool.load_tool import load_tool

logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')

DEFAULT_PRIORITY = 500

class RuntimeStatusLoggingHandler(logging.Handler):
    """
    Intercepts logging calls and reports them as runtime status updates on
    the runner container.
    """
    def __init__(self, runtime_status_update_func):
        super(RuntimeStatusLoggingHandler, self).__init__()
        self.runtime_status_update = runtime_status_update_func
        self.updatingRuntimeStatus = False

    def emit(self, record):
        kind = None
        if record.levelno >= logging.ERROR:
            kind = 'error'
        elif record.levelno >= logging.WARNING:
            kind = 'warning'
        if kind == 'warning' and record.name in ("salad", "crunchstat_summary"):
            # Don't send validation warnings to runtime status,
            # they're noisy and unhelpful.
            return
        if kind is not None and not self.updatingRuntimeStatus:
            # updatingRuntimeStatus guards against re-entering emit() if
            # posting the status update itself logs a message.
            self.updatingRuntimeStatus = True
            try:
                log_msg = record.getMessage()
                if '\n' in log_msg:
                    # If the logged message is multi-line, use its first line
                    # as the status and the rest as detail.
                    status, detail = log_msg.split('\n', 1)
                    self.runtime_status_update(
                        kind,
                        "%s: %s" % (record.name, status),
                        detail
                    )
                else:
                    self.runtime_status_update(
                        kind,
                        "%s: %s" % (record.name, log_msg)
                    )
            finally:
                self.updatingRuntimeStatus = False


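# A minimal sketch (not part of the executor) of how the handler above is
# used: attach it to a logger and collect the (kind, message, detail) tuples
# it produces.  All names in this example are hypothetical.
def _example_runtime_status_handler():
    updates = []
    handler = RuntimeStatusLoggingHandler(
        lambda kind, message, detail=None: updates.append((kind, message, detail)))
    example_logger = logging.getLogger("example")
    example_logger.addHandler(handler)
    example_logger.error("something failed\nfull traceback here")
    # updates == [('error', 'example: something failed', 'full traceback here')]
    return updates

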
class ArvCwlExecutor(object):
    """Execute a CWL tool or workflow: submit work (using the containers
    API), wait for it to complete, and report the output.

    """

    def __init__(self, api_client,
                 arvargs=None,
                 keep_client=None,
                 num_retries=4,
                 thread_count=4,
                 stdout=sys.stdout):

        if arvargs is None:
            arvargs = argparse.Namespace()
            arvargs.work_api = None
            arvargs.output_name = None
            arvargs.output_tags = None
            arvargs.thread_count = 1
            arvargs.collection_cache_size = None
            arvargs.git_info = True
            arvargs.submit = False
            arvargs.defer_downloads = False

        self.api = api_client
        self.processes = {}
        self.workflow_eval_lock = threading.Condition(threading.RLock())
        self.final_output = None
        self.final_status = None
        self.num_retries = num_retries
        self.uuid = None
        self.stop_polling = threading.Event()
        self.poll_api = None
        self.pipeline = None
        self.final_output_collection = None
        self.output_name = arvargs.output_name
        self.output_tags = arvargs.output_tags
        self.project_uuid = None
        self.intermediate_output_ttl = 0
        self.intermediate_output_collections = []
        self.trash_intermediate = False
        self.thread_count = arvargs.thread_count
        self.poll_interval = 12
        self.loadingContext = None
        self.should_estimate_cache_size = True
        self.fs_access = None
        self.secret_store = None
        self.stdout = stdout
        self.fast_submit = False
        self.git_info = arvargs.git_info
        self.debug = False

        if keep_client is not None:
            self.keep_client = keep_client
        else:
            self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)

        if arvargs.collection_cache_size:
            collection_cache_size = arvargs.collection_cache_size*1024*1024
            self.should_estimate_cache_size = False
        else:
            collection_cache_size = 256*1024*1024

        self.collection_cache = CollectionCache(self.api, self.keep_client, self.num_retries,
                                                cap=collection_cache_size)

        self.fetcher_constructor = partial(CollectionFetcher,
                                           api_client=self.api,
                                           fs_access=CollectionFsAccess("", collection_cache=self.collection_cache),
                                           num_retries=self.num_retries)

        self.work_api = None
        expected_api = ["containers"]
        for api in expected_api:
            try:
                methods = self.api._rootDesc.get('resources')[api]['methods']
                if ('httpMethod' in methods['create'] and
                    (arvargs.work_api == api or arvargs.work_api is None)):
                    self.work_api = api
                    break
            except KeyError:
                pass

        if not self.work_api:
            if arvargs.work_api is None:
                raise Exception("No supported APIs")
            else:
                raise Exception("Unsupported API '%s', expected one of %s" % (arvargs.work_api, expected_api))

        if self.work_api == "jobs":
            logger.error("""
*******************************
The 'jobs' API is no longer supported.
*******************************""")
            sys.exit(1)

        self.loadingContext = ArvLoadingContext(vars(arvargs))
        self.loadingContext.fetcher_constructor = self.fetcher_constructor
        self.loadingContext.resolver = partial(collectionResolver, self.api, num_retries=self.num_retries)
        self.loadingContext.construct_tool_object = self.arv_make_tool

        # Add a custom logging handler to the root logger for runtime status
        # reporting if running inside a container
        if arvados_cwl.util.get_current_container(self.api, self.num_retries, logger):
            root_logger = logging.getLogger('')

            # Remove existing RuntimeStatusLoggingHandlers if they exist
            handlers = [h for h in root_logger.handlers if not isinstance(h, RuntimeStatusLoggingHandler)]
            root_logger.handlers = handlers

            handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
            root_logger.addHandler(handler)

        self.toplevel_runtimeContext = ArvRuntimeContext(vars(arvargs))
        self.toplevel_runtimeContext.make_fs_access = partial(CollectionFsAccess,
                                                              collection_cache=self.collection_cache)
        self.toplevel_runtimeContext.secret_store = ArvSecretStore()

        self.defer_downloads = arvargs.submit and arvargs.defer_downloads

        validate_cluster_target(self, self.toplevel_runtimeContext)

    def arv_make_tool(self, toolpath_object, loadingContext):
        cls = toolpath_object.get("class")
        if cls == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, loadingContext)
        elif cls == "Workflow":
            return ArvadosWorkflow(self, toolpath_object, loadingContext)
        elif cls == "ExpressionTool":
            return ArvadosExpressionTool(self, toolpath_object, loadingContext)
        else:
            raise Exception("Unknown tool %s" % cls)

    def output_callback(self, out, processStatus):
        with self.workflow_eval_lock:
            if processStatus == "success":
                logger.info("Overall process status is %s", processStatus)
                state = "Complete"
            else:
                logger.error("Overall process status is %s", processStatus)
                state = "Failed"
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": state}).execute(num_retries=self.num_retries)
            self.final_status = processStatus
            self.final_output = out
            self.workflow_eval_lock.notify_all()

    def start_run(self, runnable, runtimeContext):
        self.task_queue.add(partial(runnable.run, runtimeContext),
                            self.workflow_eval_lock, self.stop_polling)

    def process_submitted(self, container):
        with self.workflow_eval_lock:
            self.processes[container.uuid] = container

    def process_done(self, uuid, record):
        with self.workflow_eval_lock:
            j = self.processes[uuid]
            logger.info("%s %s is %s", self.label(j), uuid, record["state"])
            self.task_queue.add(partial(j.done, record),
                                self.workflow_eval_lock, self.stop_polling)
            del self.processes[uuid]

    def runtime_status_update(self, kind, message, detail=None):
        """
        Updates the runtime_status field on the runner container.
        Called when there's a need to report errors, warnings or just
        activity statuses, for example from the RuntimeStatusLoggingHandler.
        """

        if kind not in ('error', 'warning', 'activity'):
            # Ignore any other status kind
            return

        with self.workflow_eval_lock:
            current = None
            try:
                current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            except Exception as e:
                logger.info("Couldn't get current container: %s", e)
            if current is None:
                return
            runtime_status = current.get('runtime_status', {})

            original_updatemessage = updatemessage = runtime_status.get(kind, "")
            if kind == "activity" or not updatemessage:
                updatemessage = message

            # Subsequent messages are tacked on in the detail field.
            original_updatedetail = updatedetail = runtime_status.get(kind+'Detail', "")
            maxlines = 40
            if updatedetail.count("\n") < maxlines:
                if updatedetail:
                    updatedetail += "\n"
                updatedetail += message + "\n"

                if detail:
                    updatedetail += detail + "\n"

                if updatedetail.count("\n") >= maxlines:
                    updatedetail += "\nSome messages may have been omitted.  Check the full log."

            if updatemessage == original_updatemessage and updatedetail == original_updatedetail:
                # Don't waste time doing an update if nothing changed
                # (usually because we exceeded the max lines).
                return

            runtime_status.update({
                kind: updatemessage,
                kind+'Detail': updatedetail,
            })

            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'runtime_status': runtime_status,
                                             }).execute(num_retries=self.num_retries)
            except Exception as e:
                logger.info("Couldn't update runtime_status: %s", e)

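    # For illustration, after reporting two warnings the container's
    # runtime_status might hold (hypothetical values):
    #
    #   {"warning": "arvados.cwl-runner: first problem",
    #    "warningDetail": "arvados.cwl-runner: first problem\n\narvados.cwl-runner: second problem\n"}
    #
    # The headline message is set only once; later messages accumulate in
    # the corresponding *Detail field, capped at roughly 40 lines.
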
    def wrapped_callback(self, cb, obj, st):
        with self.workflow_eval_lock:
            cb(obj, st)
            self.workflow_eval_lock.notify_all()

    def get_wrapped_callback(self, cb):
        return partial(self.wrapped_callback, cb)

    def on_message(self, event):
        if event.get("object_uuid") in self.processes and event["event_type"] == "update":
            uuid = event["object_uuid"]
            if event["properties"]["new_attributes"]["state"] == "Running":
                with self.workflow_eval_lock:
                    j = self.processes[uuid]
                    if j.running is False:
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                        logger.info("%s %s is Running", self.label(j), uuid)
            elif event["properties"]["new_attributes"]["state"] == "Final":
                # Underlying container is completed or cancelled.
                self.process_done(uuid, event["properties"]["new_attributes"])
            elif (event["properties"]["new_attributes"]["state"] == "Committed" and
                  event["properties"]["new_attributes"]["priority"] == 0):
                # Cancelled before it got a chance to run: it remains in the
                # Committed state but is never going to run, so treat it as
                # cancelled.
                self.process_done(uuid, event["properties"]["new_attributes"])

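    # An illustrative poll/websocket event as dispatched above (the uuid is
    # hypothetical):
    #
    #   {"object_uuid": "zzzzz-xvhdp-000000000000000",
    #    "event_type": "update",
    #    "properties": {"new_attributes": {"state": "Committed",
    #                                      "priority": 0, ...}}}
    #
    # A request that is still Committed with priority 0 was cancelled before
    # it ever ran, so it is routed to process_done() just like a Final one.
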
    def label(self, obj):
        return "[%s %s]" % (self.work_api[0:-1], obj.name)

    def poll_states(self):
        """Poll status of containers listed in the processes dict.

        Runs in a separate thread.
        """

        try:
            remain_wait = self.poll_interval
            while True:
                if remain_wait > 0:
                    self.stop_polling.wait(remain_wait)
                if self.stop_polling.is_set():
                    break
                with self.workflow_eval_lock:
                    keys = list(self.processes)
                if not keys:
                    remain_wait = self.poll_interval
                    continue

                begin_poll = time.time()
                if self.work_api == "containers":
                    table = self.poll_api.container_requests()

                pageSize = self.poll_api._rootDesc.get('maxItemsPerResponse', 1000)

                while keys:
                    page = keys[:pageSize]
                    try:
                        proc_states = table.list(filters=[["uuid", "in", page]], select=["uuid", "container_uuid", "state", "log_uuid",
                                                                                         "output_uuid", "modified_at", "properties",
                                                                                         "runtime_constraints", "priority"]).execute(num_retries=self.num_retries)
                    except Exception as e:
                        logger.warning("Temporary error checking states on API server: %s", e)
                        remain_wait = self.poll_interval
                        continue

                    for p in proc_states["items"]:
                        self.on_message({
                            "object_uuid": p["uuid"],
                            "event_type": "update",
                            "properties": {
                                "new_attributes": p
                            }
                        })
                    keys = keys[pageSize:]

                finish_poll = time.time()
                remain_wait = self.poll_interval - (finish_poll - begin_poll)
        except:
            logger.exception("Fatal error in state polling thread.")
            with self.workflow_eval_lock:
                self.processes.clear()
                self.workflow_eval_lock.notify_all()
        finally:
            self.stop_polling.set()

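    # For illustration: with the default page size of 1000, polling 2500
    # outstanding container requests issues three list calls of the form
    #
    #   table.list(filters=[["uuid", "in", keys[:1000]]], select=[...])
    #
    # and each returned record is replayed through on_message() as a
    # synthetic "update" event, so polling and websocket events share one
    # code path.
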
    def add_intermediate_output(self, uuid):
        if uuid:
            self.intermediate_output_collections.append(uuid)

    def trash_intermediate_output(self):
        logger.info("Cleaning up intermediate output collections")
        for i in self.intermediate_output_collections:
            try:
                self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
            except (KeyboardInterrupt, SystemExit):
                break
            except Exception:
                logger.warning("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))

    def check_features(self, obj, parentfield=""):
        if isinstance(obj, dict):
            if obj.get("class") == "DockerRequirement":
                if obj.get("dockerOutputDirectory"):
                    if not obj.get("dockerOutputDirectory").startswith('/'):
                        raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
                            "Option 'dockerOutputDirectory' must be an absolute path.")
            if obj.get("class") == "InplaceUpdateRequirement":
                if obj["inplaceUpdate"] and parentfield == "requirements":
                    raise SourceLine(obj, "class", UnsupportedRequirement).makeError("InplaceUpdateRequirement not supported for keep collections.")
            for k,v in obj.items():
                self.check_features(v, parentfield=k)
        elif isinstance(obj, list):
            for i,v in enumerate(obj):
                with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                    self.check_features(v, parentfield=parentfield)

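    # For example, check_features() rejects a tool carrying this
    # (hypothetical) requirement, because 'dockerOutputDirectory' must be an
    # absolute path:
    #
    #   requirements:
    #     - class: DockerRequirement
    #       dockerPull: example/image
    #       dockerOutputDirectory: relative/outdir
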
    def make_output_collection(self, name, storage_classes, tagsString, output_properties, outputObj):
        outputObj = copy.deepcopy(outputObj)

        files = []
        def captureFile(fileobj):
            files.append(fileobj)

        def captureDir(dirobj):
            if dirobj["location"].startswith("keep:") and 'listing' in dirobj:
                del dirobj['listing']
            files.append(dirobj)

        adjustDirObjs(outputObj, captureDir)
        adjustFileObjs(outputObj, captureFile)

        generatemapper = NoFollowPathMapper(files, "", "", separateDirs=False)

        final = arvados.collection.Collection(api_client=self.api,
                                              keep_client=self.keep_client,
                                              num_retries=self.num_retries)

        for k,v in generatemapper.items():
            if v.type == "Directory" and v.resolved.startswith("_:"):
                continue
            if v.type == "CreateFile" and (k.startswith("_:") or v.resolved.startswith("_:")):
                with final.open(v.target, "wb") as f:
                    f.write(v.resolved.encode("utf-8"))
                continue

            if not v.resolved.startswith("keep:"):
                raise Exception("Output source is not in keep or a literal")
            sp = v.resolved.split("/")
            srccollection = sp[0][5:]
            try:
                reader = self.collection_cache.get(srccollection)
                srcpath = urllib.parse.unquote("/".join(sp[1:]) if len(sp) > 1 else ".")
                final.copy(srcpath, v.target, source_collection=reader, overwrite=False)
            except arvados.errors.ArgumentError as e:
                logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
                raise
            except IOError as e:
                logger.error("While preparing output collection: %s", e)
                raise

        def rewrite(fileobj):
            fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
            for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
                if k in fileobj:
                    del fileobj[k]

        adjustDirObjs(outputObj, rewrite)
        adjustFileObjs(outputObj, rewrite)

        with final.open("cwl.output.json", "w") as f:
            res = str(json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False))
            f.write(res)

        final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes,
                       ensure_unique_name=True, properties=output_properties)

        logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                    final.api_response()["name"],
                    final.manifest_locator())

        final_uuid = final.manifest_locator()
        tags = tagsString.split(',')
        for tag in tags:
            self.api.links().create(body={
                "head_uuid": final_uuid, "link_class": "tag", "name": tag
                }).execute(num_retries=self.num_retries)

        def finalcollection(fileobj):
            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])

        adjustDirObjs(outputObj, finalcollection)
        adjustFileObjs(outputObj, finalcollection)

        return (outputObj, final)

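    # Illustration of the rewriting above (values are hypothetical): an
    # output file whose location starts as
    #
    #   keep:0123456789abcdef0123456789abcdef+59/out.txt
    #
    # is copied into the new output collection, rewrite() maps its location
    # to the relative target path, and finalcollection() prefixes that with
    # keep:<PDH of the new collection>/ for the final output record.
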
    def set_crunch_output(self):
        if self.work_api == "containers":
            current = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
            if current is None:
                return
            try:
                self.api.containers().update(uuid=current['uuid'],
                                             body={
                                                 'output': self.final_output_collection.portable_data_hash(),
                                                 'output_properties': self.final_output_collection.get_properties(),
                                             }).execute(num_retries=self.num_retries)
                # The output is now recorded on the container itself (by
                # portable data hash), so the runner's own collection record
                # is redundant and can be trashed.
                self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
                                              body={
                                                  'is_trashed': True
                                              }).execute(num_retries=self.num_retries)
            except Exception:
                logger.exception("Setting container output")
                raise

    def apply_reqs(self, job_order_object, tool):
        if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
            if tool.metadata.get("http://commonwl.org/cwltool#original_cwlVersion") == 'v1.0':
                raise WorkflowException(
                    "`cwl:requirements` in the input object is not part of CWL "
                    "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
                    "can set the cwlVersion to v1.1 or greater and re-run with "
                    "--enable-dev.")
            job_reqs = job_order_object["https://w3id.org/cwl/cwl#requirements"]
            for req in job_reqs:
                tool.requirements.append(req)

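    # Example input-object fragment handled by apply_reqs() (the values are
    # illustrative; requires cwlVersion v1.1 or greater):
    #
    #   {
    #     "cwl:requirements": [
    #       {"class": "ResourceRequirement", "ramMin": 4096}
    #     ]
    #   }
    #
    # Each listed requirement is appended to the tool's requirements.
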
    @staticmethod
    def get_git_info(tool):
        in_a_git_repo = False
        cwd = None
        filepath = None

        if tool.tool["id"].startswith("file://"):
            # Check whether the tool file is inside a git repository (and
            # git is installed).
            try:
                filepath = uri_file_path(tool.tool["id"])
                cwd = os.path.dirname(filepath)
                subprocess.run(
                    ["git", "log", "--format=%H", "-n1", "HEAD"],
                    cwd=cwd,
                    check=True,
                    stdout=subprocess.DEVNULL,
                )
                in_a_git_repo = True
            except Exception:
                pass

        gitproperties = {}

        if in_a_git_repo:
            def git_output(cmd):
                return subprocess.run(
                    cmd,
                    cwd=cwd,
                    stdout=subprocess.PIPE,
                    universal_newlines=True,
                ).stdout.strip()
            git_commit = git_output(["git", "log", "--format=%H", "-n1", "HEAD"])
            git_date = git_output(["git", "log", "--format=%cD", "-n1", "HEAD"])
            git_committer = git_output(["git", "log", "--format=%cn <%ce>", "-n1", "HEAD"])
            git_branch = git_output(["git", "rev-parse", "--abbrev-ref", "HEAD"])
            git_origin = git_output(["git", "remote", "get-url", "origin"])
            git_status = git_output(["git", "status", "--untracked-files=no", "--porcelain"])
            git_describe = git_output(["git", "describe", "--always", "--tags"])
            git_toplevel = git_output(["git", "rev-parse", "--show-toplevel"])
            git_path = filepath[len(git_toplevel):]

            gitproperties = {
                "http://arvados.org/cwl#gitCommit": git_commit,
                "http://arvados.org/cwl#gitDate": git_date,
                "http://arvados.org/cwl#gitCommitter": git_committer,
                "http://arvados.org/cwl#gitBranch": git_branch,
                "http://arvados.org/cwl#gitOrigin": git_origin,
                "http://arvados.org/cwl#gitStatus": git_status,
                "http://arvados.org/cwl#gitDescribe": git_describe,
                "http://arvados.org/cwl#gitPath": git_path,
            }
        else:
            for g in ("http://arvados.org/cwl#gitCommit",
                      "http://arvados.org/cwl#gitDate",
                      "http://arvados.org/cwl#gitCommitter",
                      "http://arvados.org/cwl#gitBranch",
                      "http://arvados.org/cwl#gitOrigin",
                      "http://arvados.org/cwl#gitStatus",
                      "http://arvados.org/cwl#gitDescribe",
                      "http://arvados.org/cwl#gitPath"):
                if g in tool.metadata:
                    gitproperties[g] = tool.metadata[g]

        return gitproperties

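    # Sketch of the provenance map returned for a tool inside a git checkout
    # (values are hypothetical):
    #
    #   {"http://arvados.org/cwl#gitCommit": "1e9d0e0...",
    #    "http://arvados.org/cwl#gitBranch": "main",
    #    "http://arvados.org/cwl#gitOrigin": "git@github.com:example/repo.git",
    #    ...}
    #
    # Outside a git checkout, any of these keys already present in
    # tool.metadata are passed through unchanged.
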
    def set_container_request_properties(self, container, properties):
        resp = self.api.container_requests().list(filters=[["container_uuid", "=", container["uuid"]]], select=["uuid", "properties"]).execute(num_retries=self.num_retries)
        for cr in resp["items"]:
            cr["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in properties.items()})
            self.api.container_requests().update(uuid=cr["uuid"], body={"container_request": {"properties": cr["properties"]}}).execute(num_retries=self.num_retries)

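    # The long arvados.org CWL keys are shortened before being stored, e.g.
    # (hypothetical value)
    #
    #   "http://arvados.org/cwl#gitCommit" -> "arv:gitCommit"
    #
    # so the container request ends up with properties like
    # {"arv:gitCommit": "1e9d0e0..."}.
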
    def arv_executor(self, updated_tool, job_order, runtimeContext, logger=None):
        self.debug = runtimeContext.debug

        if logger is None:
            # Fall back to the module-level logger if the caller didn't
            # supply one.
            logger = logging.getLogger('arvados.cwl-runner')

        self.runtime_status_update("activity", "initialization")

        git_info = self.get_git_info(updated_tool) if self.git_info else {}
        if git_info:
            logger.info("Git provenance")
            for g in git_info:
                if git_info[g]:
                    logger.info("  %s: %s", g.split("#", 1)[1], git_info[g])

        runtimeContext.git_info = git_info

        workbench1 = self.api.config()["Services"]["Workbench1"]["ExternalURL"]
        workbench2 = self.api.config()["Services"]["Workbench2"]["ExternalURL"]
        controller = self.api.config()["Services"]["Controller"]["ExternalURL"]
        logger.info("Using cluster %s (%s)", self.api.config()["ClusterID"], workbench2 or workbench1 or controller)

        if not self.fast_submit:
            updated_tool.visit(self.check_features)

        self.pipeline = None
        self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
        self.secret_store = runtimeContext.secret_store

        self.trash_intermediate = runtimeContext.trash_intermediate
        if self.trash_intermediate and self.work_api != "containers":
            raise Exception("--trash-intermediate is only supported with --api=containers.")

        self.intermediate_output_ttl = runtimeContext.intermediate_output_ttl
        if self.intermediate_output_ttl and self.work_api != "containers":
            raise Exception("--intermediate-output-ttl is only supported with --api=containers.")
        if self.intermediate_output_ttl < 0:
            raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)

        if runtimeContext.submit_request_uuid and self.work_api != "containers":
            raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))

        runtimeContext = runtimeContext.copy()

        default_storage_classes = ",".join([k for k,v in self.api.config().get("StorageClasses", {"default": {"Default": True}}).items() if v.get("Default") is True])
        if runtimeContext.storage_classes == "default":
            runtimeContext.storage_classes = default_storage_classes
        if runtimeContext.intermediate_storage_classes == "default":
            runtimeContext.intermediate_storage_classes = default_storage_classes

        if not runtimeContext.name:
            self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"])
            if git_info.get("http://arvados.org/cwl#gitDescribe"):
                self.name = "%s (%s)" % (self.name, git_info.get("http://arvados.org/cwl#gitDescribe"))
            runtimeContext.name = self.name

        if runtimeContext.copy_deps is None and (runtimeContext.create_workflow or runtimeContext.update_workflow):
            # When creating or updating a workflow record, by default always
            # copy dependencies and ensure Docker images are up to date.
            runtimeContext.copy_deps = True
            runtimeContext.match_local_docker = True

        if runtimeContext.print_keep_deps:
            runtimeContext.copy_deps = False
            runtimeContext.match_local_docker = False

        if runtimeContext.update_workflow and self.project_uuid is None:
            # If we are updating a workflow, make sure anything that gets
            # uploaded goes into the same parent project, unless an alternate
            # --project-uuid was provided.
            existing_wf = self.api.workflows().get(uuid=runtimeContext.update_workflow).execute()
            runtimeContext.project_uuid = existing_wf["owner_uuid"]

        self.project_uuid = runtimeContext.project_uuid

        self.runtime_status_update("activity", "data transfer")

        # Upload local file references in the job order.
        with Perf(metrics, "upload_job_order"):
            job_order, jobmapper = upload_job_order(self, "%s input" % runtimeContext.name,
                                                    updated_tool, job_order, runtimeContext)

        # Determine if we are submitting or directly executing the workflow.
        #
        # The last clause means: if it is a command line tool, and we are
        # going to wait for the result, and always_submit_runner is false,
        # then we don't submit a runner process.

        submitting = (runtimeContext.submit and not
                       (updated_tool.tool["class"] == "CommandLineTool" and
                        runtimeContext.wait and
                        not runtimeContext.always_submit_runner))

        loadingContext = self.loadingContext.copy()
        loadingContext.do_validate = False
        loadingContext.disable_js_validation = True
        tool = updated_tool

        # Upload direct dependencies of workflow steps, get back mapping of
        # files to keep references.  Also uploads docker images.
        if not self.fast_submit:
            logger.info("Uploading workflow dependencies")
            with Perf(metrics, "upload_workflow_deps"):
                merged_map = upload_workflow_deps(self, tool, runtimeContext)
        else:
            # In the fast submit case, we are running a workflow that has
            # already been uploaded to Arvados, so we assume all the
            # dependencies have been pinned to keep references and there is
            # nothing to do.
            merged_map = {}

        loadingContext.loader = tool.doc_loader
        loadingContext.avsc_names = tool.doc_schema
        loadingContext.metadata = tool.metadata
        loadingContext.skip_resolve_all = True

        workflow_wrapper = None
        if (submitting and not self.fast_submit) or runtimeContext.update_workflow or runtimeContext.create_workflow or runtimeContext.print_keep_deps:
            # Upload the workflow and get back the workflow wrapper.
            workflow_wrapper = upload_workflow(self, tool, job_order,
                                               runtimeContext.project_uuid,
                                               runtimeContext,
                                               uuid=runtimeContext.update_workflow,
                                               submit_runner_ram=runtimeContext.submit_runner_ram,
                                               name=runtimeContext.name,
                                               merged_map=merged_map,
                                               submit_runner_image=runtimeContext.submit_runner_image,
                                               git_info=git_info,
                                               set_defaults=(runtimeContext.update_workflow or runtimeContext.create_workflow),
                                               jobmapper=jobmapper)

            if runtimeContext.update_workflow or runtimeContext.create_workflow:
                # We're registering the workflow, so create or update the
                # workflow record and then exit.
                uuid = make_workflow_record(self, workflow_wrapper, runtimeContext.name, tool,
                                            runtimeContext.project_uuid, runtimeContext.update_workflow)
                self.stdout.write(uuid + "\n")
                return (None, "success")

            if runtimeContext.print_keep_deps:
                # Just find and print out all the collection dependencies and exit.
                print_keep_deps(self, runtimeContext, merged_map, tool)
                return (None, "success")

            # Did not register a workflow, so we are going to submit it
            # instead.
            loadingContext.loader.idx.clear()
            loadingContext.loader.idx["_:main"] = workflow_wrapper
            workflow_wrapper["id"] = "_:main"

            # Reload the minimal wrapper workflow.
            self.fast_submit = True
            tool = load_tool(workflow_wrapper, loadingContext)
            loadingContext.loader.idx["_:main"] = workflow_wrapper

        if not submitting:
            # If we are going to run the workflow now (rather than submit
            # it), we need to update the workflow document, replacing file
            # references with keep references.  If we are just going to
            # construct a run submission, we don't need to do this.
            update_from_merged_map(tool, merged_map)

        self.apply_reqs(job_order, tool)

        self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
        self.eval_timeout = runtimeContext.eval_timeout

        runtimeContext.use_container = True
        runtimeContext.tmpdir_prefix = "tmp"
        runtimeContext.work_api = self.work_api

        if not self.output_name:
            self.output_name = "Output from workflow %s" % runtimeContext.name

        self.output_name = cleanup_name_for_collection(self.output_name)

        if self.work_api == "containers":
            if self.ignore_docker_for_reuse:
                raise Exception("--ignore-docker-for-reuse not supported with containers API.")
            runtimeContext.outdir = "/var/spool/cwl"
            runtimeContext.docker_outdir = "/var/spool/cwl"
            runtimeContext.tmpdir = "/tmp"
            runtimeContext.docker_tmpdir = "/tmp"

        if runtimeContext.priority < 1 or runtimeContext.priority > 1000:
            raise Exception("--priority must be in the range 1..1000.")

        if self.should_estimate_cache_size:
            visited = set()
            estimated_size = [0]
            def estimate_collection_cache(obj):
                if obj.get("location", "").startswith("keep:"):
                    m = pdh_size.match(obj["location"][5:])
                    if m and m.group(1) not in visited:
                        visited.add(m.group(1))
                        estimated_size[0] += int(m.group(2))
            visit_class(job_order, ("File", "Directory"), estimate_collection_cache)
            runtimeContext.collection_cache_size = max(((estimated_size[0]*192) // (1024*1024))+1, 256)
            self.collection_cache.set_cap(runtimeContext.collection_cache_size*1024*1024)

        logger.info("Using collection cache size %s MiB", runtimeContext.collection_cache_size)

        runnerjob = None
        if runtimeContext.submit:
            # We are submitting instead of running immediately.
            #
            # Create a "Runner job" that when run() is invoked, creates the
            # container request to run the workflow.
            if self.work_api == "containers":
                if submitting:
                    loadingContext.metadata = updated_tool.metadata.copy()
                    tool = RunnerContainer(self, tool, loadingContext, runtimeContext.enable_reuse,
                                           self.output_name,
                                           self.output_tags,
                                           submit_runner_ram=runtimeContext.submit_runner_ram,
                                           name=runtimeContext.name,
                                           on_error=runtimeContext.on_error,
                                           submit_runner_image=runtimeContext.submit_runner_image,
                                           intermediate_output_ttl=runtimeContext.intermediate_output_ttl,
                                           merged_map=merged_map,
                                           priority=runtimeContext.priority,
                                           secret_store=self.secret_store,
                                           collection_cache_size=runtimeContext.collection_cache_size,
                                           collection_cache_is_default=self.should_estimate_cache_size,
                                           git_info=git_info)
                else:
                    runtimeContext.runnerjob = tool.tool["id"]

        if runtimeContext.cwl_runner_job is not None:
            self.uuid = runtimeContext.cwl_runner_job.get('uuid')

        jobiter = tool.job(job_order,
                           self.output_callback,
                           runtimeContext)

        if runtimeContext.submit and not runtimeContext.wait:
            # User provided --no-wait so submit the container request,
            # get the container request uuid, print it out, and exit.
            runnerjob = next(jobiter)
            runnerjob.run(runtimeContext)
            self.stdout.write(runnerjob.uuid + "\n")
            return (None, "success")

        # We are either running the workflow directly, or submitting it and
        # waiting for the final result.

        self.runtime_status_update("activity", "workflow execution")

        current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
        if current_container:
            logger.info("Running inside container %s", current_container.get("uuid"))
            self.set_container_request_properties(current_container, git_info)

        self.poll_api = arvados.api('v1', timeout=runtimeContext.http_timeout)
        self.polling_thread = threading.Thread(target=self.poll_states)
        self.polling_thread.start()

        self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)

        try:
            self.workflow_eval_lock.acquire()

            # Hold the lock while this code runs; it is released when it is
            # safe to do so in self.workflow_eval_lock.wait(), at which point
            # on_message can update job state and process output callbacks.

            loopperf = Perf(metrics, "jobiter")
            loopperf.__enter__()
            for runnable in jobiter:
                loopperf.__exit__()

                if self.stop_polling.is_set():
                    break

                if self.task_queue.error is not None:
                    raise self.task_queue.error

                if runnable:
                    with Perf(metrics, "run"):
                        self.start_run(runnable, runtimeContext)
                else:
                    if (self.task_queue.in_flight + len(self.processes)) > 0:
                        self.workflow_eval_lock.wait(3)
                    else:
                        if self.final_status is None:
                            logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
                        break

                if self.stop_polling.is_set():
                    break

                loopperf.__enter__()
            loopperf.__exit__()

            while (self.task_queue.in_flight + len(self.processes)) > 0:
                if self.task_queue.error is not None:
                    raise self.task_queue.error
                self.workflow_eval_lock.wait(3)

        except UnsupportedRequirement:
            raise
        except:
            if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
                logger.error("Interrupted, workflow will be cancelled")
            elif isinstance(sys.exc_info()[1], WorkflowException):
                logger.error("Workflow execution failed:\n%s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
            else:
                logger.exception("Workflow execution failed")

            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)

            if self.work_api == "containers" and not current_container:
                # Not running in a crunch container, so cancel any outstanding processes.
                for p in self.processes:
                    try:
                        self.api.container_requests().update(uuid=p,
                                                             body={"priority": "0"}
                        ).execute(num_retries=self.num_retries)
                    except Exception:
                        pass
        finally:
            self.workflow_eval_lock.release()
            self.task_queue.drain()
            self.stop_polling.set()
            self.polling_thread.join()
            self.task_queue.join()

        if self.final_status == "UnsupportedRequirement":
            raise UnsupportedRequirement("Check log for details.")

        if self.final_output is None:
            raise WorkflowException("Workflow did not return a result.")

        if runtimeContext.usage_report_notes:
            logger.info("Steps with low resource utilization (possible optimization opportunities):")
            for x in runtimeContext.usage_report_notes:
                logger.info("  %s", x)

        if runtimeContext.submit and isinstance(tool, Runner):
            logger.info("Final output collection %s", tool.final_output)
            if workbench2 or workbench1:
                logger.info("Output at %scollections/%s", workbench2 or workbench1, tool.final_output)
        else:
            if self.output_tags is None:
                self.output_tags = ""

            storage_classes = ""
            storage_class_req, _ = tool.get_requirement("http://arvados.org/cwl#OutputStorageClass")
            if storage_class_req and storage_class_req.get("finalStorageClass"):
                storage_classes = aslist(storage_class_req["finalStorageClass"])
            else:
                storage_classes = runtimeContext.storage_classes.strip().split(",")

            output_properties = {}
            output_properties_req, _ = tool.get_requirement("http://arvados.org/cwl#OutputCollectionProperties")
            if output_properties_req:
                builder = make_builder(job_order, tool.hints, tool.requirements, runtimeContext, tool.metadata)
                for pr in output_properties_req["outputProperties"]:
                    output_properties[pr["propertyName"]] = builder.do_eval(pr["propertyValue"])

            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes,
                                                                                          self.output_tags, output_properties,
                                                                                          self.final_output)
            self.set_crunch_output()

        if runtimeContext.compute_checksum:
            adjustDirObjs(self.final_output, partial(get_listing, self.fs_access))
            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))

        if self.trash_intermediate and self.final_status == "success":
            self.trash_intermediate_output()

        return (self.final_output, self.final_status)

def blank_secrets(job_order_object, process):
    secrets_req, _ = process.get_requirement("http://commonwl.org/cwltool#Secrets")
    pass

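# Minimal usage sketch (not part of this module's API; assumes a reachable
# Arvados API server and a valid token in the environment, and the argument
# values below are illustrative):
#
#   import arvados
#   from arvados_cwl.executor import ArvCwlExecutor
#
#   api = arvados.api("v1")
#   executor = ArvCwlExecutor(api)
#   # tool, job_order and runtime_context normally come from the
#   # arvados-cwl-runner command line entry point.
#   # output, status = executor.arv_executor(tool, job_order, runtime_context)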