Merge branch 'master' into 7478-anm-spot-instances
authorLucas Di Pentima <ldipentima@veritasgenetics.com>
Thu, 31 May 2018 19:24:07 +0000 (16:24 -0300)
committerLucas Di Pentima <ldipentima@veritasgenetics.com>
Thu, 31 May 2018 19:24:07 +0000 (16:24 -0300)
Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima@veritasgenetics.com>

19 files changed:
sdk/R/README.Rmd
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/arvworkflow.py
sdk/cwl/arvados_cwl/crunch_script.py
sdk/cwl/arvados_cwl/http.py [new file with mode: 0644]
sdk/cwl/arvados_cwl/pathmapper.py
sdk/cwl/arvados_version.py
sdk/cwl/setup.py
sdk/cwl/tests/test_container.py
sdk/cwl/tests/test_http.py [new file with mode: 0644]
sdk/cwl/tests/test_job.py
sdk/cwl/tests/test_submit.py
services/keep-balance/balance.go
services/keepstore/s3_volume.go
services/nodemanager/arvnodeman/computenode/__init__.py
services/nodemanager/tests/test_computenode.py
services/nodemanager/tests/test_daemon.py
services/nodemanager/tests/test_failure.py

index c2fe07859330ca1db4724ef6683fe7707e851a6f..dcfa2186e9edba13493919c6e4eb192efa03c544 100644 (file)
@@ -1,4 +1,4 @@
-[comment]: # (Copyright © The Arvados Authors. All rights reserved.)
+[comment]: # (Copyright (c) The Arvados Authors. All rights reserved.)
 [comment]: # ()
 [comment]: # (SPDX-License-Identifier: CC-BY-SA-3.0)
 
index d509f400f1058396f2fc91e6ef320a2bbebe92e1..5c60f7d2a019dee14b7fc5aa0a6965e4ce9ac085 100644 (file)
@@ -37,7 +37,7 @@ import arvados.commands._util as arv_cmd
 
 from .arvcontainer import ArvadosContainer, RunnerContainer
 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
-from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
+from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
 from .arvtool import ArvadosCommandTool
 from .arvworkflow import ArvadosWorkflow, upload_workflow
 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
@@ -401,6 +401,9 @@ class ArvCwlRunner(object):
         if self.intermediate_output_ttl < 0:
             raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
 
+        if kwargs.get("submit_request_uuid") and self.work_api != "containers":
+            raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
+
         if not kwargs.get("name"):
             kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
 
@@ -706,6 +709,10 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
                         default=None)
 
+    parser.add_argument("--submit-request-uuid", type=str,
+                        default=None,
+                        help="Update and commit supplied container request instead of creating a new one (containers API only).")
+
     parser.add_argument("--name", type=str,
                         help="Name to use for workflow execution instance.",
                         default=None)
index 4e7811d2e8f5b0b477b82334b79385618c3456b9..0bec692643ad805c02d6b8358fae8a65841c1367 100644 (file)
@@ -51,7 +51,6 @@ class ArvadosContainer(object):
 
         container_request = {
             "command": self.command_line,
-            "owner_uuid": self.arvrunner.project_uuid,
             "name": self.name,
             "output_path": self.outdir,
             "cwd": self.outdir,
@@ -61,6 +60,9 @@ class ArvadosContainer(object):
         }
         runtime_constraints = {}
 
+        if self.arvrunner.project_uuid:
+            container_request["owner_uuid"] = self.arvrunner.project_uuid
+
         if self.arvrunner.secret_store.has_secret(self.command_line):
             raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")
 
@@ -251,9 +253,15 @@ class ArvadosContainer(object):
         self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
 
         try:
-            response = self.arvrunner.api.container_requests().create(
-                body=container_request
-            ).execute(num_retries=self.arvrunner.num_retries)
+            if kwargs.get("submit_request_uuid"):
+                response = self.arvrunner.api.container_requests().update(
+                    uuid=kwargs["submit_request_uuid"],
+                    body=container_request
+                ).execute(num_retries=self.arvrunner.num_retries)
+            else:
+                response = self.arvrunner.api.container_requests().create(
+                    body=container_request
+                ).execute(num_retries=self.arvrunner.num_retries)
 
             self.uuid = response["uuid"]
             self.arvrunner.process_submitted(self)
@@ -343,7 +351,6 @@ class RunnerContainer(Runner):
                 self.job_order[param] = {"$include": mnt}
 
         container_req = {
-            "owner_uuid": self.arvrunner.project_uuid,
             "name": self.name,
             "output_path": "/var/spool/cwl",
             "cwd": "/var/spool/cwl",
@@ -442,11 +449,18 @@ class RunnerContainer(Runner):
     def run(self, **kwargs):
         kwargs["keepprefix"] = "keep:"
         job_spec = self.arvados_job_spec(**kwargs)
-        job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
+        if self.arvrunner.project_uuid:
+            job_spec["owner_uuid"] = self.arvrunner.project_uuid
 
-        response = self.arvrunner.api.container_requests().create(
-            body=job_spec
-        ).execute(num_retries=self.arvrunner.num_retries)
+        if kwargs.get("submit_request_uuid"):
+            response = self.arvrunner.api.container_requests().update(
+                uuid=kwargs["submit_request_uuid"],
+                body=job_spec
+            ).execute(num_retries=self.arvrunner.num_retries)
+        else:
+            response = self.arvrunner.api.container_requests().create(
+                body=job_spec
+            ).execute(num_retries=self.arvrunner.num_retries)
 
         self.uuid = response["uuid"]
         self.arvrunner.process_submitted(self)
index 6f731fd6877b18fc6bc434bd8110fe2b44775196..f675fb10e80811e92e90b209f073c806c9777afb 100644 (file)
@@ -277,6 +277,7 @@ class ArvadosWorkflow(Workflow):
             })
             kwargs["loader"] = self.doc_loader
             kwargs["avsc_names"] = self.doc_schema
