if not out_of_project_images:
# Fetch Docker image if necessary.
try:
- result = cwltool.docker.DockerCommandLineJob.get_image(dockerRequirement, pull_image,
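+            # get_image() is an instance method in current cwltool, so
+            # construct a throwaway DockerCommandLineJob (its constructor
+            # arguments are not used by get_image) just to call it.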
+ dockerjob = cwltool.docker.DockerCommandLineJob(None, None, None, None, None, None)
+ result = dockerjob.get_image(dockerRequirement, pull_image,
force_pull, tmp_outdir_prefix)
if not result:
raise WorkflowException("Docker image '%s' not available" % dockerRequirement["dockerImageId"])
import json
import copy
import logging
+import urllib.parse
+from io import StringIO
+import sys
+
+from typing import (MutableSequence, MutableMapping)
+
+from ruamel.yaml import YAML
+from ruamel.yaml.comments import CommentedMap, CommentedSeq
from schema_salad.sourceline import SourceLine, cmap
import schema_salad.ref_resolver
from cwltool.utils import adjustFileObjs, adjustDirObjs, visit_class, normalizeFilesDirs
from cwltool.context import LoadingContext
+from schema_salad.ref_resolver import file_uri, uri_file_path
+import schema_salad.utils
+
import ruamel.yaml as yaml
from .runner import (upload_dependencies, packed_workflow, upload_workflow_collection,
return json.dumps(doc, sort_keys=True, indent=4, separators=(',',': '))
+def rel_ref(s, baseuri, urlexpander, merged_map):
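+    """Rewrite an absolute reference as a path relative to baseuri.
+
+    References already resolved by the merged document map are returned
+    straight from merged_map.  Otherwise the result is a relative path;
+    for example (assuming file:// URIs), a reference expanding to
+    file:///proj/tools/clip.cwl seen from a document at
+    file:///proj/wf/main.cwl comes back as "../tools/clip.cwl".
+    """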
+ uri = urlexpander(s, baseuri)
+ if baseuri in merged_map:
+ replacements = merged_map[baseuri].resolved
+ if uri in replacements:
+ return replacements[uri]
+
+ p1 = os.path.dirname(uri_file_path(baseuri))
+ p2 = os.path.dirname(uri_file_path(uri))
+ p3 = os.path.basename(uri_file_path(uri))
+ r = os.path.relpath(p2, p1)
+ if r == ".":
+ r = ""
+ print("AAA", uri, s)
+ print("BBBB", p1, p2, p3, r)
+ return os.path.join(r, p3)
+
+
+def update_refs(d, baseuri, urlexpander, merged_map, set_block_style):
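+    """Recursively rewrite reference fields of a parsed document in place.
+
+    Walks the YAML tree, rebasing baseuri whenever an "id" is found, and
+    passes $include, $import, location, run and $schemas string values
+    through rel_ref().  When set_block_style is true, flow-style nodes
+    are switched to block style so the rewritten YAML round-trips
+    readably.
+    """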
+ if isinstance(d, CommentedSeq):
+ if set_block_style:
+ d.fa.set_block_style()
+ for s in d:
+ update_refs(s, baseuri, urlexpander, merged_map, set_block_style)
+ elif isinstance(d, CommentedMap):
+ if set_block_style:
+ d.fa.set_block_style()
+
+ if "id" in d:
+ baseuri = urlexpander(d["id"], baseuri, scoped_id=True)
+
+        # Rewrite reference fields once for this mapping, then recurse
+        # into each value.
+        for field in ("$include", "$import", "location", "run"):
+            if field in d and isinstance(d[field], str):
+                d[field] = rel_ref(d[field], baseuri, urlexpander, merged_map)
+
+        if "$schemas" in d:
+            for n, s in enumerate(d["$schemas"]):
+                d["$schemas"][n] = rel_ref(s, baseuri, urlexpander, merged_map)
+
+        for s in d:
+            update_refs(d[s], baseuri, urlexpander, merged_map, set_block_style)
+
+def new_upload_workflow(arvRunner, tool, job_order, project_uuid,
+ runtimeContext, uuid=None,
+ submit_runner_ram=0, name=None, merged_map=None,
+ submit_runner_image=None,
+ git_info=None):
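+    """Upload the workflow and its file dependencies as a Keep collection.
+
+    A sketch of the approach: gather every file the document loader
+    touched (workflows, $import and $include targets), strip their
+    longest common directory prefix, rewrite cross-document references
+    relative to each file, and write the results into a new collection.
+    Returns the collection's manifest locator.
+    """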
+
+ firstfile = None
+ workflow_files = set()
+ import_files = set()
+ include_files = set()
+
+ for w in tool.doc_loader.idx:
+ if w.startswith("file://"):
+ workflow_files.add(urllib.parse.urldefrag(w)[0])
+ if firstfile is None:
+ firstfile = urllib.parse.urldefrag(w)[0]
+ if w.startswith("import:file://"):
+ import_files.add(urllib.parse.urldefrag(w[7:])[0])
+ if w.startswith("include:file://"):
+ include_files.add(urllib.parse.urldefrag(w[8:])[0])
+
+ all_files = workflow_files | import_files | include_files
+
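+    # Find the longest common leading substring of all the file URIs
+    # (n starts at 7, just past "file://"), then back n up to the
+    # nearest "/" so the prefix ends on a directory boundary.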
+ n = 7
+ allmatch = True
+ while allmatch:
+ n += 1
+ for f in all_files:
+ if len(f)-1 < n:
+ n -= 1
+ allmatch = False
+ break
+ if f[n] != firstfile[n]:
+ allmatch = False
+ break
+
+ while firstfile[n] != "/":
+ n -= 1
+
+ prefix = firstfile[:n+1]
+
+ col = arvados.collection.Collection()
+
+
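+    # Rewrite references in each workflow / $import document and store
+    # it in the collection under its prefix-relative path.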
+ for w in workflow_files | import_files:
+ # 1. load YAML
+
+ text = tool.doc_loader.fetch_text(w)
+ if isinstance(text, bytes):
+ textIO = StringIO(text.decode('utf-8'))
+ else:
+ textIO = StringIO(text)
+
+ yamlloader = schema_salad.utils.yaml_no_ts()
+ result = yamlloader.load(textIO)
+
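+        # If the source document was flow style, have update_refs switch
+        # nodes to block style so the rewritten file stays readable.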
+ set_block_style = False
+ if result.fa.flow_style():
+ set_block_style = True
+
+        # 2. find $import, $include, $schemas, run and location references
+        # 3. rewrite each reference relative to this document
+ update_refs(result, w, tool.doc_loader.expand_url, merged_map, set_block_style)
+
+ with col.open(w[n+1:], "wt") as f:
+ yamlloader.dump(result, stream=f)
+
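+    # $include targets may not be YAML, so copy them into the
+    # collection verbatim.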
+ for w in include_files:
+ with col.open(w[n+1:], "wb") as f1:
+ with open(uri_file_path(w), "rb") as f2:
+ dat = f2.read(65536)
+ while dat:
+ f1.write(dat)
+ dat = f2.read(65536)
+
+ toolname = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
+ if git_info and git_info.get("http://arvados.org/cwl#gitDescribe"):
+ toolname = "%s (%s)" % (toolname, git_info.get("http://arvados.org/cwl#gitDescribe"))
+
+ col.save_new(name=toolname, owner_uuid=arvRunner.project_uuid, ensure_unique_name=True)
+
+ return col.manifest_locator()
+
+
def upload_workflow(arvRunner, tool, job_order, project_uuid,
runtimeContext, uuid=None,
submit_runner_ram=0, name=None, merged_map=None,
name = tool.tool.get("label", os.path.basename(tool.tool["id"]))
upload_dependencies(arvRunner, name, tool.doc_loader,
- packed, tool.tool["id"], False,
+ packed, tool.tool["id"],
runtimeContext)
wf_runner_resources = None
self.doc_loader,
joborder,
joborder.get("id", "#"),
- False,
runtimeContext)
if self.wf_pdh is None:
self.doc_loader,
packed,
self.tool["id"],
- False,
runtimeContext)
# Discover files/directories referenced by the
from .arvcontainer import RunnerContainer, cleanup_name_for_collection
from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, make_builder
from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
-from .arvworkflow import ArvadosWorkflow, upload_workflow
+from .arvworkflow import ArvadosWorkflow, upload_workflow, new_upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
from .perf import Perf
from .pathmapper import NoFollowPathMapper
loadingContext = self.loadingContext.copy()
loadingContext.do_validate = False
loadingContext.disable_js_validation = True
- if submitting and not self.fast_submit:
- loadingContext.do_update = False
- # Document may have been auto-updated. Reload the original
- # document with updating disabled because we want to
- # submit the document with its original CWL version, not
- # the auto-updated one.
- with Perf(metrics, "load_tool original"):
- tool = load_tool(updated_tool.tool["id"], loadingContext)
- else:
- tool = updated_tool
+        # Temporarily skip reloading the original (pre-auto-update)
+        # document while testing the new upload path; submit the tool
+        # as already loaded (and possibly auto-updated).
+        tool = updated_tool
# Upload direct dependencies of workflow steps, get back mapping of files to keep references.
# Also uploads docker images.
loadingContext.avsc_names = tool.doc_schema
loadingContext.metadata = tool.metadata
loadingContext.skip_resolve_all = True
- with Perf(metrics, "load_tool"):
- tool = load_tool(tool.tool, loadingContext)
if runtimeContext.update_workflow or runtimeContext.create_workflow:
# Create a pipeline template or workflow record and exit.
if self.work_api == "containers":
- uuid = upload_workflow(self, tool, job_order,
+ uuid = new_upload_workflow(self, tool, job_order,
runtimeContext.project_uuid,
runtimeContext,
uuid=runtimeContext.update_workflow,
set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
def upload_dependencies(arvrunner, name, document_loader,
- workflowobj, uri, loadref_run, runtimeContext,
+ workflowobj, uri, runtimeContext,
include_primary=True, discovered_secondaryfiles=None,
cache=None):
"""Upload the dependencies of the workflowobj document to Keep.
Returns a pathmapper object mapping local paths to keep references. Also
does an in-place update of references in "workflowobj".
- Use scandeps to find $import, $include, $schemas, run, File and Directory
+ Use scandeps to find $schemas, File and Directory
fields that represent external references.
-    If workflowobj has an "id" field, this will reload the document to ensure
-    it is scanning the raw document prior to preprocessing.
"""
- loaded = set()
- def loadref(b, u):
- joined = document_loader.fetcher.urljoin(b, u)
- defrg, _ = urllib.parse.urldefrag(joined)
- if defrg not in loaded:
- loaded.add(defrg)
- if cache is not None and defrg in cache:
- return cache[defrg]
- # Use fetch_text to get raw file (before preprocessing).
- text = document_loader.fetch_text(defrg)
- if isinstance(text, bytes):
- textIO = StringIO(text.decode('utf-8'))
- else:
- textIO = StringIO(text)
- yamlloader = YAML(typ='safe', pure=True)
- result = yamlloader.load(textIO)
- if cache is not None:
- cache[defrg] = result
- return result
- else:
- return {}
-
- if loadref_run:
- loadref_fields = set(("$import", "run"))
- else:
- loadref_fields = set(("$import",))
-
scanobj = workflowobj
- if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
- defrg, _ = urllib.parse.urldefrag(workflowobj["id"])
- if cache is not None and defrg not in cache:
- # if we haven't seen this file before, want raw file
- # content (before preprocessing) to ensure that external
- # references like $include haven't already been inlined.
- scanobj = loadref("", workflowobj["id"])
-
metadata = scanobj
- with Perf(metrics, "scandeps include, location"):
+ with Perf(metrics, "scandeps"):
sc_result = scandeps(uri, scanobj,
- loadref_fields,
- set(("$include", "location")),
- loadref, urljoin=document_loader.fetcher.urljoin,
+ set(),
+ set(("location",)),
+ None, urljoin=document_loader.fetcher.urljoin,
nestdirs=False)
-
- with Perf(metrics, "scandeps $schemas"):
optional_deps = scandeps(uri, scanobj,
- loadref_fields,
- set(("$schemas",)),
- loadref, urljoin=document_loader.fetcher.urljoin,
- nestdirs=False)
+ set(),
+ set(("$schemas",)),
+ None, urljoin=document_loader.fetcher.urljoin,
+ nestdirs=False)
if sc_result is None:
sc_result = []
jobloader,
job_order,
job_order.get("id", "#"),
- False,
runtimeContext)
if "id" in job_order:
def upload_workflow_deps(arvrunner, tool, runtimeContext):
# Ensure that Docker images needed by this workflow are available
- with Perf(metrics, "upload_docker"):
- upload_docker(arvrunner, tool, runtimeContext)
+    # Docker image upload is temporarily disabled while testing the new
+    # workflow upload path.
+    #with Perf(metrics, "upload_docker"):
+    #    upload_docker(arvrunner, tool, runtimeContext)
document_loader = tool.doc_loader
document_loader,
deptool,
deptool["id"],
- False,
runtimeContext,
include_primary=False,
discovered_secondaryfiles=discovered_secondaryfiles,