5 from cwltool.errors import WorkflowException
6 from cwltool.process import get_feature, adjustFiles, UnsupportedRequirement, shortname
8 import arvados.collection
10 from .arvdocker import arv_docker_get_image
12 from .runner import Runner
14 logger = logging.getLogger('arvados.cwl-runner')
class ArvadosContainer(object):
    """Submit and manage an Arvados container request executing a CWL CommandLineTool.

    NOTE(review): this class submits through the container_requests API
    (see run() below), so "container request" is more accurate than the
    original "Crunch job" wording.
    """

    def __init__(self, runner):
        # runner: dispatcher object supplying the Arvados API client,
        # project UUID, retry count, fs_access, and the shared `processes`
        # table used to route container state updates back to this object.
        self.arvrunner = runner

    def update_pipeline_component(self, r):
        # Body elided in this chunk — presumably a no-op under the
        # containers API (no pipeline instance to update); TODO confirm.

    def run(self, dry_run=False, pull_image=True, **kwargs):
        # Assemble the container_request record for
        # container_requests().create().  (The dict literal's opening and
        # closing lines are elided from this chunk.)
            "command": self.command_line,
            "owner_uuid": self.arvrunner.project_uuid,
            "output_path": "/var/spool/cwl",  # tool outputs collected from here
            "cwd": "/var/spool/cwl",

        runtime_constraints = {}

        # Mount each input file's Keep collection into the container.
        for f in self.pathmapper.files():
            _, p = self.pathmapper.mapper(f)
                # p presumably looks like "keep:<pdh>/..."; p[6:] strips
                # the 6-char "keep:" prefix — TODO confirm; the mounts[...]
                # assignment opening this dict is elided here.
                "portable_data_hash": p[6:]

        if self.generatefiles:
            # InitialWorkDir/generated files are not implemented for the
            # containers API yet; fail loudly rather than run incorrectly.
            raise UnsupportedRequirement("Generate files not supported")

            # NOTE(review): everything below in this branch is unreachable
            # after the raise — kept as a sketch of staging generated files
            # through a writable scratch collection ("vwd").
            vwd = arvados.collection.Collection(api_client=self.arvrunner.api_client)
            container_request["task.vwd"] = {}
            for t in self.generatefiles:
                if isinstance(self.generatefiles[t], dict):
                    # Existing file reference: copy it from its source
                    # collection into the scratch collection.
                    src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
                    vwd.copy(rest, t, source_collection=src)
                    # Literal contents: write them into the collection.
                    with vwd.open(t, "w") as f:
                        f.write(self.generatefiles[t])

        # for t in self.generatefiles:
        #     container_request["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)

        container_request["environment"] = {"TMPDIR": "/tmp"}
            # (enclosing "if self.environment:" elided in this chunk)
            container_request["environment"].update(self.environment)

            # (enclosing "if self.stdin:" elided in this chunk)
            raise UnsupportedRequirement("Stdin redirection currently not suppported")

            # Capture the tool's stdout as a file inside the output dir.
            # (enclosing "if self.stdout:" elided in this chunk)
            mounts["stdout"] = {"kind": "file",
                                "path": "/var/spool/cwl/%s" % (self.stdout)}

        (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
            # No DockerRequirement: fall back to the stock runner image.
            docker_req = {"dockerImageId": "arvados/jobs"}

        # (intermediate arguments — docker_req, pull_image — elided here)
        container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
                                                                    self.arvrunner.project_uuid)

        resources = self.builder.resources
        if resources is not None:
            runtime_constraints["vcpus"] = resources.get("cores", 1)
            # CWL "ram" is in MiB; the API expects bytes.
            # NOTE(review): .get("ram") has no default — TypeError if absent;
            # presumably the CWL builder always supplies it. TODO confirm.
            runtime_constraints["ram"] = resources.get("ram") * 2**20
        #runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)

        container_request["mounts"] = mounts
        container_request["runtime_constraints"] = runtime_constraints

        # (enclosing "try:" elided in this chunk)
            response = self.arvrunner.api.container_requests().create(
                body=container_request
            ).execute(num_retries=self.arvrunner.num_retries)

            # Track the spawned container so state updates find us.
            self.arvrunner.processes[response["container_uuid"]] = self

            logger.info("Container %s (%s) request state is %s", self.name, response["container_uuid"], response["state"])

            if response["state"] == "Final":
                # Request already satisfied (e.g. reused container) — the
                # completion-handling body is elided in this chunk.
        except Exception as e:
            # Submission failed: log and report a permanent failure with
            # no outputs to the CWL engine.
            logger.error("Got error %s" % str(e))
            self.output_callback({}, "permanentFail")

    def done(self, record):
        """Callback invoked when the tracked container reaches a final state.

        Maps the container's exit code / final state to a CWL process
        status, collects outputs, and reports via self.output_callback.
        (An enclosing try:/finally: around this logic is elided here.)
        """
            if record["state"] == "Complete":
                rcode = record["exit_code"]
                # Map exit code through the tool's declared
                # success/temporaryFail/permanentFail code lists first.
                if self.successCodes and rcode in self.successCodes:
                    processStatus = "success"
                elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
                    processStatus = "temporaryFail"
                elif self.permanentFailCodes and rcode in self.permanentFailCodes:
                    processStatus = "permanentFail"
                # ("elif rcode == 0:" elided) — exit 0 means success
                    processStatus = "success"
                # ("else:" elided) — any other exit code is a hard failure
                    processStatus = "permanentFail"
            # ("else:" elided) — any non-Complete final state is a failure
                processStatus = "permanentFail"

            # Collect outputs via the shared `done` helper module (its
            # import and the enclosing try:/condition are elided here).
                outputs = done.done(self, record, "/tmp", "/var/spool/cwl", "/keep")
            except WorkflowException as e:
                # Expected CWL-level failure: log it (traceback only when
                # the runner is in debug mode) and fail the step.
                logger.error("Error while collecting container outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
                processStatus = "permanentFail"
            except Exception as e:
                # Unexpected failure: full traceback, then fail the step.
                logger.exception("Got unknown exception while collecting job outputs:")
                processStatus = "permanentFail"
            # ("finally:" elided) — always report the result to the engine
                self.output_callback(outputs, processStatus)
        # ("finally:" elided) — stop tracking this container
            del self.arvrunner.processes[record["uuid"]]
class RunnerContainer(Runner):
    """Submit and manage a container that runs arvados-cwl-runner."""

    def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
        """Create an Arvados container request specification for this workflow.

        The returned dict is passed as the +body+ argument to
        container_requests().create() (see run() below).
        """

        # Upload workflow files to Keep first (handled by the Runner base).
        workflowmapper = super(RunnerContainer, self).arvados_job_spec(dry_run=dry_run, pull_image=pull_image, **kwargs)

        # Serialize the job order (CWL inputs) into its own collection.
        with arvados.collection.Collection(api_client=self.arvrunner.api) as jobobj:
            with jobobj.open("cwl.input.json", "w") as f:
                json.dump(self.job_order, f, sort_keys=True, indent=4)
            jobobj.save_new(owner_uuid=self.arvrunner.project_uuid)

        workflowname = os.path.basename(self.tool.tool["id"])
        workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
        workflowcollection = workflowmapper.mapper(self.tool.tool["id"])[1]
        # Strip the 5-char "keep:" prefix and everything after the first
        # "/", leaving just the collection's portable data hash.
        workflowcollection = workflowcollection[5:workflowcollection.index('/')]
        jobpath = "/var/lib/cwl/job/cwl.input.json"

        # (intermediate pull_image argument elided in this chunk)
        container_image = arv_docker_get_image(self.arvrunner.api,
                                               {"dockerImageId": "arvados/jobs"},
                                               self.arvrunner.project_uuid)

        # Request body fragments below; the "return {" opener, several
        # mount keys, and the closing braces are elided from this chunk.
            "command": ["arvados-cwl-runner", "--local", "--api=containers", workflowpath, jobpath],
            "owner_uuid": self.arvrunner.project_uuid,
            "output_path": "/var/spool/cwl",
            "cwd": "/var/spool/cwl",
            "state": "Committed",
            "container_image": container_image,
                # Workflow files mounted read-only from Keep.
                "/var/lib/cwl/workflow": {
                    "kind": "collection",
                    "portable_data_hash": "%s" % workflowcollection
                    "kind": "collection",
                    # NOTE(review): PDH with a "/cwl.input.json" suffix —
                    # mounts a single file out of the inputs collection;
                    # confirm the API accepts this path form.
                    "portable_data_hash": "%s/cwl.input.json" % jobobj.portable_data_hash()
                    # Runner's cwl.output.json is captured as a file mount.
                    "path": "/var/spool/cwl/cwl.output.json"
                    "kind": "collection",
            "runtime_constraints": {
                "ram": 1024*1024*256,  # 256 MiB
    def run(self, *args, **kwargs):
        """Submit the runner container request and start tracking it."""
        # Force "keep:" URI prefixes when building the spec — presumably so
        # file references resolve under the containers API; TODO confirm.
        kwargs["keepprefix"] = "keep:"
        job_spec = self.arvados_job_spec(*args, **kwargs)
        job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)

        # (the "body=job_spec" argument line is elided in this chunk)
        response = self.arvrunner.api.container_requests().create(
        ).execute(num_retries=self.arvrunner.num_retries)

        self.uuid = response["uuid"]
        # Register under the container UUID so state updates find us.
        self.arvrunner.processes[response["container_uuid"]] = self

        logger.info("Submitted container %s", response["uuid"])

        # Request already reached a final state (e.g. reused container);
        # the completion-handling body of this if is elided in this chunk.
        if response["state"] in ("Complete", "Failed", "Cancelled"):