+            kwargs["metadata"]  = self.metadata
             return ArvadosCommandTool(self.arvrunner, wf_runner, **kwargs).job(joborder_resolved, output_callback, **kwargs)
         else:
             return super(ArvadosWorkflow, self).job(joborder, output_callback, **kwargs)
index bf940eca4ba92fad7e01ccd06e70564cb2fa0103..5024e95f77df785abf668c68364dadc4d49fb2a4 100644 (file)
@@ -132,6 +132,7 @@ def run():
         args.priority = arvados_cwl.DEFAULT_PRIORITY
         args.do_validate = True
         args.disable_js_validation = False
+        args.tmp_outdir_prefix = "tmp"
 
         runner.arv_executor(t, job_order_object, **vars(args))
     except Exception as e:
diff --git a/sdk/cwl/arvados_cwl/http.py b/sdk/cwl/arvados_cwl/http.py
new file mode 100644 (file)
index 0000000..32fc1cf
--- /dev/null
@@ -0,0 +1,141 @@
+import requests
+import email.utils
+import time
+import datetime
+import re
+import arvados
+import arvados.collection
+import urlparse
+import logging
+
+logger = logging.getLogger('arvados.cwl-runner')
+
+def my_formatdate(dt):
+    return email.utils.formatdate(timeval=time.mktime(dt.timetuple()),
+                                  localtime=False, usegmt=True)
+
+def my_parsedate(text):
+    parsed = email.utils.parsedate(text)
+    if parsed:
+        return datetime.datetime(*parsed[:6])
+    else:
+        return datetime.datetime(1970, 1, 1)
+
+def fresh_cache(url, properties, now):
+    pr = properties[url]
+    expires = None
+
+    logger.debug("Checking cache freshness for %s using %s", url, pr)
+
+    if "Cache-Control" in pr:
+        if re.match(r"immutable", pr["Cache-Control"]):
+            return True
+
+        g = re.match(r"(s-maxage|max-age)=(\d+)", pr["Cache-Control"])
+        if g:
+            expires = my_parsedate(pr["Date"]) + datetime.timedelta(seconds=int(g.group(2)))
+
+    if expires is None and "Expires" in pr:
+        expires = my_parsedate(pr["Expires"])
+
+    if expires is None:
+        # Use a default cache time of 24 hours if upstream didn't set
+        # any cache headers, to reduce redundant downloads.
+        expires = my_parsedate(pr["Date"]) + datetime.timedelta(hours=24)
+
+    if not expires:
+        return False
+
+    return (now < expires)
+
+def remember_headers(url, properties, headers, now):
+    properties.setdefault(url, {})
+    for h in ("Cache-Control", "ETag", "Expires", "Date", "Content-Length"):
+        if h in headers:
+            properties[url][h] = headers[h]
+    if "Date" not in headers:
+        properties[url]["Date"] = my_formatdate(now)
+
+
+def changed(url, properties, now):
+    req = requests.head(url, allow_redirects=True)
+    remember_headers(url, properties, req.headers, now)
+
+    if req.status_code != 200:
+        raise Exception("Got status %s" % req.status_code)
+
+    pr = properties[url]
+    if "ETag" in pr and "ETag" in req.headers:
+        if pr["ETag"] == req.headers["ETag"]:
+            return False
+
+    return True
+
+def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow):
+    r = api.collections().list(filters=[["properties", "exists", url]]).execute()
+
+    now = utcnow()
+
+    for item in r["items"]:
+        properties = item["properties"]
+        if fresh_cache(url, properties, now):
+            # Do nothing
+            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
+            return "keep:%s/%s" % (item["portable_data_hash"], cr.keys()[0])
+
+        if not changed(url, properties, now):
+            # ETag didn't change, same content, just update headers
+            api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
+            cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
+            return "keep:%s/%s" % (item["portable_data_hash"], cr.keys()[0])
+
+    properties = {}
+    req = requests.get(url, stream=True, allow_redirects=True)
+
+    if req.status_code != 200:
+        raise Exception("Failed to download '%s' got status %s " % (url, req.status_code))
+
+    remember_headers(url, properties, req.headers, now)
+
+    if "Content-Length" in properties[url]:
+        cl = int(properties[url]["Content-Length"])
+        logger.info("Downloading %s (%s bytes)", url, cl)
+    else:
+        cl = None
+        logger.info("Downloading %s (unknown size)", url)
+
+    c = arvados.collection.Collection()
+
+    if req.headers.get("Content-Disposition"):
+        grp = re.search(r'filename=("((\"|[^"])+)"|([^][()<>@,;:\"/?={} ]+))', req.headers["Content-Disposition"])
+        if grp.group(2):
+            name = grp.group(2)
+        else:
+            name = grp.group(4)
+    else:
+        name = urlparse.urlparse(url).path.split("/")[-1]
+
+    count = 0
+    start = time.time()
+    checkpoint = start
+    with c.open(name, "w") as f:
+        for chunk in req.iter_content(chunk_size=1024):
+            count += len(chunk)
+            f.write(chunk)
+            loopnow = time.time()
+            if (loopnow - checkpoint) > 20:
+                bps = (float(count)/float(loopnow - start))
+                if cl is not None:
+                    logger.info("%2.1f%% complete, %3.2f MiB/s, %1.0f seconds left",
+                                float(count * 100) / float(cl),
+                                bps/(1024*1024),
+                                (cl-count)/bps)
+                else:
+                    logger.info("%d downloaded, %3.2f MiB/s", count, bps/(1024*1024))
+                checkpoint = loopnow
+
+    c.save_new(name="Downloaded from %s" % url, owner_uuid=project_uuid, ensure_unique_name=True)
+
+    api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()
+
+    return "keep:%s/%s" % (c.portable_data_hash(), name)
index 6fedb120300b2bdb575a663614840c5ba765b7ec..bd4b5283fbe4ff3de751110e9a15fd5250697008 100644 (file)
@@ -16,6 +16,8 @@ from schema_salad.sourceline import SourceLine
 from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
 from cwltool.workflow import WorkflowException
 
