Merge branch 'master' of git.curoverse.com:arvados into 13429-cwl-runner-storage...
authorFuad Muhic <fmuhic@capeannenterprises.com>
Mon, 11 Jun 2018 13:50:27 +0000 (15:50 +0200)
committerFuad Muhic <fmuhic@capeannenterprises.com>
Mon, 11 Jun 2018 13:50:27 +0000 (15:50 +0200)
Arvados-DCO-1.1-Signed-off-by: Fuad Muhic <fmuhic@capeannenterprises.com>

1  2 
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/tests/test_submit.py

index a946452b6540301068cfce4ca9e651093ddc8753,5b29ae517e8b15c33781cd70247c36d2ae831b94..4a6ccc0b2a331626f055d75eb1c54ac9d9a45b6a
@@@ -37,7 -37,7 +37,7 @@@ import arvados.commands._util as arv_cm
  
  from .arvcontainer import ArvadosContainer, RunnerContainer
  from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
- from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
+ from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
  from .arvtool import ArvadosCommandTool
  from .arvworkflow import ArvadosWorkflow, upload_workflow
  from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
@@@ -76,7 -76,6 +76,6 @@@ class ArvCwlRunner(object)
          self.workflow_eval_lock = threading.Condition(threading.RLock())
          self.final_output = None
          self.final_status = None
-         self.uploaded = {}
          self.num_retries = num_retries
          self.uuid = None
          self.stop_polling = threading.Event()
          finally:
              self.stop_polling.set()
  
-     def get_uploaded(self):
-         return self.uploaded.copy()
-     def add_uploaded(self, src, pair):
-         self.uploaded[src] = pair
      def add_intermediate_output(self, uuid):
          if uuid:
              self.intermediate_output_collections.append(uuid)
                  with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
                      self.check_features(v)
  
 -    def make_output_collection(self, name, tagsString, outputObj):
 +    def make_output_collection(self, name, storage_classes, tagsString, outputObj):
          outputObj = copy.deepcopy(outputObj)
  
          files = []
          with final.open("cwl.output.json", "w") as f:
              json.dump(outputObj, f, sort_keys=True, indent=4, separators=(',',': '))
  
 -        final.save_new(name=name, owner_uuid=self.project_uuid, ensure_unique_name=True)
 +        final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)
  
          logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
                      final.api_response()["name"],
          if self.intermediate_output_ttl < 0:
              raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
  
+         if kwargs.get("submit_request_uuid") and self.work_api != "containers":
+             raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
          if not kwargs.get("name"):
              kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
  
          if self.final_output is None:
              raise WorkflowException("Workflow did not return a result.")
  
 +
          if kwargs.get("submit") and isinstance(runnerjob, Runner):
              logger.info("Final output collection %s", runnerjob.final_output)
          else:
                  self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
              if self.output_tags is None:
                  self.output_tags = ""
 -            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
 +
 +            storage_classes = kwargs.get("storage_classes")
 +            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
              self.set_crunch_output()
  
          if kwargs.get("compute_checksum"):
@@@ -709,6 -702,10 +705,10 @@@ def arg_parser():  # type: () -> argpar
                          help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                          default=None)
  
+     parser.add_argument("--submit-request-uuid", type=str,
+                         default=None,
+                         help="Update and commit supplied container request instead of creating a new one (containers API only).")
      parser.add_argument("--name", type=str,
                          help="Name to use for workflow execution instance.",
                          default=None)
      parser.add_argument("--enable-dev", action="store_true",
                          help="Enable loading and running development versions "
                               "of CWL spec.", default=False)
 +    parser.add_argument('--storage-classes', default="default",
 +                        help="Specify comma separated list of storage classes to be used when saving workflow output to Keep.")
  
      parser.add_argument("--intermediate-output-ttl", type=int, metavar="N",
                          help="If N > 0, intermediate output collections will be trashed N seconds after creation.  Default is 0 (don't trash).",
@@@ -783,11 -778,6 +783,11 @@@ def main(args, stdout, stderr, api_clie
      job_order_object = None
      arvargs = parser.parse_args(args)
  
 +    arvargs.storage_classes = arvargs.storage_classes.strip().split(',')
 +    if len(arvargs.storage_classes) > 1:
 +        logger.error("Multiple storage classes are not supported currently.")
 +        return 1
 +
      if install_sig_handlers:
          arv_cmd.install_signal_handlers()
  
index 1fae1cf91851a96d5f78da0f1ecb502e9400ffad,f8b557f6cbe86bf4b90bc55a3f4941c88560d948..c1e9629fc8cc28e2a25efe1e81c6040e6e102506
@@@ -234,7 -234,6 +234,6 @@@ def stubs(func)
              },
              'secret_mounts': {},
              'state': 'Committed',
-             'owner_uuid': None,
              'command': ['arvados-cwl-runner', '--local', '--api=containers',
                          '--no-log-timestamps', '--disable-validate',
                          '--eval-timeout=20', '--thread-count=4',
@@@ -333,15 -332,6 +332,15 @@@ class TestSubmit(unittest.TestCase)
          self.assertEqual(capture_stdout.getvalue(),
                           stubs.expect_pipeline_uuid + '\n')
  
 +    @stubs
 +    def test_error_when_multiple_storage_classes_specified(self, stubs):
 +        storage_classes = "foo,bar"
 +        exited = arvados_cwl.main(
 +                ["--debug", "--storage-classes", storage_classes,
 +                 "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
 +                sys.stdin, sys.stderr, api_client=stubs.api)
 +        self.assertEqual(exited, 1)
 +
      @mock.patch("time.sleep")
      @stubs
      def test_submit_on_error(self, stubs, tm):
                      'kind': 'json'
                  }
              }, 'state': 'Committed',
-             'owner_uuid': None,
              'output_path': '/var/spool/cwl',
              'name': 'expect_arvworkflow.cwl#main',
              'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
                      'kind': 'json'
                  }
              }, 'state': 'Committed',
-             'owner_uuid': None,
              'output_path': '/var/spool/cwl',
              'name': 'a test workflow',
              'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
              },
              "name": "secret_wf.cwl",
              "output_path": "/var/spool/cwl",
-             "owner_uuid": None,
              "priority": 500,
              "properties": {},
              "runtime_constraints": {
          self.assertEqual(capture_stdout.getvalue(),
                           stubs.expect_container_request_uuid + '\n')
  
+     @stubs
+     def test_submit_request_uuid(self, stubs):
+         stubs.expect_container_request_uuid = "zzzzz-xvhdp-yyyyyyyyyyyyyyy"
+         stubs.api.container_requests().update().execute.return_value = {
+             "uuid": stubs.expect_container_request_uuid,
+             "container_uuid": "zzzzz-dz642-zzzzzzzzzzzzzzz",
+             "state": "Queued"
+         }
+         capture_stdout = cStringIO.StringIO()
+         try:
+             exited = arvados_cwl.main(
+                 ["--submit", "--no-wait", "--api=containers", "--debug", "--submit-request-uuid=zzzzz-xvhdp-yyyyyyyyyyyyyyy",
+                  "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+                 capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+             self.assertEqual(exited, 0)
+         except:
+             logging.exception("")
+         stubs.api.container_requests().update.assert_called_with(
+             uuid="zzzzz-xvhdp-yyyyyyyyyyyyyyy", body=JsonDiffMatcher(stubs.expect_container_spec))
+         self.assertEqual(capture_stdout.getvalue(),
+                          stubs.expect_container_request_uuid + '\n')
  
  class TestCreateTemplate(unittest.TestCase):
      existing_template_uuid = "zzzzz-d1hrv-validworkfloyml"