import json
import logging
import os
import re
import urlparse
from functools import partial
from cStringIO import StringIO

import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs

import arvados.collection
import ruamel.yaml as yaml

from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper
logger = logging.getLogger('arvados.cwl-runner')

# Monkey-patch cwltool's input-filename allowlist to accept any name.
# NOTE(review): upstream cwltool rejects filenames outside its pattern;
# presumably Keep/Arvados can handle arbitrary names — confirm intent.
cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r".*")
def trim_listing(obj):
    """Remove 'listing' field from Directory objects that are keep references.

    When Directory objects represent Keep references, it redundant and
    potentially very expensive to pass fully enumerated Directory objects
    between instances of cwl-runner (e.g. a submitting a job, or using the
    RunInSingleContainer feature), so delete the 'listing' field when it is
    safe to do so.
    """

    # A keep: reference can always be re-enumerated on the other side, so
    # the listing is redundant baggage.
    if obj.get("location", "").startswith("keep:") and "listing" in obj:
        del obj["listing"]
    # Anonymous ("_:") locations only make sense in this process; drop them
    # so the receiver does not try to resolve a meaningless identifier.
    if obj.get("location", "").startswith("_:"):
        del obj["location"]
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        # Resolve u against base b and load the referenced document once;
        # subsequent references to the same (defragmented) URI return {}.
        joined = urlparse.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    # Only follow "run" references when scanning a workflow document;
    # job orders only carry $import.
    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    scanobj = workflowobj
    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref)

    normalizeFilesDirs(sc)

    if "id" in workflowobj:
        # The document itself must also be uploaded.
        sc.append({"class": "File", "location": workflowobj["id"]})

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name)

    def setloc(p):
        # Rewrite local references in place to their keep: equivalents;
        # leave anonymous ("_:") and already-mapped keep: locations alone.
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    return mapper
def upload_docker(arvrunner, tool):
    """Ensure the Docker image(s) used by the tool are available in Arvados.

    For a CommandLineTool, upload its DockerRequirement image (if any); for a
    Workflow, recurse into each step's embedded tool.
    """
    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        if docker_req:
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)
def upload_instance(arvrunner, name, tool, job_order):
    """Upload a workflow instance (Docker images, workflow, and job order) to Keep.

    Returns the pathmapper for the workflow document.  job_order is updated
    in place with keep: references; its transient "id" field is removed.
    """
    upload_docker(arvrunner, tool)

    workflowmapper = upload_dependencies(arvrunner,
                                         name,
                                         tool.doc_loader,
                                         tool.tool,
                                         tool.tool["id"],
                                         True)
    # Called for its side effects: uploads job-order dependencies and
    # rewrites references in job_order in place.
    jobmapper = upload_dependencies(arvrunner,
                                    os.path.basename(job_order.get("id", "#")),
                                    tool.doc_loader,
                                    job_order,
                                    job_order.get("id", "#"),
                                    False)

    if "id" in job_order:
        # The submitted job order should not carry the local document id.
        del job_order["id"]

    return workflowmapper
class Runner(object):
    """Base class for submitting a cwl-runner instance as an Arvados job/container.

    Subclasses are expected to provide the actual submission; this base holds
    common state and implements dependency upload and completion handling.
    """

    def __init__(self, runner, tool, job_order, enable_reuse, output_name):
        self.arvrunner = runner          # orchestrating ArvCwlRunner
        self.tool = tool                 # cwltool Process to run
        self.job_order = job_order       # input object for the run
        self.running = False
        self.enable_reuse = enable_reuse # permit job/container reuse
        self.uuid = None                 # set once submitted
        self.final_output = None         # output collection identifier
        self.output_name = output_name

    def update_pipeline_component(self, record):
        # No pipeline component to update for a runner job; subclasses may override.
        pass

    def arvados_job_spec(self, *args, **kwargs):
        """Upload this instance's dependencies and return the workflow pathmapper."""
        self.name = os.path.basename(self.tool.tool["id"])
        workflowmapper = upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
        # Keep references are self-describing; drop redundant listings before submit.
        adjustDirObjs(self.job_order, trim_listing)
        return workflowmapper

    def done(self, record):
        """Completion callback: translate the job record into a CWL output object.

        Exit code 33 is the conventional signal for UnsupportedRequirement.
        """
        if record["state"] == "Complete":
            if record.get("exit_code") is not None:
                if record["exit_code"] == 33:
                    processStatus = "UnsupportedRequirement"
                elif record["exit_code"] == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"
            else:
                processStatus = "success"
        else:
            processStatus = "permanentFail"

        outputs = None
        try:
            try:
                self.final_output = record["output"]
                outc = arvados.collection.CollectionReader(self.final_output,
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                with outc.open("cwl.output.json") as f:
                    outputs = json.load(f)
                def keepify(fileobj):
                    # Qualify collection-relative paths as full keep: references.
                    path = fileobj["location"]
                    if not path.startswith("keep:"):
                        fileobj["location"] = "keep:%s/%s" % (record["output"], path)
                adjustFileObjs(outputs, keepify)
                adjustDirObjs(outputs, keepify)
            except Exception as e:
                # Best-effort: report the failure but still invoke the callback.
                logger.error("While getting final output object: %s", e)
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.processes[record["uuid"]]