+from .http import http_to_keep
+
 logger = logging.getLogger('arvados.cwl-runner')
 
 def trim_listing(obj):
@@ -81,6 +83,10 @@ class ArvPathMapper(PathMapper):
                     raise WorkflowException("File literal '%s' is missing `contents`" % src)
                 if srcobj["class"] == "Directory" and "listing" not in srcobj:
                     raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
+            elif src.startswith("http:") or src.startswith("https:"):
+                keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
+                logger.info("%s is %s", src, keepref)
+                self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
             else:
                 self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
 
index a24d53dad6a629f9d08692bb19dd62e144655a7b..13e6d36c073b1db87ef124c8d034a654f27d10ed 100644 (file)
@@ -34,7 +34,7 @@ def get_version(setup_dir, module):
     else:
         try:
             save_version(setup_dir, module, git_latest_tag() + git_timestamp_tag())
-        except subprocess.CalledProcessError:
+        except (subprocess.CalledProcessError, OSError):
             pass
 
     return read_version(setup_dir, module)
index 4df89ee75583f55a4c7d7cafb5b404dfe2c08467..625f27518d8d5076a23bc2ac3f63263d90fb21d8 100644 (file)
@@ -33,9 +33,9 @@ setup(name='arvados-cwl-runner',
       # Note that arvados/build/run-build-packages.sh looks at this
       # file to determine what version of cwltool and schema-salad to build.
       install_requires=[
-          'cwltool==1.0.20180522135731',
+          'cwltool==1.0.20180524215209',
           'schema-salad==2.7.20180501211602',
-          'typing==3.5.3.0',
+          'typing >= 3.5.3',
           'ruamel.yaml >=0.13.11, <0.15',
           'arvados-python-client>=1.1.4.20180507184611',
           'setuptools',
index 522946a4f49ee2acd68588c6100d45bcb097cbe8..2295e934ac77de76182d04749715a57f730874b4 100644 (file)
@@ -53,7 +53,8 @@ class TestContainer(unittest.TestCase):
             make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
                                          collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
             arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
-                                                     basedir="", make_fs_access=make_fs_access, loader=Loader({}))
+                                                     basedir="", make_fs_access=make_fs_access, loader=Loader({}),
+                                                     metadata={"cwlVersion": "v1.0"})
             arvtool.formatgraph = None
             for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_run_"+str(enable_reuse),
                                  make_fs_access=make_fs_access, tmpdir="/tmp"):
@@ -139,7 +140,7 @@ class TestContainer(unittest.TestCase):
                                          collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers",
                                                  avsc_names=avsc_names, make_fs_access=make_fs_access,
-                                                 loader=Loader({}))
+                                                 loader=Loader({}), metadata={"cwlVersion": "v1.0"})
         arvtool.formatgraph = None
         for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_resource_requirements",
                              make_fs_access=make_fs_access, tmpdir="/tmp"):
@@ -251,7 +252,7 @@ class TestContainer(unittest.TestCase):
                                          collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers",
                                                  avsc_names=avsc_names, make_fs_access=make_fs_access,
-                                                 loader=Loader({}))
+                                                 loader=Loader({}), metadata={"cwlVersion": "v1.0"})
         arvtool.formatgraph = None
         for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_initial_work_dir",
                              make_fs_access=make_fs_access, tmpdir="/tmp"):
@@ -352,7 +353,8 @@ class TestContainer(unittest.TestCase):
         make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
                                          collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
-                                                 basedir="", make_fs_access=make_fs_access, loader=Loader({}))
+                                                 basedir="", make_fs_access=make_fs_access, loader=Loader({}),
+                                                 metadata={"cwlVersion": "v1.0"})
         arvtool.formatgraph = None
         for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_run_redirect",
                              make_fs_access=make_fs_access, tmpdir="/tmp"):
@@ -477,7 +479,8 @@ class TestContainer(unittest.TestCase):
         make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
                                      collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
