import os
import urlparse
from functools import partial
import logging
import json
import re
from cStringIO import StringIO

import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs

import arvados.collection
import ruamel.yaml as yaml

from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper
from ._version import __version__

logger = logging.getLogger('arvados.cwl-runner')

# Accept any file name (cwltool's default ACCEPTLIST_RE is more restrictive).
cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r".*")

def trim_listing(obj):
    """Remove 'listing' field from Directory objects that are keep references.

    When Directory objects represent Keep references, it is redundant and
    potentially very expensive to pass fully enumerated Directory objects
    between instances of cwl-runner (e.g. submitting a job, or using the
    RunInSingleContainer feature), so delete the 'listing' field when it is
    safe to do so.
    """

    if obj.get("location", "").startswith("keep:") and "listing" in obj:
        del obj["listing"]
    if obj.get("location", "").startswith("_:"):
        del obj["location"]
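
# A minimal sketch of the effect (the locator below is illustrative only):
#
#     d = {"class": "Directory",
#          "location": "keep:99999999999999999999999999999999+99/dir",
#          "listing": [{"class": "File", "location": "foo.txt"}]}
#     trim_listing(d)
#     assert "listing" not in d   # consumers re-enumerate the listing from Keep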

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    loaded = set()
    def loadref(b, u):
        joined = urlparse.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            loaded.add(defrg)
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
            else:
                textIO = StringIO(text)
            return yaml.safe_load(textIO)
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))
78 if "id" in workflowobj:
79 # Need raw file content (before preprocessing) to ensure
80 # that external references in $include and $mixin are captured.
81 scanobj = loadref("", workflowobj["id"])

    sc = scandeps(uri, scanobj,
                  loadref_fields,
                  set(("$include", "$schemas", "location")),
                  loadref)

    normalizeFilesDirs(sc)

    if "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    mapper = ArvPathMapper(arvrunner, sc, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name)
99 if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
100 p["location"] = mapper.mapper(p["location"]).resolved
101 adjustFileObjs(workflowobj, setloc)
102 adjustDirObjs(workflowobj, setloc)
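
# Hedged usage sketch (names and paths are illustrative, not from this module):
#
#     mapper = upload_dependencies(arvrunner, "main.cwl", tool.doc_loader,
#                                  tool.tool, tool.tool["id"], True)
#     mapper.mapper("file:///home/user/main.cwl").resolved   # -> "keep:..." URI
#
# upload_instance below shows the two real call sites.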

def upload_docker(arvrunner, tool):
    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        if docker_req:
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)
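
# Sketch of the structures involved (values illustrative): for a
# CommandLineTool, get_feature returns the requirement dict plus a flag for
# whether it came from "requirements" (hard) or "hints" (soft), e.g.
#
#     ({"class": "DockerRequirement", "dockerPull": "ubuntu:14.04"}, False)
#
# For a Workflow, the function recurses into each step's embedded tool.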

def upload_instance(arvrunner, name, tool, job_order):
    upload_docker(arvrunner, tool)

    workflowmapper = upload_dependencies(arvrunner, name, tool.doc_loader,
                                         tool.tool, tool.tool["id"], True)

    jobmapper = upload_dependencies(arvrunner,
                                    os.path.basename(job_order.get("id", "#")),
                                    tool.doc_loader, job_order,
                                    job_order.get("id", "#"), False)

    if "id" in job_order:
        del job_order["id"]

    return workflowmapper
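
# Call-flow note: upload_instance is the single preparation step used by
# Runner.arvados_job_spec below.  It uploads Docker images, then the workflow
# document, then the job order; only the workflow's pathmapper is returned,
# since the job order is updated in place.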

def arvados_jobs_image(arvrunner):
    img = "arvados/jobs:"+__version__
    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        raise Exception("Docker image %s is not available\n%s" % (img, e))
    return img
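
# For example, with __version__ == "1.0.20161122145104" (illustrative), this
# pins and returns the image "arvados/jobs:1.0.20161122145104", raising if it
# cannot be made available in the project.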

class Runner(object):
    def __init__(self, runner, tool, job_order, enable_reuse, output_name):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse
        self.uuid = None
        self.final_output = None
        self.output_name = output_name

    def update_pipeline_component(self, record):
        pass

    def arvados_job_spec(self, *args, **kwargs):
        self.name = os.path.basename(self.tool.tool["id"])
        workflowmapper = upload_instance(self.arvrunner, self.name, self.tool, self.job_order)
        adjustDirObjs(self.job_order, trim_listing)
        return workflowmapper
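
    # Note: trim_listing runs after upload_instance has rewritten locations to
    # keep: references, so bulky "listing" fields are dropped from the job
    # order before it is serialized into the submitted job record.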

    def done(self, record):
        if record["state"] == "Complete":
            if record.get("exit_code") is not None:
                if record["exit_code"] == 33:
                    # By convention, exit code 33 signals UnsupportedRequirement.
                    processStatus = "UnsupportedRequirement"
                elif record["exit_code"] == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"
            else:
                processStatus = "success"
        else:
            processStatus = "permanentFail"

        outputs = None
        try:
            try:
                self.final_output = record["output"]
                outc = arvados.collection.CollectionReader(self.final_output,
                                                           api_client=self.arvrunner.api,
                                                           keep_client=self.arvrunner.keep_client,
                                                           num_retries=self.arvrunner.num_retries)
                with outc.open("cwl.output.json") as f:
                    outputs = json.load(f)
                def keepify(fileobj):
                    path = fileobj["location"]
                    if not path.startswith("keep:"):
                        fileobj["location"] = "keep:%s/%s" % (record["output"], path)
                adjustFileObjs(outputs, keepify)
                adjustDirObjs(outputs, keepify)
            except Exception as e:
                logger.error("While getting final output object: %s", e)
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.processes[record["uuid"]]
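
# Sketch of keepify's rewriting (locator illustrative): with
# record["output"] == "9f26a86b6030a69ad222cf67d71c9502+65", a relative
# location "foo/bar.txt" becomes
# "keep:9f26a86b6030a69ad222cf67d71c9502+65/foo/bar.txt"; locations already
# starting with "keep:" are left untouched.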