5 from cwltool.errors import WorkflowException
6 from cwltool.process import get_feature, adjustFiles, UnsupportedRequirement, shortname
8 import arvados.collection
10 from .arvdocker import arv_docker_get_image
12 from .runner import Runner
14 logger = logging.getLogger('arvados.cwl-runner')
16 class ArvadosContainer(object):
17 """Submit and manage a Crunch job for executing a CWL CommandLineTool."""
# NOTE(review): this excerpt is sampled — gaps in the embedded line numbers
# (e.g. 26 -> 28, 44 -> 47) mean interior statements are missing. Comments
# below describe only what the visible lines show.
# Keep a back-reference to the shared runner so methods can reach the
# Arvados API client, project UUID, process table, and retry count.
19 def __init__(self, runner):
20 self.arvrunner = runner
# Body not visible in this excerpt.
23 def update_pipeline_component(self, r):
# Build an Arvados container request for this tool invocation and submit it
# via the container_requests API. On any exception, reports "permanentFail"
# through self.output_callback instead of raising.
26 def run(self, dry_run=False, pull_image=True, **kwargs):
# Container request skeleton: the tool's argv plus the fixed in-container
# working/output directory /var/spool/cwl.
28 "command": self.command_line,
29 "owner_uuid": self.arvrunner.project_uuid,
31 "output_path": "/var/spool/cwl",
32 "cwd": "/var/spool/cwl",
36 runtime_constraints = {}
# Mount each input file's collection into the container. p[6:] strips a
# leading prefix — presumably "keep:"-style, but the exact prefix length
# cannot be confirmed from this excerpt (contrast [5:] used in
# RunnerContainer.arvados_job_spec below) — TODO confirm.
43 for f in self.pathmapper.files():
44 _, p = self.pathmapper.mapper(f)
47 "portable_data_hash": p[6:]
# InitialWorkDir-style generated files are rejected outright.
50 if self.generatefiles:
51 raise UnsupportedRequirement("Generate files not supported")
# NOTE(review): as shown, everything from here to the commented-out task.vwd
# lines sits after the unconditional raise above and appears unreachable —
# possibly intentionally-dead scaffolding for future vwd support; verify
# against the full file.
53 vwd = arvados.collection.Collection(api_client=self.arvrunner.api_client)
54 container_request["task.vwd"] = {}
55 for t in self.generatefiles:
# dict-valued entries look like file references to copy from an existing
# collection; string values are written as literal file contents.
56 if isinstance(self.generatefiles[t], dict):
57 src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
58 vwd.copy(rest, t, source_collection=src)
60 with vwd.open(t, "w") as f:
61 f.write(self.generatefiles[t])
64 # for t in self.generatefiles:
65 # container_request["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)
# Base environment; tool-specified variables override/extend it.
67 container_request["environment"] = {"TMPDIR": "/tmp"}
69 container_request["environment"].update(self.environment)
# NOTE(review): "suppported" typo in this user-visible message — runtime
# string, so not altered here; fix in a code change.
72 raise UnsupportedRequirement("Stdin redirection currently not suppported")
# stdout capture is mapped to a file mount inside the output directory.
75 mounts["stdout"] = {"kind": "file",
76 "path": "/var/spool/cwl/%s" % (self.stdout)}
# Resolve the Docker image; fall back to the stock arvados/jobs image when
# the tool declares no DockerRequirement.
78 (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
80 docker_req = {"dockerImageId": "arvados/jobs"}
82 container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
85 self.arvrunner.project_uuid)
# Translate CWL resource hints: cores -> vcpus; ram is given in MiB by the
# builder, converted to bytes here (× 2**20).
87 resources = self.builder.resources
88 if resources is not None:
89 runtime_constraints["vcpus"] = resources.get("cores", 1)
90 runtime_constraints["ram"] = resources.get("ram") * 2**20
91 #runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)
93 container_request["mounts"] = mounts
94 container_request["runtime_constraints"] = runtime_constraints
# Submit the request with the runner's standard retry policy, then register
# this object in the runner's process table keyed by container UUID so state
# updates can be routed back to it.
97 response = self.arvrunner.api.container_requests().create(
98 body=container_request
99 ).execute(num_retries=self.arvrunner.num_retries)
101 self.arvrunner.processes[response["container_uuid"]] = self
103 logger.info("Container %s (%s) request state is %s", self.name, response["container_uuid"], response["state"])
# If the container already reached a terminal state, handle it immediately
# (handler lines not visible in this excerpt).
105 if response["state"] == "Final":
# Broad catch-all: any submission failure is reported to the CWL engine as a
# permanent failure with empty outputs.
107 except Exception as e:
108 logger.error("Got error %s" % str(e))
109 self.output_callback({}, "permanentFail")
# Called when the container reaches a final state. Maps the recorded exit
# code through the tool's successCodes/temporaryFailCodes/permanentFailCodes
# lists, collects outputs, and always reports via output_callback and
# deregisters from the runner's process table (presumably in a finally block
# — the framing lines are not visible here).
111 def done(self, record):
113 if record["state"] == "Complete":
114 rcode = record["exit_code"]
115 if self.successCodes and rcode in self.successCodes:
116 processStatus = "success"
117 elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
118 processStatus = "temporaryFail"
119 elif self.permanentFailCodes and rcode in self.permanentFailCodes:
120 processStatus = "permanentFail"
# Fallbacks for codes not in any list / non-Complete states (the governing
# conditions fall in the missing lines): success, else permanentFail.
122 processStatus = "success"
124 processStatus = "permanentFail"
126 processStatus = "permanentFail"
# Collect outputs using the shared done() helper with the container's fixed
# tmpdir/outdir/keep mount points. WorkflowException gets a targeted error
# (with traceback only in debug mode); anything else is logged as unknown.
131 outputs = done.done(self, record, "/tmp", "/var/spool/cwl", "/keep")
132 except WorkflowException as e:
133 logger.error("Error while collecting container outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
134 processStatus = "permanentFail"
135 except Exception as e:
136 logger.exception("Got unknown exception while collecting job outputs:")
137 processStatus = "permanentFail"
139 self.output_callback(outputs, processStatus)
141 del self.arvrunner.processes[record["uuid"]]
144 class RunnerContainer(Runner):
145 """Submit and manage a container that runs arvados-cwl-runner."""
# NOTE(review): sampled excerpt — interior lines are missing (gaps in the
# embedded line numbers); comments describe only the visible code.
147 def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
148 """Create an Arvados job specification for this workflow.
150 The returned dict can be used to create a job (i.e., passed as
151 the +body+ argument to jobs().create()), or as a component in
152 a pipeline template or pipeline instance.
# Let the base Runner upload/map the workflow files first.
155 workflowmapper = super(RunnerContainer, self).arvados_job_spec(dry_run=dry_run, pull_image=pull_image, **kwargs)
# Serialize the job order (CWL inputs) into a new collection as
# cwl.input.json; sort_keys keeps the output deterministic.
157 with arvados.collection.Collection(api_client=self.arvrunner.api) as jobobj:
158 with jobobj.open("cwl.input.json", "w") as f:
159 json.dump(self.job_order, f, sort_keys=True, indent=4)
160 jobobj.save_new(owner_uuid=self.arvrunner.project_uuid)
# In-container paths where the workflow document and inputs will be mounted.
162 workflowname = os.path.basename(self.tool.tool["id"])
163 workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
# mapper(...)[1] is the mapped target; [5:...index('/')] extracts the
# collection identifier between a 5-char prefix (presumably "keep:" — TODO
# confirm) and the first slash.
164 workflowcollection = workflowmapper.mapper(self.tool.tool["id"])[1]
165 workflowcollection = workflowcollection[5:workflowcollection.index('/')]
166 jobpath = "/var/lib/cwl/job/cwl.input.json"
# The runner container always uses the stock arvados/jobs image.
168 container_image = arv_docker_get_image(self.arvrunner.api,
169 {"dockerImageId": "arvados/jobs"},
171 self.arvrunner.project_uuid)
# Container request body: run arvados-cwl-runner itself inside the
# container, pointing it at the mounted workflow and input JSON.
174 "command": ["arvados-cwl-runner", "--local", "--api=containers", workflowpath, jobpath],
175 "owner_uuid": self.arvrunner.project_uuid,
177 "output_path": "/var/spool/cwl",
178 "cwd": "/var/spool/cwl",
# "Committed" means the request is ready to be scheduled immediately.
180 "state": "Committed",
181 "container_image": container_image,
# Mounts (dict framing lines not visible here): the workflow collection,
# the inputs collection (scoped to cwl.input.json), and an output file/
# collection for cwl.output.json.
183 "/var/lib/cwl/workflow": {
184 "kind": "collection",
185 "portable_data_hash": "%s" % workflowcollection
188 "kind": "collection",
189 "portable_data_hash": "%s/cwl.input.json" % jobobj.portable_data_hash()
193 "path": "/var/spool/cwl/cwl.output.json"
196 "kind": "collection",
# Fixed constraints for the runner container itself: 256 MiB RAM (other
# entries fall in the missing lines).
200 "runtime_constraints": {
202 "ram": 1024*1024*256,
# Submit the runner container request and register it with the runner.
# NOTE(review): the method continues past the end of this excerpt; comments
# cover only the visible lines.
207 def run(self, *args, **kwargs):
# Force keep: URI prefixes when building the spec (consumed downstream —
# exact consumer not visible here).
208 kwargs["keepprefix"] = "keep:"
209 job_spec = self.arvados_job_spec(*args, **kwargs)
# Only set owner_uuid if arvados_job_spec didn't already.
210 job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
212 response = self.arvrunner.api.container_requests().create(
214 ).execute(num_retries=self.arvrunner.num_retries)
# Track by request UUID, and route container state updates back to this
# object via the runner's process table keyed by container UUID.
216 self.uuid = response["uuid"]
217 self.arvrunner.processes[response["container_uuid"]] = self
219 logger.info("Submitted container %s", response["uuid"])
# Terminal-at-submit handling (body not visible in this excerpt).
221 if response["state"] in ("Complete", "Failed", "Cancelled"):