-                                                 basedir="", make_fs_access=make_fs_access, loader=Loader({}))
+                                                 basedir="", make_fs_access=make_fs_access, loader=Loader({}),
+                                                 metadata={"cwlVersion": "v1.0"})
         arvtool.formatgraph = None
         job_order = {
             "p1": {
@@ -584,7 +587,8 @@ class TestContainer(unittest.TestCase):
         make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
                                      collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
-                                                 basedir="", make_fs_access=make_fs_access, loader=Loader({}))
+                                                 basedir="", make_fs_access=make_fs_access, loader=Loader({}),
+                                                 metadata={"cwlVersion": "v1.0"})
         arvtool.formatgraph = None
 
         job_order = {"pw": "blorp"}
diff --git a/sdk/cwl/tests/test_http.py b/sdk/cwl/tests/test_http.py
new file mode 100644 (file)
index 0000000..0c66c39
--- /dev/null
@@ -0,0 +1,286 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import copy
+import cStringIO
+import functools
+import hashlib
+import json
+import logging
+import mock
+import sys
+import unittest
+import datetime
+
+import arvados
+import arvados.collection
+import arvados_cwl
+import arvados_cwl.runner
+import arvados.keep
+
+from .matcher import JsonDiffMatcher, StripYAMLComments
+from .mock_discovery import get_rootDesc
+
+import arvados_cwl.http
+
+import ruamel.yaml as yaml
+
+
+class TestHttpToKeep(unittest.TestCase):
+
+    @mock.patch("requests.get")
+    @mock.patch("arvados.collection.Collection")
+    def test_http_get(self, collectionmock, getmock):
+        api = mock.MagicMock()
+
+        api.collections().list().execute.return_value = {
+            "items": []
+        }
+
+        cm = mock.MagicMock()
+        cm.manifest_locator.return_value = "zzzzz-4zz18-zzzzzzzzzzzzzz3"
+        cm.portable_data_hash.return_value = "99999999999999999999999999999998+99"
+        collectionmock.return_value = cm
+
+        req = mock.MagicMock()
+        req.status_code = 200
+        req.headers = {}
+        req.iter_content.return_value = ["abc"]
+        getmock.return_value = req
+
+        utcnow = mock.MagicMock()
+        utcnow.return_value = datetime.datetime(2018, 5, 15)
+
+        r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
+        self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")
+
+        getmock.assert_called_with("http://example.com/file1.txt", stream=True, allow_redirects=True)
+
+        cm.open.assert_called_with("file1.txt", "w")
+        cm.save_new.assert_called_with(name="Downloaded from http://example.com/file1.txt",
+                                       owner_uuid=None, ensure_unique_name=True)
+
+        api.collections().update.assert_has_calls([
+            mock.call(uuid=cm.manifest_locator(),
+                      body={"collection":{"properties": {'http://example.com/file1.txt': {'Date': 'Tue, 15 May 2018 00:00:00 GMT'}}}})
+        ])
+
+
+    @mock.patch("requests.get")
+    @mock.patch("arvados.collection.CollectionReader")
+    def test_http_expires(self, collectionmock, getmock):
+        api = mock.MagicMock()
+
+        api.collections().list().execute.return_value = {
+            "items": [{
+                "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz3",
+                "portable_data_hash": "99999999999999999999999999999998+99",
+                "properties": {
+                    'http://example.com/file1.txt': {
+                        'Date': 'Tue, 15 May 2018 00:00:00 GMT',
+                        'Expires': 'Tue, 17 May 2018 00:00:00 GMT'
+                    }
+                }
+            }]
+        }
+
+        cm = mock.MagicMock()
+        cm.manifest_locator.return_value = "zzzzz-4zz18-zzzzzzzzzzzzzz3"
+        cm.portable_data_hash.return_value = "99999999999999999999999999999998+99"
+        cm.keys.return_value = ["file1.txt"]
+        collectionmock.return_value = cm
+
+        req = mock.MagicMock()
+        req.status_code = 200
+        req.headers = {}
+        req.iter_content.return_value = ["abc"]
+        getmock.return_value = req
+
+        utcnow = mock.MagicMock()
+        utcnow.return_value = datetime.datetime(2018, 5, 16)
+
+        r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
+        self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")
+
+        getmock.assert_not_called()
+
+
+    @mock.patch("requests.get")
+    @mock.patch("arvados.collection.CollectionReader")
+    def test_http_cache_control(self, collectionmock, getmock):
+        api = mock.MagicMock()
+
+        api.collections().list().execute.return_value = {
+            "items": [{
+                "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz3",
+                "portable_data_hash": "99999999999999999999999999999998+99",
+                "properties": {
+                    'http://example.com/file1.txt': {
+                        'Date': 'Tue, 15 May 2018 00:00:00 GMT',
+                        'Cache-Control': 'max-age=172800'
+                    }
+                }
+            }]
+        }
+
+        cm = mock.MagicMock()
+        cm.manifest_locator.return_value = "zzzzz-4zz18-zzzzzzzzzzzzzz3"
+        cm.portable_data_hash.return_value = "99999999999999999999999999999998+99"
+        cm.keys.return_value = ["file1.txt"]
+        collectionmock.return_value = cm
+
+        req = mock.MagicMock()
+        req.status_code = 200
+        req.headers = {}
+        req.iter_content.return_value = ["abc"]
+        getmock.return_value = req
+
+        utcnow = mock.MagicMock()
+        utcnow.return_value = datetime.datetime(2018, 5, 16)
+
+        r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
+        self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")
+
+        getmock.assert_not_called()
+
+
+    @mock.patch("requests.get")
+    @mock.patch("requests.head")
+    @mock.patch("arvados.collection.Collection")
+    def test_http_expired(self, collectionmock, headmock, getmock):
+        api = mock.MagicMock()
+
+        api.collections().list().execute.return_value = {
+            "items": [{
+                "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz3",
+                "portable_data_hash": "99999999999999999999999999999998+99",
+                "properties": {
+                    'http://example.com/file1.txt': {
+                        'Date': 'Tue, 15 May 2018 00:00:00 GMT',
+                        'Expires': 'Tue, 16 May 2018 00:00:00 GMT'
+                    }
+                }
+            }]
+        }
+
+        cm = mock.MagicMock()
+        cm.manifest_locator.return_value = "zzzzz-4zz18-zzzzzzzzzzzzzz4"
+        cm.portable_data_hash.return_value = "99999999999999999999999999999997+99"
+        cm.keys.return_value = ["file1.txt"]
+        collectionmock.return_value = cm
+
+        req = mock.MagicMock()
+        req.status_code = 200
+        req.headers = {'Date': 'Tue, 17 May 2018 00:00:00 GMT'}
+        req.iter_content.return_value = ["def"]
+        getmock.return_value = req
+        headmock.return_value = req
+
+        utcnow = mock.MagicMock()
+        utcnow.return_value = datetime.datetime(2018, 5, 17)
+
+        r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
+        self.assertEqual(r, "keep:99999999999999999999999999999997+99/file1.txt")
+
+        getmock.assert_called_with("http://example.com/file1.txt", stream=True, allow_redirects=True)
+
+        cm.open.assert_called_with("file1.txt", "w")
+        cm.save_new.assert_called_with(name="Downloaded from http://example.com/file1.txt",
+                                       owner_uuid=None, ensure_unique_name=True)
+
+        api.collections().update.assert_has_calls([
+            mock.call(uuid=cm.manifest_locator(),
+                      body={"collection":{"properties": {'http://example.com/file1.txt': {'Date': 'Tue, 17 May 2018 00:00:00 GMT'}}}})
+        ])
+
+
+    @mock.patch("requests.get")
+    @mock.patch("requests.head")
+    @mock.patch("arvados.collection.CollectionReader")
+    def test_http_etag(self, collectionmock, headmock, getmock):
+        api = mock.MagicMock()
+
+        api.collections().list().execute.return_value = {
+            "items": [{
+                "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz3",
+                "portable_data_hash": "99999999999999999999999999999998+99",
+                "properties": {
+                    'http://example.com/file1.txt': {
+                        'Date': 'Tue, 15 May 2018 00:00:00 GMT',
+                        'Expires': 'Tue, 16 May 2018 00:00:00 GMT',
+                        'ETag': '123456'
+                    }
+                }
+            }]
+        }
+
+        cm = mock.MagicMock()
+        cm.manifest_locator.return_value = "zzzzz-4zz18-zzzzzzzzzzzzzz3"
+        cm.portable_data_hash.return_value = "99999999999999999999999999999998+99"
+        cm.keys.return_value = ["file1.txt"]
+        collectionmock.return_value = cm
+
+        req = mock.MagicMock()
+        req.status_code = 200
+        req.headers = {
+            'Date': 'Tue, 17 May 2018 00:00:00 GMT',
+            'Expires': 'Tue, 19 May 2018 00:00:00 GMT',
+            'ETag': '123456'
+        }
+        headmock.return_value = req
+
+        utcnow = mock.MagicMock()
+        utcnow.return_value = datetime.datetime(2018, 5, 17)
+
+        r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
+        self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")
+
+        getmock.assert_not_called()
+        cm.open.assert_not_called()
+
+        api.collections().update.assert_has_calls([
+            mock.call(uuid=cm.manifest_locator(),
+                      body={"collection":{"properties": {'http://example.com/file1.txt': {
+                          'Date': 'Tue, 17 May 2018 00:00:00 GMT',
+                          'Expires': 'Tue, 19 May 2018 00:00:00 GMT',
+                          'ETag': '123456'
+                      }}}})
+                      ])
+
+    @mock.patch("requests.get")
+    @mock.patch("arvados.collection.Collection")
+    def test_http_content_disp(self, collectionmock, getmock):
+        api = mock.MagicMock()
+
+        api.collections().list().execute.return_value = {
+            "items": []
+        }
+
+        cm = mock.MagicMock()
+        cm.manifest_locator.return_value = "zzzzz-4zz18-zzzzzzzzzzzzzz3"
+        cm.portable_data_hash.return_value = "99999999999999999999999999999998+99"
+        collectionmock.return_value = cm
+
+        req = mock.MagicMock()
+        req.status_code = 200
+        req.headers = {"Content-Disposition": "attachment; filename=file1.txt"}
+        req.iter_content.return_value = ["abc"]
+        getmock.return_value = req
+
+        utcnow = mock.MagicMock()
+        utcnow.return_value = datetime.datetime(2018, 5, 15)
+
+        r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/download?fn=/file1.txt", utcnow=utcnow)
+        self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")
+
+        getmock.assert_called_with("http://example.com/download?fn=/file1.txt", stream=True, allow_redirects=True)
+
+        cm.open.assert_called_with("file1.txt", "w")
+        cm.save_new.assert_called_with(name="Downloaded from http://example.com/download?fn=/file1.txt",
+                                       owner_uuid=None, ensure_unique_name=True)
+
+        api.collections().update.assert_has_calls([
+            mock.call(uuid=cm.manifest_locator(),
+                      body={"collection":{"properties": {"http://example.com/download?fn=/file1.txt": {'Date': 'Tue, 15 May 2018 00:00:00 GMT'}}}})
+        ])
index abf96947eb1b40f21207e0fc9171f66482a0bd47..30930dd49abb1a91fe371cc291cd916feac735df 100644 (file)
@@ -59,7 +59,8 @@ class TestJob(unittest.TestCase):
             make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
                                          collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
             arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names,
