2 import arvados.collection
3 from cwltool.process import get_feature, adjustFiles
4 from .arvdocker import arv_docker_get_image
6 from cwltool.errors import WorkflowException
7 from cwltool.process import UnsupportedRequirement
# Module-level logger obtained under the name 'arvados.cwl-runner'; the class
# below uses it for info/error/exception messages.
# NOTE(review): the `import logging` line is not visible in this view (the
# embedded numbering starts at 2) - confirm it exists in the full file.
9 logger = logging.getLogger('arvados.cwl-runner')
# Runner-side handle for one CWL CommandLineTool execution: run() assembles a
# container request and submits it through the Arvados API client, done()
# translates the finished record into an output_callback() call.
# NOTE(review): this listing has lost its indentation and its embedded line
# numbers skip (e.g. 15 -> 18, 21 -> 23, 27 -> 31, 111 -> 116), so some
# statements are not visible here. The comments below describe only what is
# shown and hedge anything that depends on the missing lines.
11 class ArvadosContainer(object):
12 """Submit and manage a Crunch job for executing a CWL CommandLineTool."""
# Store the owning runner. The visible methods read its api, project_uuid,
# num_retries, jobs, fs_access and debug attributes.
14 def __init__(self, runner):
15 self.arvrunner = runner
# NOTE(review): the body of this method is not visible in this view.
18 def update_pipeline_component(self, r):
# Build the container request, submit it, and register this object with the
# runner so the finished container can be matched back to it.
# NOTE(review): dry_run, pull_image and kwargs are not referenced on any
# visible line - confirm whether the elided lines use them.
21 def run(self, dry_run=False, pull_image=True, **kwargs):
# Request fields; presumably entries of the container_request dict whose
# opening line is not visible. Both the output path and the working directory
# are "/var/spool/cwl".
23 "command": self.command_line,
24 "owner_uuid": self.arvrunner.project_uuid,
26 "output_path": "/var/spool/cwl",
27 "cwd": "/var/spool/cwl",
# Filled from self.builder.resources further down, then attached to the request.
31 runtime_constraints = {}
# Walk every file known to the path mapper. p[6:] drops the first six
# characters of the mapped value before using it as the portable data hash -
# presumably a fixed prefix such as "/keep/"; TODO confirm against the mapper.
38 for f in self.pathmapper.files():
39 _, p = self.pathmapper.mapper(f)
42 "portable_data_hash": p[6:]
# NOTE(review): the guard tests self.generatefiles, yet the error text talks
# about stdin redirection (and misspells "supported"). If this raise really is
# the first statement under the guard, the vwd-building code below can never
# run with a non-empty generatefiles - confirm intent and correct the message.
45 if self.generatefiles:
46 raise UnsupportedRequirement("Stdin redirection currently not suppported")
# Assemble the generated files in a fresh collection object.
48 vwd = arvados.collection.Collection()
49 container_request["task.vwd"] = {}
50 for t in self.generatefiles:
# dict entries reference existing data: rewrite the "$(task.keep)/" prefix
# to "keep:", resolve it via fs_access, and copy the result into vwd as t.
51 if isinstance(self.generatefiles[t], dict):
52 src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
53 vwd.copy(rest, t, source_collection=src)
# Presumably the else branch (its header is not visible): the entry is written
# verbatim as the content of file t inside vwd.
55 with vwd.open(t, "w") as f:
56 f.write(self.generatefiles[t])
59 # for t in self.generatefiles:
60 # container_request["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)
# Start from a default TMPDIR of "/tmp"; entries from self.environment are
# merged on top and win on key collisions (any guard sits on an elided line).
62 container_request["environment"] = {"TMPDIR": "/tmp"}
64 container_request["environment"].update(self.environment)
# NOTE(review): the condition guarding this raise is not visible; the message
# repeats the "suppported" misspelling seen above.
67 raise UnsupportedRequirement("Stdin redirection currently not suppported")
# Mount a "file" target for stdout at /var/spool/cwl/<self.stdout>.
70 mounts["stdout"] = {"kind": "file",
71 "path": "/var/spool/cwl/%s" % (self.stdout)}
# Look up the tool's DockerRequirement. The next assignment substitutes the
# "arvados/jobs" image; presumably only when none was found (guard not visible).
73 (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
75 docker_req = {"dockerImageId": "arvados/jobs"}
# The value returned by arv_docker_get_image() becomes the request's
# container_image; the middle arguments of the call are on elided lines.
77 container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
80 self.arvrunner.project_uuid)
# Translate the builder's resource request into runtime constraints.
82 resources = self.builder.resources
83 if resources is not None:
# "cores" falls back to 1 when absent.
84 runtime_constraints["vcpus"] = resources.get("cores", 1)
# NOTE(review): resources.get("ram") has no default, so a missing "ram" key
# yields None and the multiplication raises TypeError. The 2**20 factor
# suggests a MiB -> bytes conversion - TODO confirm units.
85 runtime_constraints["ram"] = resources.get("ram") * 2**20
86 #runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)
# Attach the accumulated mounts and constraints to the request body.
88 container_request["mounts"] = mounts
89 container_request["runtime_constraints"] = runtime_constraints
# Submit the request, retrying per the runner's num_retries setting.
92 response = self.arvrunner.api.container_requests().create(
93 body=container_request
94 ).execute(num_retries=self.arvrunner.num_retries)
# Register this object under the response's container_uuid.
# NOTE(review): done() removes the entry with record["uuid"]; confirm that the
# record passed there is the container record so the two keys agree.
96 self.arvrunner.jobs[response["container_uuid"]] = self
98 logger.info("Container %s (%s) request state is %s", self.name, response["container_uuid"], response["state"])
# NOTE(review): the statement executed for the "Final" state is not visible.
100 if response["state"] == "Final":
# Any exception caught here (the matching try: is on an elided line) is logged
# and reported to the callback as a permanent failure with empty outputs.
102 except Exception as e:
103 logger.error("Got error %s" % str(e))
104 self.output_callback({}, "permanentFail")
# Turn a finished record into a process status plus collected outputs and hand
# both to output_callback(), then drop this object from the runner's registry.
106 def done(self, record):
# "Complete" maps to success; the other assignment is presumably the else
# branch (its header is not visible) and maps everything else to permanentFail.
108 if record["state"] == "Complete":
109 processStatus = "success"
111 processStatus = "permanentFail"
# Collect outputs through the module-level name `done` (its import is not
# visible in this view), passing "/tmp", "/var/spool/cwl" and "/keep".
# NOTE(review): the initial binding of `outputs` is on elided lines; it must
# exist before the callback below even when collection raises - confirm.
116 outputs = done.done(self, record, "/tmp", "/var/spool/cwl", "/keep")
# Workflow errors are logged (with traceback only when the runner's debug flag
# is set) and downgrade the status to permanentFail.
117 except WorkflowException as e:
118 logger.error("Error while collecting container outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
119 processStatus = "permanentFail"
# Any other exception is logged with its traceback and also treated as a
# permanent failure.
120 except Exception as e:
121 logger.exception("Got unknown exception while collecting job outputs:")
122 processStatus = "permanentFail"
124 self.output_callback(outputs, processStatus)
# Remove this object's entry from the runner's jobs registry (presumably inside
# a finally: whose header is not visible).
126 del self.arvrunner.jobs[record["uuid"]]