Merge master into output-tags branch and resolve conflict
author Jiayong Li <jiayong@math.mit.edu>
Tue, 15 Nov 2016 19:59:55 +0000 (14:59 -0500)
committer Jiayong Li <jiayong@math.mit.edu>
Tue, 15 Nov 2016 19:59:55 +0000 (14:59 -0500)
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/arvjob.py
sdk/cwl/arvados_cwl/runner.py
sdk/cwl/tests/test_make_output.py

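For orientation, a minimal sketch of how the widened ArvCwlRunner constructor is called after this merge (client setup and argument values are illustrative, not part of this commit):

    import arvados
    from arvados_cwl import ArvCwlRunner

    # Hypothetical setup; output_tags is the new comma-separated tag string.
    api = arvados.api('v1')
    runner = ArvCwlRunner(api, work_api="jobs",
                          output_name="Analysis results",
                          output_tags="tag0,tag1,tag2")
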
index 6778eb0222a2fb404e0754578d698df91e792df0,92be92d6e0469fba63a1504a5f0c834fb4a9b2b7..b3d47dd8d05e5981ae4f645fa9c968ee7707e747
@@@ -49,7 -49,7 +49,7 @@@ class ArvCwlRunner(object)
  
      """
  
 -    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None):
 +    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None):
          self.api = api_client
          self.processes = {}
          self.lock = threading.Lock()
@@@ -64,7 -64,6 +64,7 @@@
          self.pipeline = None
          self.final_output_collection = None
          self.output_name = output_name
 +        self.output_tags = output_tags
          self.project_uuid = None
  
          if keep_client is not None:
              for v in obj:
                  self.check_writable(v)
  
 -    def make_output_collection(self, name, outputObj):
 +    def make_output_collection(self, name, tagsString, outputObj):
          outputObj = copy.deepcopy(outputObj)
  
          files = []
  
          srccollections = {}
          for k,v in generatemapper.items():
+             if k.startswith("_:"):
+                 if v.type == "Directory":
+                     continue
+                 if v.type == "CreateFile":
+                     with final.open(v.target, "wb") as f:
+                         f.write(v.resolved.encode("utf-8"))
+                     continue
+             if not k.startswith("keep:"):
+                 raise Exception("Output source is not in keep or a literal")
              sp = k.split("/")
              srccollection = sp[0][5:]
              if srccollection not in srccollections:
-                 srccollections[srccollection] = arvados.collection.CollectionReader(
-                     srccollection,
-                     api_client=self.api,
-                     keep_client=self.keep_client,
-                     num_retries=self.num_retries)
+                 try:
+                     srccollections[srccollection] = arvados.collection.CollectionReader(
+                         srccollection,
+                         api_client=self.api,
+                         keep_client=self.keep_client,
+                         num_retries=self.num_retries)
+                 except arvados.errors.ArgumentError as e:
+                     logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
+                     raise
              reader = srccollections[srccollection]
              try:
                  srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
  
          def rewrite(fileobj):
              fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
-             for k in ("basename", "size", "listing"):
+             for k in ("basename", "listing", "contents"):
                  if k in fileobj:
                      del fileobj[k]
  
                      final.api_response()["name"],
                      final.manifest_locator())
  
-         self.final_output_collection = final
 +        final_uuid = final.manifest_locator()
 +        tags = tagsString.split(',') if tagsString else []
 +        for tag in tags:
 +            self.api.links().create(body={
 +                "head_uuid": final_uuid, "link_class": "tag", "name": tag
 +            }).execute(num_retries=self.num_retries)
 +
+         def finalcollection(fileobj):
+             fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
+         adjustDirObjs(outputObj, finalcollection)
+         adjustFileObjs(outputObj, finalcollection)
+         return (outputObj, final)
  
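The links().create() calls above are what attach each tag to the output collection; a minimal sketch, assuming an initialized API client and a real collection UUID, of reading those tags back through the same links API:

    import arvados

    # Sketch: list the "tag" links attached to an output collection.
    # `final_uuid` is a placeholder for the collection UUID tagged above.
    api = arvados.api('v1')
    final_uuid = "zzzzz-4zz18-zzzzzzzzzzzzzzz"
    links = api.links().list(filters=[
        ["head_uuid", "=", final_uuid],
        ["link_class", "=", "tag"],
    ]).execute()
    tag_names = [item["name"] for item in links["items"]]
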
      def set_crunch_output(self):
          if self.work_api == "containers":
                                           self.output_callback,
                                           **kwargs).next()
                  else:
 -                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name)
 +                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name, self.output_tags)
              else:
 -                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name)
 +                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name, self.output_tags)
  
          if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
              # Create pipeline for local run
          else:
              if self.output_name is None:
                  self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
 -            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.final_output)
 +            if self.output_tags is None:
 +                self.output_tags = ""
-             self.make_output_collection(self.output_name, self.output_tags, self.final_output)
++            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
              self.set_crunch_output()
  
          if self.final_status != "success":
@@@ -458,7 -468,6 +478,7 @@@ def arg_parser():  # type: () -> argparse.ArgumentParser
  
      parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
      parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
 +    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
      parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                          help="Ignore Docker image version when deciding whether to reuse past jobs.",
                          default=False)
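
As a usage note, the new option is exercised like this (a hypothetical invocation; the workflow and job file names are placeholders):

    # Hypothetical command line exercising the new flag:
    #
    #   arvados-cwl-runner --output-name "Analysis results" \
    #       --output-tags tag0,tag1,tag2 workflow.cwl job.yml
    #
    # main() passes arvargs.output_tags through to ArvCwlRunner, which splits
    # the string on commas when tagging the final output collection.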
@@@ -516,7 -525,7 +536,7 @@@ def main(args, stdout, stderr, api_clie
      try:
          if api_client is None:
              api_client=arvados.api('v1', model=OrderedJsonModel())
 -        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client, output_name=arvargs.output_name)
 +        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client, output_name=arvargs.output_name, output_tags=arvargs.output_tags)
      except Exception as e:
          logger.error(e)
          return 1
index afcc29a21a424f16aaa3f7aad96f1644fd43b308,aa088c5e8a06fa00ec086483b9f628c79687965f..e7cd617baee8d063da303aac89a93f434234c551
@@@ -42,6 -42,7 +42,7 @@@ class ArvadosContainer(object)
                  "kind": "tmp"
              }
          }
+         scheduling_parameters = {}
  
          dirs = set()
          for f in self.pathmapper.files():
  
          partition_req, _ = get_feature(self, "http://arvados.org/cwl#PartitionRequirement")
          if partition_req:
-             runtime_constraints["partition"] = aslist(partition_req["partition"])
+             scheduling_parameters["partitions"] = aslist(partition_req["partition"])
  
          container_request["mounts"] = mounts
          container_request["runtime_constraints"] = runtime_constraints
          container_request["use_existing"] = kwargs.get("enable_reuse", True)
+         container_request["scheduling_parameters"] = scheduling_parameters
  
          try:
              response = self.arvrunner.api.container_requests().create(
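
The net effect of this hunk is that partition requests move out of runtime_constraints; a minimal sketch of the resulting request body, with illustrative values:

    # Illustrative container request fragment after this change: the partition
    # list from the PartitionRequirement hint now travels under
    # "scheduling_parameters" rather than "runtime_constraints".
    container_request = {
        "runtime_constraints": {"vcpus": 1, "ram": 1073741824},
        "scheduling_parameters": {"partitions": ["partition-a", "partition-b"]},
        "use_existing": True,
    }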
@@@ -189,9 -191,6 +191,9 @@@ class RunnerContainer(Runner)
          if self.output_name:
              command.append("--output-name=" + self.output_name)
  
 +        if self.output_tags:
 +            command.append("--output-tags=" + self.output_tags)
 +
          if self.enable_reuse:
              command.append("--enable-reuse")
          else:
index f48d8bbe11c02a7293586840c144fa8b7558f9da,8a62204f8fb9ec22298abec15529411ace70ed9e..4db23b98a961904675727a13c33bf91cd3aa1f55
@@@ -85,6 -85,8 +85,8 @@@ class ArvadosJob(object)
          with Perf(metrics, "arv_docker_get_image %s" % self.name):
              (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
              if docker_req and kwargs.get("use_container") is not False:
+                 if docker_req.get("dockerOutputDirectory"):
+                     raise UnsupportedRequirement("Option 'dockerOutputDirectory' of DockerRequirement not supported.")
                  runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
              else:
                  runtime_constraints["docker_image"] = arvados_jobs_image(self.arvrunner)
@@@ -239,9 -241,6 +241,9 @@@ class RunnerJob(Runner)
          if self.output_name:
              self.job_order["arv:output_name"] = self.output_name
  
 +        if self.output_tags:
 +            self.job_order["arv:output_tags"] = self.output_tags
 +
          self.job_order["arv:enable_reuse"] = self.enable_reuse
  
          return {
@@@ -309,8 -308,7 +311,8 @@@ class RunnerTemplate(object)
              tool=tool,
              job_order=job_order,
              enable_reuse=enable_reuse,
 -            output_name=None)
 +            output_name=None,
 +            output_tags=None)
  
      def pipeline_component_spec(self):
          """Return a component that Workbench and a-r-p-i will understand.
index 2b5d186843fc8b3162ffaf0f21ad2b07b68dfb86,a1142544f5bf2e16150d56dd4d0b707cfd4db984..5cc447e9a3bad9202d9e77fb53919dcc66b804c8
@@@ -9,9 -9,11 +9,11 @@@ from cStringIO import StringIO
  import cwltool.draft2tool
  from cwltool.draft2tool import CommandLineTool
  import cwltool.workflow
- from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs
+ from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
  from cwltool.load_tool import fetch_document
  from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
+ from cwltool.utils import aslist
+ from cwltool.builder import substitute
  
  import arvados.collection
  import ruamel.yaml as yaml
@@@ -108,6 -110,9 +110,9 @@@ def upload_docker(arvrunner, tool)
      if isinstance(tool, CommandLineTool):
          (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
          if docker_req:
+             if docker_req.get("dockerOutputDirectory"):
+                 # TODO: can be supported by containers API, but not jobs API.
+                 raise UnsupportedRequirement("Option 'dockerOutputDirectory' of DockerRequirement not supported.")
              arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
      elif isinstance(tool, cwltool.workflow.Workflow):
          for s in tool.steps:
  def upload_instance(arvrunner, name, tool, job_order):
          upload_docker(arvrunner, tool)
  
+         for t in tool.tool["inputs"]:
+             def setSecondary(fileobj):
+                 if isinstance(fileobj, dict) and fileobj.get("class") == "File":
+                     if "secondaryFiles" not in fileobj:
+                         fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]
+                 if isinstance(fileobj, list):
+                     for e in fileobj:
+                         setSecondary(e)
+             if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
+                 setSecondary(job_order[shortname(t["id"])])
          workflowmapper = upload_dependencies(arvrunner,
                                               name,
                                               tool.doc_loader,
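
The setSecondary() helper above derives default secondaryFiles locations with cwltool's substitute(); a short sketch of its suffix rules (locations are illustrative):

    from cwltool.builder import substitute

    # substitute() appends the pattern to the location; a leading "^" strips
    # one extension from the source name first.
    substitute("keep:99999999999999999999999999999991+99/reads.bam", ".bai")
    # -> "keep:99999999999999999999999999999991+99/reads.bam.bai"
    substitute("keep:99999999999999999999999999999991+99/reads.bam", "^.fai")
    # -> "keep:99999999999999999999999999999991+99/reads.fai"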
@@@ -143,7 -161,7 +161,7 @@@ def arvados_jobs_image(arvrunner)
      return img
  
  class Runner(object):
 -    def __init__(self, runner, tool, job_order, enable_reuse, output_name):
 +    def __init__(self, runner, tool, job_order, enable_reuse, output_name, output_tags):
          self.arvrunner = runner
          self.tool = tool
          self.job_order = job_order
          self.uuid = None
          self.final_output = None
          self.output_name = output_name
 +        self.output_tags = output_tags
  
      def update_pipeline_component(self, record):
          pass
index a1cb605bfc83f78605c72b8eb10c47c73e1c95de,3228ad77b3ca9343c0d6ff736b0714c23acd060b..53f379f1a5ac0cc488af5157080e78933d543367
@@@ -27,15 -27,12 +27,15 @@@ class TestMakeOutput(unittest.TestCase)
          readermock = mock.MagicMock()
          reader.return_value = readermock
  
 +        final_uuid = final.manifest_locator()
 +        num_retries = runner.num_retries
 +
          cwlout = StringIO.StringIO()
          openmock = mock.MagicMock()
          final.open.return_value = openmock
          openmock.__enter__.return_value = cwlout
  
-         runner.make_output_collection("Test output", "tag0,tag1,tag2", {
 -        _, runner.final_output_collection = runner.make_output_collection("Test output", {
++        _, runner.final_output_collection = runner.make_output_collection("Test output", "tag0,tag1,tag2", {
              "foo": {
                  "class": "File",
                  "location": "keep:99999999999999999999999999999991+99/foo.txt",
@@@ -45,7 -42,8 +45,8 @@@
              "bar": {
                  "class": "File",
                  "location": "keep:99999999999999999999999999999992+99/bar.txt",
-                 "basename": "baz.txt"
+                 "basename": "baz.txt",
+                 "size": 4
              }
          })
  
          self.assertEqual("""{
      "bar": {
          "class": "File",
-         "location": "baz.txt"
+         "location": "baz.txt",
+         "size": 4
      },
      "foo": {
          "class": "File",
-         "location": "foo.txt"
+         "location": "foo.txt",
+         "size": 3
      }
  }""", cwlout.getvalue())
  
          self.assertIs(final, runner.final_output_collection)
 +        self.assertIs(final_uuid, runner.final_output_collection.manifest_locator())
 +        self.api.links().create.assert_has_calls([mock.call(body={"head_uuid": final_uuid, "link_class": "tag", "name": "tag0"}), mock.call().execute(num_retries=num_retries)])
 +        self.api.links().create.assert_has_calls([mock.call(body={"head_uuid": final_uuid, "link_class": "tag", "name": "tag1"}), mock.call().execute(num_retries=num_retries)])
 +        self.api.links().create.assert_has_calls([mock.call(body={"head_uuid": final_uuid, "link_class": "tag", "name": "tag2"}), mock.call().execute(num_retries=num_retries)])