-                                                     basedir="", make_fs_access=make_fs_access, loader=Loader({}))
+                                                     basedir="", make_fs_access=make_fs_access, loader=Loader({}),
+                                                     metadata={"cwlVersion": "v1.0"})
             arvtool.formatgraph = None
             for j in arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access):
                 j.run(enable_reuse=enable_reuse)
@@ -150,7 +151,8 @@ class TestJob(unittest.TestCase):
         make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
                                          collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
         arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names,
-                                                 make_fs_access=make_fs_access, loader=Loader({}))
+                                                 make_fs_access=make_fs_access, loader=Loader({}),
+                                                 metadata={"cwlVersion": "v1.0"})
         arvtool.formatgraph = None
         for j in arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access):
             j.run(enable_reuse=True)
index 77bef075fa97903e96bcc18e5f6c2c11dc4f5654..f8b557f6cbe86bf4b90bc55a3f4941c88560d948 100644 (file)
@@ -234,7 +234,6 @@ def stubs(func):
             },
             'secret_mounts': {},
             'state': 'Committed',
-            'owner_uuid': None,
             'command': ['arvados-cwl-runner', '--local', '--api=containers',
                         '--no-log-timestamps', '--disable-validate',
                         '--eval-timeout=20', '--thread-count=4',
@@ -751,7 +750,6 @@ class TestSubmit(unittest.TestCase):
                     'kind': 'json'
                 }
             }, 'state': 'Committed',
-            'owner_uuid': None,
             'output_path': '/var/spool/cwl',
             'name': 'expect_arvworkflow.cwl#main',
             'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
@@ -870,7 +868,6 @@ class TestSubmit(unittest.TestCase):
                     'kind': 'json'
                 }
             }, 'state': 'Committed',
-            'owner_uuid': None,
             'output_path': '/var/spool/cwl',
             'name': 'a test workflow',
             'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
@@ -1236,7 +1233,6 @@ class TestSubmit(unittest.TestCase):
             },
             "name": "secret_wf.cwl",
             "output_path": "/var/spool/cwl",
-            "owner_uuid": None,
             "priority": 500,
             "properties": {},
             "runtime_constraints": {
@@ -1259,6 +1255,31 @@ class TestSubmit(unittest.TestCase):
         self.assertEqual(capture_stdout.getvalue(),
                          stubs.expect_container_request_uuid + '\n')
 
+    @stubs
+    def test_submit_request_uuid(self, stubs):
+        stubs.expect_container_request_uuid = "zzzzz-xvhdp-yyyyyyyyyyyyyyy"
+
+        stubs.api.container_requests().update().execute.return_value = {
+            "uuid": stubs.expect_container_request_uuid,
+            "container_uuid": "zzzzz-dz642-zzzzzzzzzzzzzzz",
+            "state": "Queued"
+        }
+
+        capture_stdout = cStringIO.StringIO()
+        try:
+            exited = arvados_cwl.main(
+                ["--submit", "--no-wait", "--api=containers", "--debug", "--submit-request-uuid=zzzzz-xvhdp-yyyyyyyyyyyyyyy",
+                 "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+                capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+            self.assertEqual(exited, 0)
+        except:
+            logging.exception("")
+
+        stubs.api.container_requests().update.assert_called_with(
+            uuid="zzzzz-xvhdp-yyyyyyyyyyyyyyy", body=JsonDiffMatcher(stubs.expect_container_spec))
+        self.assertEqual(capture_stdout.getvalue(),
+                         stubs.expect_container_request_uuid + '\n')
+
 
 class TestCreateTemplate(unittest.TestCase):
     existing_template_uuid = "zzzzz-d1hrv-validworkfloyml"
index 7b25d78852bde9a575242331897bc26bc8cb092b..d6a2dde9d74005c0e9fca4d87ccbc7dd1ece5243 100644 (file)
@@ -365,27 +365,29 @@ func (bal *Balancer) ComputeChangeSets() {
                blkid arvados.SizedDigest
                blk   *BlockState
        }
-       nWorkers := 1 + runtime.NumCPU()
-       todo := make(chan balanceTask, nWorkers)
-       results := make(chan balanceResult, 16)
-       var wg sync.WaitGroup
-       for i := 0; i < nWorkers; i++ {
-               wg.Add(1)
-               go func() {
-                       for work := range todo {
-                               results <- bal.balanceBlock(work.blkid, work.blk)
+       workers := runtime.GOMAXPROCS(-1)
+       todo := make(chan balanceTask, workers)
+       go func() {
+               bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
+                       todo <- balanceTask{
+                               blkid: blkid,
+                               blk:   blk,
                        }
-                       wg.Done()
-               }()
-       }
-       bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
-               todo <- balanceTask{
-                       blkid: blkid,
-                       blk:   blk,
-               }
-       })
-       close(todo)
+               })
+               close(todo)
+       }()
+       results := make(chan balanceResult, workers)
        go func() {
+               var wg sync.WaitGroup
+               for i := 0; i < workers; i++ {
+                       wg.Add(1)
+                       go func() {
+                               for work := range todo {
+                                       results <- bal.balanceBlock(work.blkid, work.blk)
+                               }
+                               wg.Done()
+                       }()
+               }
                wg.Wait()
                close(results)
        }()
index 9d4d8019282ebf01160544d940345b36fe892076..bdab58927bdc243605b8cf1d7e95b34d2f610272 100644 (file)
@@ -429,7 +429,7 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
        case <-ctx.Done():
                theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err())
                // Our pipe might be stuck in Write(), waiting for
-               // io.Copy() to read. If so, un-stick it. This means
+               // PutReader() to read. If so, un-stick it. This means
                // PutReader will get corrupt data, but that's OK: the
                // size and MD5 won't match, so the write will fail.
                go io.Copy(ioutil.Discard, bufr)
@@ -438,6 +438,8 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
                theConfig.debugLogf("%s: abandoning PutReader goroutine", v)
                return ctx.Err()
        case <-ready:
+               // Unblock pipe in case PutReader did not consume it.
+               io.Copy(ioutil.Discard, bufr)
                return v.translateError(err)
        }
 }
index 3c04118abe2ec2bb3f5fee4c1a74e078d61119ed..b124c66540aab804db3ad555b048c9ef5097e8d0 100644 (file)
@@ -33,7 +33,7 @@ def arvados_timestamp(timestr):
         subsecs = float(subsec_match.group(1))
         timestr = timestr[:subsec_match.start()] + 'Z'
     return calendar.timegm(time.strptime(timestr + 'UTC',
-                                         ARVADOS_TIMEFMT + '%Z'))
+                                         ARVADOS_TIMEFMT + '%Z')) + subsecs
 
 def timestamp_fresh(timestamp, fresh_time):
     return (time.time() - timestamp) < fresh_time
index 3f11ff6c2b22d02a47b8f8e9a6bfe246479d10ed..898112bdd8a8c8df86bf80e0a4af1aac3c592ec8 100644 (file)
@@ -37,3 +37,9 @@ class ShutdownTimerTestCase(unittest.TestCase):
         time_mock.return_value += 200
         self.assertEqual(961, timer.next_opening())
         self.assertFalse(timer.window_open())
+
+
+class ArvadosTimestamp(unittest.TestCase):
+    def test_arvados_timestamp(self):
+        self.assertEqual(1527710178, cnode.arvados_timestamp('2018-05-30T19:56:18Z'))
+        self.assertEqual(1527710178.999371, cnode.arvados_timestamp('2018-05-30T19:56:18.999371Z'))
index 8050e6981411d69f127617e0cb2b44681470341d..d09cbf72359610ac08afa428e39f024d3086835c 100644 (file)
@@ -17,11 +17,24 @@ from arvnodeman.jobqueue import ServerCalculator
 from arvnodeman.computenode.dispatch import ComputeNodeMonitorActor
 from . import testutil
 from . import test_status
+from . import pykka_timeout
 import logging
 
 class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                                      unittest.TestCase):
 
+    def assertwait(self, f, timeout=pykka_timeout*2):
+        deadline = time.time() + timeout
+        while True:
+            try:
+                return f()
+            except AssertionError:
+                if time.time() > deadline:
+                    raise
+                pass
+            time.sleep(.1)
+            self.daemon.ping().get(self.TIMEOUT)
+
     def busywait(self, f):
         for n in xrange(200):
             ok = f()
@@ -146,8 +159,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.assertIn('node_quota', status.tracker._latest)
 
     def check_monitors_arvados_nodes(self, *arv_nodes):
-        self.busywait(lambda: len(arv_nodes) == len(self.monitored_arvados_nodes()))
-        self.assertItemsEqual(arv_nodes, self.monitored_arvados_nodes())
+        self.assertwait(lambda: self.assertItemsEqual(arv_nodes, self.monitored_arvados_nodes()))
 
     def test_node_pairing(self):
         cloud_node = testutil.cloud_node_mock(1)
@@ -257,7 +269,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                          arvados_nodes=[testutil.arvados_node_mock(1),
                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
                          want_sizes=[size])
-        self.busywait(lambda: 2 == self.paired_monitor_count())
+        self.assertwait(lambda: self.assertEqual(2, self.paired_monitor_count()))
         for mon_ref in self.monitor_list():
             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
         self.assertEqual(1, self.node_shutdown.start.call_count)
@@ -269,7 +281,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                          arvados_nodes=[testutil.arvados_node_mock(1),
                                         testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
                          want_sizes=[size])
-        self.busywait(lambda: 2 == self.paired_monitor_count())
+        self.assertwait(lambda: self.assertEqual(2, self.paired_monitor_count()))
         get_cloud_node = mock.MagicMock(name="get_cloud_node")
         get_cloud_node.get.return_value = cloud_nodes[1]
         mock_node_monitor = mock.MagicMock()
@@ -278,7 +290,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
 
         self.daemon.cloud_nodes.get()[cloud_nodes[1].id].shutdown_actor = mock_shutdown.proxy()
 
-        self.busywait(lambda: 2 == self.alive_monitor_count())
+        self.assertwait(lambda: self.assertEqual(2, self.alive_monitor_count()))
         for mon_ref in self.monitor_list():
             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
         self.busywait(lambda: 1 == self.node_shutdown.start.call_count)
@@ -298,8 +310,8 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         arv_node = testutil.arvados_node_mock(2, job_uuid=True)
         self.make_daemon([testutil.cloud_node_mock(2, size=size)], [arv_node],
                          [size], avail_sizes=[(size, {"cores":1})])
-        self.busywait(lambda: 1 == self.paired_monitor_count())
-        self.busywait(lambda: self.node_setup.start.called)
+        self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
+        self.assertwait(lambda: self.assertEqual(1, self.node_setup.start.called))
 
     def test_boot_new_node_below_min_nodes(self):
         min_size = testutil.MockSize(1)
@@ -543,7 +555,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         arv_node = testutil.arvados_node_mock(1)
         size = testutil.MockSize(1)
         self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], want_sizes=[size])
-        self.busywait(lambda: 1 == self.paired_monitor_count())
+        self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
         monitor = self.monitor_list()[0].proxy()
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.stop_proxy(self.daemon)
@@ -553,7 +565,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         cloud_node = testutil.cloud_node_mock(1)
         arv_node = testutil.arvados_node_mock(1)
         self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], min_nodes=1)
-        self.busywait(lambda: 1 == self.paired_monitor_count())
+        self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
         monitor = self.monitor_list()[0].proxy()
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.stop_proxy(self.daemon)
@@ -572,7 +584,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         arv_nodes = [testutil.arvados_node_mock(3, job_uuid=True),
                      testutil.arvados_node_mock(4, job_uuid=None)]
         self.make_daemon(cloud_nodes, arv_nodes, [size])
-        self.busywait(lambda: 2 == self.paired_monitor_count())
+        self.assertwait(lambda: self.assertEqual(2, self.paired_monitor_count()))
         for mon_ref in self.monitor_list():
             monitor = mon_ref.proxy()
             if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]:
@@ -591,13 +603,13 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.last_shutdown.success.get.return_value = False
         self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
-        self.busywait(lambda: 1 == self.paired_monitor_count())
+        self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
 
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.last_shutdown.success.get.return_value = True
         self.last_shutdown.stop.side_effect = lambda: monitor.stop()
         self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
-        self.busywait(lambda: 0 == self.paired_monitor_count())
+        self.assertwait(lambda: self.assertEqual(0, self.paired_monitor_count()))
 
     def test_nodes_shutting_down_replaced_below_max_nodes(self):
         size = testutil.MockSize(6)
@@ -616,7 +628,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         cloud_node = testutil.cloud_node_mock(7)
         self.make_daemon([cloud_node], [testutil.arvados_node_mock(7)],
                          max_nodes=1)
-        self.busywait(lambda: 1 == self.paired_monitor_count())
+        self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
         monitor = self.monitor_list()[0].proxy()
         self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
         self.assertTrue(self.node_shutdown.start.called)
@@ -630,7 +642,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         arv_nodes = [testutil.arvados_node_mock(n, size=size) for n in [8, 9]]
         self.make_daemon(cloud_nodes, arv_nodes, [size],
                          avail_sizes=[(size, {"cores":1})])
-        self.busywait(lambda: 2 == self.paired_monitor_count())
+        self.assertwait(lambda: self.assertEqual(2, self.paired_monitor_count()))
         for mon_ref in self.monitor_list():
             self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
         self.assertEqual(1, self.node_shutdown.start.call_count)
@@ -671,7 +683,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
         cloud_nodes = [testutil.cloud_node_mock(1, size=size)]
         arv_nodes = [testutil.arvados_node_mock(1, job_uuid=None)]
         self.make_daemon(cloud_nodes, arv_nodes, [size])
-        self.busywait(lambda: 1 == self.paired_monitor_count())
+        self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
         for mon_ref in self.monitor_list():
             monitor = mon_ref.proxy()
             if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]:
@@ -770,7 +782,7 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                                         testutil.arvados_node_mock(3)],
                          want_sizes=[small, small, big],
                          avail_sizes=avail_sizes)
-        self.busywait(lambda: 3 == self.paired_monitor_count())
+        self.assertwait(lambda: self.assertEqual(3, self.paired_monitor_count()))
         self.daemon.update_server_wishlist([small, big, big]).get(self.TIMEOUT)
 
         self.assertEqual(0, self.node_shutdown.start.call_count)
index 2d1a17eaecd82a7cea0be0103d27fc2e8f07c4c5..8bf3ea87412200a595d70d02c34796f75a2a8543 100644 (file)
@@ -48,9 +48,12 @@ class ActorUnhandledExceptionTest(testutil.ActorTestMixin, unittest.TestCase):
     def test_nonfatal_error(self):
         status.tracker.update({'actor_exceptions': 0})
         kill_mock = mock.Mock('os.kill')
-        act = BogusActor.start(OSError(errno.ENOENT, ""), killfunc=kill_mock).tell_proxy()
+        bgact = BogusActor.start(OSError(errno.ENOENT, ""), killfunc=kill_mock)
+        act_thread = bgact.proxy().get_thread().get()
+        act = bgact.tell_proxy()
         act.doStuff()
         act.actor_ref.stop(block=True)
+        act_thread.join()
         self.assertFalse(kill_mock.called)
         self.assertEqual(1, status.tracker.get('actor_exceptions'))