Merge branch '13006-api-is_a-filter' into 13006-sync-groups-is_a-filter
author Lucas Di Pentima <ldipentima@veritasgenetics.com>
Fri, 14 Dec 2018 15:54:49 +0000 (12:54 -0300)
committer Lucas Di Pentima <ldipentima@veritasgenetics.com>
Fri, 14 Dec 2018 15:54:49 +0000 (12:54 -0300)
Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima@veritasgenetics.com>

12 files changed:
build/package-build-dockerfiles/Makefile
lib/dispatchcloud/node_size.go
lib/dispatchcloud/node_size_test.go
sdk/python/arvados/commands/put.py
sdk/python/arvados/keep.py
sdk/python/tests/test_arv_put.py
services/api/app/models/arvados_model.rb
services/api/app/models/collection.rb
services/api/lib/arvados_model_updates.rb
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
tools/sync-groups/sync-groups.go

index cdd44d9c5752e7fee4fda7423b6f922e1d224fb7..6591319029f131dd81a32665c2bc35fe5ef9a9d8 100644 (file)
@@ -41,7 +41,7 @@ common-generated/$(NODETARBALL): common-generated
        wget -cqO common-generated/$(NODETARBALL) https://nodejs.org/dist/v6.11.2/$(NODETARBALL)
 
 common-generated/$(RVMKEY): common-generated
-       wget -cqO common-generated/$(RVMKEY) https://rvm.io/mpapis.asc
+       wget -cqO common-generated/$(RVMKEY) https://rvm.io/pkuczynski.asc
 
 common-generated:
        mkdir common-generated
index 1c36d6cf5bb770cb447b6f7f177d39c5ff7ef469..339e042c1aa0d15cb3f6cf1994c6146bf34de11d 100644 (file)
@@ -8,7 +8,9 @@ import (
        "errors"
        "log"
        "os/exec"
+       "regexp"
        "sort"
+       "strconv"
        "strings"
        "time"
 
@@ -27,6 +29,67 @@ type ConstraintsNotSatisfiableError struct {
        AvailableTypes []arvados.InstanceType
 }
 
+var pdhRegexp = regexp.MustCompile(`^[0-9a-f]{32}\+(\d+)$`)
+
+// estimateDockerImageSize estimates how much disk space will be used
+// by a Docker image, given the PDH of a collection containing a
+// Docker image that was created by "arv-keepdocker".  Returns
+// the estimated number of bytes of disk space that should be reserved.
+func estimateDockerImageSize(collectionPDH string) int64 {
+       m := pdhRegexp.FindStringSubmatch(collectionPDH)
+       if m == nil {
+               log.Printf("estimateDockerImageSize: '%v' did not match pdhRegexp, returning 0", collectionPDH)
+               return 0
+       }
+       n, err := strconv.ParseInt(m[1], 10, 64)
+       if err != nil || n < 122 {
+               log.Printf("estimateDockerImageSize: short manifest %v or error (%v), returning 0", n, err)
+               return 0
+       }
+       // To avoid having to fetch the collection, take advantage of
+       // the fact that the manifest storing a container image
+       // uploaded by arv-keepdocker has a predictable format, which
+       // allows us to estimate the size of the image based on just
+       // the size of the manifest.
+       //
+       // Use the following heuristic:
+       // - Start with the length of the manifest (n)
+       // - Subtract 80 characters for the filename and file segment
+       // - Divide by 42 to get the number of block identifiers ('hash\+size\ ' is 32+1+8+1)
+       // - Assume each block is full, multiply by 64 MiB
+       return ((n - 80) / 42) * (64 * 1024 * 1024)
+}
+
+// EstimateScratchSpace estimates how much available disk space (in
+// bytes) is needed to run the container by summing the capacity
+// requested by 'tmp' mounts plus disk space required to load the
+// Docker image.
+func EstimateScratchSpace(ctr *arvados.Container) (needScratch int64) {
+       for _, m := range ctr.Mounts {
+               if m.Kind == "tmp" {
+                       needScratch += m.Capacity
+               }
+       }
+
+       // Account for disk space usage by Docker, assuming the following behavior:
+       // - Layer tarballs are buffered to disk during "docker load".
+       // - Individual layer tarballs are extracted from the buffered
+       // copy to the filesystem.
+       dockerImageSize := estimateDockerImageSize(ctr.ContainerImage)
+
+       // The buffer is only needed during image load, so make sure
+       // the baseline scratch space at least covers dockerImageSize,
+       // and assume it will be released to the job afterwards.
+       if needScratch < dockerImageSize {
+               needScratch = dockerImageSize
+       }
+
+       // Now reserve space for the extracted image on disk.
+       needScratch += dockerImageSize
+
+       return
+}
+
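
Note: to make the arithmetic above concrete, here is a minimal sketch (Python, figures derived rather than measured) of the manifest-size heuristic and of the doubling that EstimateScratchSpace ends up applying, using the "+342" manifest length exercised by node_size_test.go below.

    # Hedged sketch of the heuristic in estimateDockerImageSize().
    manifest_len = 342
    blocks = (manifest_len - 80) // 42            # 6 block identifiers
    image_size = blocks * 64 * 1024 * 1024        # 402653184 bytes (~384 MiB)

    # With no larger "tmp" mounts, EstimateScratchSpace reserves the estimate
    # twice: once for the buffered tarball, once for the extracted layers.
    need_scratch = max(0, image_size) + image_size
    print(image_size, need_scratch)               # 402653184 805306368
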
 // ChooseInstanceType returns the cheapest available
 // arvados.InstanceType big enough to run ctr.
 func ChooseInstanceType(cc *arvados.Cluster, ctr *arvados.Container) (best arvados.InstanceType, err error) {
@@ -35,12 +98,7 @@ func ChooseInstanceType(cc *arvados.Cluster, ctr *arvados.Container) (best arvad
                return
        }
 
-       needScratch := int64(0)
-       for _, m := range ctr.Mounts {
-               if m.Kind == "tmp" {
-                       needScratch += m.Capacity
-               }
-       }
+       needScratch := EstimateScratchSpace(ctr)
 
        needVCPUs := ctr.RuntimeConstraints.VCPUs
 
index 91c6bb1049fb381d9070e747b1f076eec2f95dbc..eef86f74775134b4c6b0848d0b2897c5d47bef29 100644 (file)
@@ -119,3 +119,25 @@ func (*NodeSizeSuite) TestChoosePreemptable(c *check.C) {
        c.Check(best.Scratch >= 2*GiB, check.Equals, true)
        c.Check(best.Preemptible, check.Equals, true)
 }
+
+func (*NodeSizeSuite) TestScratchForDockerImage(c *check.C) {
+       n := EstimateScratchSpace(&arvados.Container{
+               ContainerImage: "d5025c0f29f6eef304a7358afa82a822+342",
+       })
+       // Actual image is 371.1 MiB (according to workbench)
+       // Estimated size is 384 MiB (402653184 bytes)
+       // Want to reserve 2x the estimated size, so 805306368 bytes
+       c.Check(n, check.Equals, int64(805306368))
+
+       n = EstimateScratchSpace(&arvados.Container{
+               ContainerImage: "d5025c0f29f6eef304a7358afa82a822+-342",
+       })
+       // Parse error will return 0
+       c.Check(n, check.Equals, int64(0))
+
+       n = EstimateScratchSpace(&arvados.Container{
+               ContainerImage: "d5025c0f29f6eef304a7358afa82a822+34",
+       })
+       // Short manifest will return 0
+       c.Check(n, check.Equals, int64(0))
+}
index cba00c3c8cf153039de990d27867558d0dbc699a..61258632bdd94f7acc684eaab50c442046f29f2e 100644 (file)
@@ -31,6 +31,7 @@ import traceback
 
 from apiclient import errors as apiclient_errors
 from arvados._version import __version__
+from arvados.util import keep_locator_pattern
 
 import arvados.commands._util as arv_cmd
 
@@ -289,6 +290,9 @@ class ResumeCacheConflict(Exception):
     pass
 
 
+class ResumeCacheInvalidError(Exception):
+    pass
+
 class ArvPutArgumentConflict(Exception):
     pass
 
@@ -387,7 +391,7 @@ class ResumeCache(object):
             new_cache = os.fdopen(new_cache_fd, 'r+')
             json.dump(data, new_cache)
             os.rename(new_cache_name, self.filename)
-        except (IOError, OSError, ResumeCacheConflict) as error:
+        except (IOError, OSError, ResumeCacheConflict):
             try:
                 os.unlink(new_cache_name)
             except NameError:  # mkstemp failed.
@@ -482,8 +486,8 @@ class ArvPutUploadJob(object):
 
     def _build_upload_list(self):
         """
-        Scan the requested paths to count file sizes, excluding files & dirs if requested
-        and building the upload file list.
+        Scan the requested paths to count file sizes, skipping files and dirs
+        requested to be excluded, and build the upload file list.
         """
         # If there aren't special files to be read, reset total bytes count to zero
         # to start counting.
@@ -795,6 +799,20 @@ class ArvPutUploadJob(object):
     def _my_collection(self):
         return self._remote_collection if self.update else self._local_collection
 
+    def _get_cache_filepath(self):
+        # Set up cache file name from input paths.
+        md5 = hashlib.md5()
+        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
+        realpaths = sorted(os.path.realpath(path) for path in self.paths)
+        md5.update(b'\0'.join([p.encode() for p in realpaths]))
+        if self.filename:
+            md5.update(self.filename.encode())
+        cache_filename = md5.hexdigest()
+        cache_filepath = os.path.join(
+            arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
+            cache_filename)
+        return cache_filepath
+
     def _setup_state(self, update_collection):
         """
         Create a new cache file or load a previously existing one.
@@ -814,17 +832,7 @@ class ArvPutUploadJob(object):
             raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
 
         if self.use_cache:
-            # Set up cache file name from input paths.
-            md5 = hashlib.md5()
-            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
-            realpaths = sorted(os.path.realpath(path) for path in self.paths)
-            md5.update(b'\0'.join([p.encode() for p in realpaths]))
-            if self.filename:
-                md5.update(self.filename.encode())
-            cache_filename = md5.hexdigest()
-            cache_filepath = os.path.join(
-                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
-                cache_filename)
+            cache_filepath = self._get_cache_filepath()
             if self.resume and os.path.exists(cache_filepath):
                 self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
                 self._cache_file = open(cache_filepath, 'a+')
@@ -850,6 +858,8 @@ class ArvPutUploadJob(object):
                 self.logger.info("No cache usage requested for this run.")
                 # No cache file, set empty state
                 self._state = copy.deepcopy(self.EMPTY_STATE)
+            if not self._cached_manifest_valid():
+                raise ResumeCacheInvalidError()
             # Load the previous manifest so we can check if files were modified remotely.
             self._local_collection = arvados.collection.Collection(
                 self._state['manifest'],
@@ -857,6 +867,48 @@ class ArvPutUploadJob(object):
                 put_threads=self.put_threads,
                 api_client=self._api_client)
 
+    def _cached_manifest_valid(self):
+        """
+        Validate the oldest non-expired block signature to check whether the
+        cached manifest is usable, i.e. that it was not created with a
+        different Arvados account.
+        """
+        if self._state.get('manifest', None) is None:
+            # No cached manifest yet, all good.
+            return True
+        now = datetime.datetime.utcnow()
+        oldest_exp = None
+        oldest_loc = None
+        block_found = False
+        for m in keep_locator_pattern.finditer(self._state['manifest']):
+            loc = m.group(0)
+            try:
+                exp = datetime.datetime.utcfromtimestamp(int(loc.split('@')[1], 16))
+            except IndexError:
+                # Locator without signature
+                continue
+            block_found = True
+            if exp > now and (oldest_exp is None or exp < oldest_exp):
+                oldest_exp = exp
+                oldest_loc = loc
+        if not block_found:
+            # No block signatures found => no invalid block signatures.
+            return True
+        if oldest_loc is None:
+            # Locator signatures found, but all have expired.
+            # Reset the cache and move on.
+            self.logger.info('Cache expired, starting from scratch.')
+            self._state['manifest'] = ''
+            return True
+        kc = arvados.KeepClient(api_client=self._api_client,
+                                num_retries=self.num_retries)
+        try:
+            kc.head(oldest_loc)
+        except arvados.errors.KeepRequestError:
+            # Something is wrong, cached manifest is not valid.
+            return False
+        return True
+
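
Note: for reference, a rough Python sketch of the expiry check performed by _cached_manifest_valid() above. A signed locator has the form "<md5>+<size>+A<signature>@<hex timestamp>", so the text after '@' decodes to the signature's expiration time. The locator below is made up, and locator_expiration is a hypothetical helper, not part of the SDK.

    import datetime

    def locator_expiration(loc):
        # Hypothetical helper: return the expiry embedded in a signed locator,
        # or None for an unsigned one (mirrors the IndexError handling above).
        try:
            hex_ts = loc.split('@')[1]
        except IndexError:
            return None
        return datetime.datetime.utcfromtimestamp(int(hex_ts, 16))

    loc = "fdba98970961edb29f88241b9d99d890+3+Afakesignature@5c13d000"
    exp = locator_expiration(loc)
    print(exp, exp > datetime.datetime.utcnow())  # expired signatures are skipped
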
     def collection_file_paths(self, col, path_prefix='.'):
         """Return a list of file paths by recursively go through the entire collection `col`"""
         file_paths = []
@@ -1131,6 +1183,14 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
             "arv-put: Another process is already uploading this data.",
             "         Use --no-cache if this is really what you want."]))
         sys.exit(1)
+    except ResumeCacheInvalidError:
+        logger.error("\n".join([
+            "arv-put: Resume cache contains invalid signature: it may have expired",
+            "         or been created with another Arvados user's credentials.",
+            "         Switch user or use one of the following options to restart upload:",
+            "         --no-resume to start a new resume cache.",
+            "         --no-cache to disable resume cache."]))
+        sys.exit(1)
     except CollectionUpdateError as error:
         logger.error("\n".join([
             "arv-put: %s" % str(error)]))
index 1b6376e9be1035dac69bb34da974c2c486477627..4354ced67d3b4a3b678be3cacaa575f58f4f6d3f 100644 (file)
@@ -791,6 +791,7 @@ class KeepClient(object):
 
         if local_store:
             self.local_store = local_store
+            self.head = self.local_store_head
             self.get = self.local_store_get
             self.put = self.local_store_put
         else:
@@ -1230,5 +1231,17 @@ class KeepClient(object):
         with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
             return f.read()
 
+    def local_store_head(self, loc_s, num_retries=None):
+        """Companion to local_store_put()."""
+        try:
+            locator = KeepLocator(loc_s)
+        except ValueError:
+            raise arvados.errors.NotFoundError(
+                "Invalid data locator: '%s'" % loc_s)
+        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
+            return True
+        if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
+            return True
+
     def is_cached(self, locator):
         return self.block_cache.reserve_cache(expect_hash)
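
Note: a minimal usage sketch of the new head() support in local-store mode. The temporary directory and payload are illustrative, and this assumes KeepClient can be constructed with only the local_store argument, as the code path above suggests.

    import tempfile
    import arvados

    store = tempfile.mkdtemp()
    kc = arvados.KeepClient(local_store=store)   # get/put/head use flat files

    loc = kc.put(b'foo')      # writes <store>/<md5>, returns a "<md5>+3" locator
    print(kc.head(loc))       # True: the block file exists
    print(kc.get(loc))        # b'foo'
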
index 93cfdc2a36c26389a3259222304e7ba1d5de7dff..a41184d10fb4fe7daadeb0892cf60ce36f47e8df 100644 (file)
@@ -8,6 +8,7 @@ from future import standard_library
 standard_library.install_aliases()
 from builtins import str
 from builtins import range
+from functools import partial
 import apiclient
 import datetime
 import hashlib
@@ -528,6 +529,85 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                                     resume=False)
         del(self.writer)
 
+class CachedManifestValidationTest(ArvadosBaseTestCase):
+    class MockedPut(arv_put.ArvPutUploadJob):
+        def __init__(self, cached_manifest=None):
+            self._state = arv_put.ArvPutUploadJob.EMPTY_STATE
+            self._state['manifest'] = cached_manifest
+            self._api_client = mock.MagicMock()
+            self.logger = mock.MagicMock()
+            self.num_retries = 1
+
+    def datetime_to_hex(self, dt):
+        return hex(int(time.mktime(dt.timetuple())))[2:]
+
+    def setUp(self):
+        super(CachedManifestValidationTest, self).setUp()
+        self.block1 = "fdba98970961edb29f88241b9d99d890" # foo
+        self.block2 = "37b51d194a7513e45b56f6524f2d51f2" # bar
+        self.template = ". "+self.block1+"+3+Asignature@%s "+self.block2+"+3+Anothersignature@%s 0:3:foofile.txt 3:6:barfile.txt\n"
+
+    def test_empty_cached_manifest_is_valid(self):
+        put_mock = self.MockedPut()
+        self.assertEqual(None, put_mock._state.get('manifest'))
+        self.assertTrue(put_mock._cached_manifest_valid())
+        put_mock._state['manifest'] = ''
+        self.assertTrue(put_mock._cached_manifest_valid())
+
+    def test_signature_cases(self):
+        now = datetime.datetime.utcnow()
+        yesterday = now - datetime.timedelta(days=1)
+        lastweek = now - datetime.timedelta(days=7)
+        tomorrow = now + datetime.timedelta(days=1)
+        nextweek = now + datetime.timedelta(days=7)
+
+        def mocked_head(blocks={}, loc=None):
+            blk = loc.split('+', 1)[0]
+            if blocks.get(blk):
+                return True
+            raise arvados.errors.KeepRequestError("mocked error - block invalid")
+
+        # Block1_expiration, Block2_expiration, Block1_HEAD, Block2_HEAD, Expectation
+        cases = [
+            # All expired, reset cache - OK
+            (yesterday, lastweek, False, False, True),
+            (lastweek, yesterday, False, False, True),
+            # All non-expired valid blocks - OK
+            (tomorrow, nextweek, True, True, True),
+            (nextweek, tomorrow, True, True, True),
+            # All non-expired invalid blocks - Not OK
+            (tomorrow, nextweek, False, False, False),
+            (nextweek, tomorrow, False, False, False),
+            # One non-expired valid block - OK
+            (tomorrow, yesterday, True, False, True),
+            (yesterday, tomorrow, False, True, True),
+            # One non-expired invalid block - Not OK
+            (tomorrow, yesterday, False, False, False),
+            (yesterday, tomorrow, False, False, False),
+        ]
+        for case in cases:
+            b1_expiration, b2_expiration, b1_valid, b2_valid, outcome = case
+            head_responses = {
+                self.block1: b1_valid,
+                self.block2: b2_valid,
+            }
+            cached_manifest = self.template % (
+                self.datetime_to_hex(b1_expiration),
+                self.datetime_to_hex(b2_expiration),
+            )
+            arvput = self.MockedPut(cached_manifest)
+            with mock.patch('arvados.collection.KeepClient.head') as head_mock:
+                head_mock.side_effect = partial(mocked_head, head_responses)
+                self.assertEqual(outcome, arvput._cached_manifest_valid(),
+                    "Case '%s' should have produced outcome '%s'" % (case, outcome)
+                )
+                if b1_expiration > now or b2_expiration > now:
+                    # A HEAD request should have been done
+                    head_mock.assert_called_once()
+                else:
+                    head_mock.assert_not_called()
+
+
 class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
     TEST_SIZE = os.path.getsize(__file__)
 
@@ -549,7 +629,7 @@ class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
                          writer.bytes_expected)
 
     def test_expected_bytes_for_device(self):
-        writer = arv_put.ArvPutUploadJob(['/dev/null'])
+        writer = arv_put.ArvPutUploadJob(['/dev/null'], use_cache=False, resume=False)
         self.assertIsNone(writer.bytes_expected)
         writer = arv_put.ArvPutUploadJob([__file__, '/dev/null'])
         self.assertIsNone(writer.bytes_expected)
@@ -938,7 +1018,7 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
         self.assertEqual(1, len(collection_list))
         return collection_list[0]
 
-    def test_expired_token_invalidates_cache(self):
+    def test_all_expired_signatures_invalidates_cache(self):
         self.authorize_with('active')
         tmpdir = self.make_tmpdir()
         with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
@@ -974,7 +1054,89 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
         (out, err) = p.communicate()
         self.assertRegex(
             err.decode(),
-            r'WARNING: Uploaded file .* access token expired, will re-upload it from scratch')
+            r'INFO: Cache expired, starting from scratch.*')
+        self.assertEqual(p.returncode, 0)
+
+    def test_invalid_signature_invalidates_cache(self):
+        self.authorize_with('active')
+        tmpdir = self.make_tmpdir()
+        with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
+            f.write('foo')
+        # Upload a directory and get the cache file name
+        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+                             stdout=subprocess.PIPE,
+                             stderr=subprocess.PIPE,
+                             env=self.ENVIRON)
+        (out, err) = p.communicate()
+        self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
+        self.assertEqual(p.returncode, 0)
+        cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
+                                   err.decode()).groups()[0]
+        self.assertTrue(os.path.isfile(cache_filepath))
+        # Load the cache file contents and modify the manifest to simulate
+        # an invalid access token
+        with open(cache_filepath, 'r') as c:
+            cache = json.load(c)
+        self.assertRegex(cache['manifest'], r'\+A\S+\@')
+        cache['manifest'] = re.sub(
+            r'\+A.*\@',
+            "+Aabcdef0123456789abcdef0123456789abcdef01@",
+            cache['manifest'])
+        with open(cache_filepath, 'w') as c:
+            c.write(json.dumps(cache))
+        # Re-run the upload and expect to get an invalid cache message
+        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+                             stdout=subprocess.PIPE,
+                             stderr=subprocess.PIPE,
+                             env=self.ENVIRON)
+        (out, err) = p.communicate()
+        self.assertRegex(
+            err.decode(),
+            r'ERROR: arv-put: Resume cache contains invalid signature.*')
+        self.assertEqual(p.returncode, 1)
+
+    def test_single_expired_signature_reuploads_file(self):
+        self.authorize_with('active')
+        tmpdir = self.make_tmpdir()
+        with open(os.path.join(tmpdir, 'foofile.txt'), 'w') as f:
+            f.write('foo')
+        # Write a second file on its own subdir to force a new stream
+        os.mkdir(os.path.join(tmpdir, 'bar'))
+        with open(os.path.join(tmpdir, 'bar', 'barfile.txt'), 'w') as f:
+            f.write('bar')
+        # Upload a directory and get the cache file name
+        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+                             stdout=subprocess.PIPE,
+                             stderr=subprocess.PIPE,
+                             env=self.ENVIRON)
+        (out, err) = p.communicate()
+        self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
+        self.assertEqual(p.returncode, 0)
+        cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
+                                   err.decode()).groups()[0]
+        self.assertTrue(os.path.isfile(cache_filepath))
+        # Load the cache file contents and modify the manifest to simulate
+        # an expired access token
+        with open(cache_filepath, 'r') as c:
+            cache = json.load(c)
+        self.assertRegex(cache['manifest'], r'\+A\S+\@')
+        a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
+        # Make one of the signatures appear to have expired
+        cache['manifest'] = re.sub(
+            r'\@.*? 3:3:barfile.txt',
+            "@{} 3:3:barfile.txt".format(self.datetime_to_hex(a_month_ago)),
+            cache['manifest'])
+        with open(cache_filepath, 'w') as c:
+            c.write(json.dumps(cache))
+        # Re-run the upload and expect the expired file to be re-uploaded
+        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+                             stdout=subprocess.PIPE,
+                             stderr=subprocess.PIPE,
+                             env=self.ENVIRON)
+        (out, err) = p.communicate()
+        self.assertRegex(
+            err.decode(),
+            r'WARNING: Uploaded file \'.*barfile.txt\' access token expired, will re-upload it from scratch')
         self.assertEqual(p.returncode, 0)
         # Confirm that the resulting cache is different from the last run.
         with open(cache_filepath, 'r') as c2:
index 93d5b9a0239753a8820d86b883abcbdf1a06b776..eea95e2be1aad7c7535f48e430d5662d40149e04 100644 (file)
@@ -557,6 +557,8 @@ class ArvadosModel < ActiveRecord::Base
     self.owner_uuid ||= current_default_owner if self.respond_to? :owner_uuid=
     if !anonymous_updater
       self.modified_by_user_uuid = current_user ? current_user.uuid : nil
+    end
+    if !timeless_updater
       self.modified_at = current_time
     end
     self.modified_by_client_uuid = current_api_client ? current_api_client.uuid : nil
index 487043ee3549d8afe915f9abeeaeab2c8f252707..33cc686d4f5a14f3a432bc3df077ab258797a4a8 100644 (file)
@@ -287,7 +287,9 @@ class Collection < ArvadosModel
       # Use a different validation context to skip the 'old_versions_cannot_be_updated'
       # validator, as in this case it is legal to update some fields.
       leave_modified_by_user_alone do
-        c.save(context: :update_old_versions)
+        leave_modified_at_alone do
+          c.save(context: :update_old_versions)
+        end
       end
     end
   end
index b456bd39562adc4ff87a5c73a460c055344269b3..7f0d7c29092f51c1ac4d76b01aa6d4f62181de7f 100644 (file)
@@ -18,4 +18,21 @@ module ArvadosModelUpdates
       Thread.current[:anonymous_updater] = anonymous_updater_was
     end
   end
+
+  # ArvadosModel checks this to decide whether it should update the
+  # 'modified_at' field.
+  def timeless_updater
+    Thread.current[:timeless_updater] || false
+  end
+
+  def leave_modified_at_alone
+    timeless_updater_was = timeless_updater
+    begin
+      Thread.current[:timeless_updater] = true
+      yield
+    ensure
+      Thread.current[:timeless_updater] = timeless_updater_was
+    end
+  end
+
 end
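
Note: leave_modified_at_alone follows the same thread-local toggle pattern as the existing anonymous_updater / leave_modified_by_user_alone helpers. A rough Python analogue of the idea, names hypothetical and not part of any Arvados API:

    import threading
    from contextlib import contextmanager

    _state = threading.local()

    def timeless_updater():
        # Callers check this flag to decide whether to touch modified_at.
        return getattr(_state, 'timeless_updater', False)

    @contextmanager
    def leave_modified_at_alone():
        # Set the per-thread flag for the duration of the block, then restore
        # the previous value even if the block raises.
        previous = timeless_updater()
        _state.timeless_updater = True
        try:
            yield
        finally:
            _state.timeless_updater = previous
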
index 084700d39bfad76b109078f29e81ecf82c40c5be..29fad32bd150749d1e1bcb1df696359adbde0a0f 100644 (file)
@@ -229,12 +229,7 @@ func (disp *Dispatcher) checkSqueueForOrphans() {
 func (disp *Dispatcher) slurmConstraintArgs(container arvados.Container) []string {
        mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM+disp.ReserveExtraRAM) / float64(1048576)))
 
-       var disk int64
-       for _, m := range container.Mounts {
-               if m.Kind == "tmp" {
-                       disk += m.Capacity
-               }
-       }
+       disk := dispatchcloud.EstimateScratchSpace(&container)
        disk = int64(math.Ceil(float64(disk) / float64(1048576)))
        return []string{
                fmt.Sprintf("--mem=%d", mem),
@@ -246,7 +241,7 @@ func (disp *Dispatcher) slurmConstraintArgs(container arvados.Container) []strin
 func (disp *Dispatcher) sbatchArgs(container arvados.Container) ([]string, error) {
        var args []string
        args = append(args, disp.SbatchArguments...)
-       args = append(args, "--job-name="+container.UUID, fmt.Sprintf("--nice=%d", initialNiceValue))
+       args = append(args, "--job-name="+container.UUID, fmt.Sprintf("--nice=%d", initialNiceValue), "--no-requeue")
 
        if disp.cluster == nil {
                // no instance types configured
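
Note: for orientation, a small sketch of the bytes-to-MiB rounding behind the --mem and --tmp arguments built above; the input byte values are illustrative only.

    import math

    ram_bytes = 12000000000        # RAM + KeepCacheRAM + ReserveExtraRAM (illustrative)
    scratch_bytes = 48000000000    # EstimateScratchSpace(&container) (illustrative)

    mem = int(math.ceil(ram_bytes / 1048576.0))
    tmp = int(math.ceil(scratch_bytes / 1048576.0))
    print(["--mem=%d" % mem, "--tmp=%d" % tmp])   # ['--mem=11445', '--tmp=45777']
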
index b76ece314d47806afcfb328ba12970b9171b58d5..10fdb07124cc817cc857f35384ded22ca0193d63 100644 (file)
@@ -202,6 +202,7 @@ func (s *IntegrationSuite) TestMissingFromSqueue(c *C) {
                [][]string{{
                        fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
                        fmt.Sprintf("--nice=%d", 10000),
+                       "--no-requeue",
                        fmt.Sprintf("--mem=%d", 11445),
                        fmt.Sprintf("--cpus-per-task=%d", 4),
                        fmt.Sprintf("--tmp=%d", 45777),
@@ -217,7 +218,7 @@ func (s *IntegrationSuite) TestMissingFromSqueue(c *C) {
 func (s *IntegrationSuite) TestSbatchFail(c *C) {
        s.slurm = slurmFake{errBatch: errors.New("something terrible happened")}
        container := s.integrationTest(c,
-               [][]string{{"--job-name=zzzzz-dz642-queuedcontainer", "--nice=10000", "--mem=11445", "--cpus-per-task=4", "--tmp=45777"}},
+               [][]string{{"--job-name=zzzzz-dz642-queuedcontainer", "--nice=10000", "--no-requeue", "--mem=11445", "--cpus-per-task=4", "--tmp=45777"}},
                func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
                        dispatcher.UpdateState(container.UUID, dispatch.Running)
                        dispatcher.UpdateState(container.UUID, dispatch.Complete)
@@ -362,7 +363,7 @@ func (s *StubbedSuite) TestSbatchArgs(c *C) {
                s.disp.SbatchArguments = defaults
 
                args, err := s.disp.sbatchArgs(container)
-               c.Check(args, DeepEquals, append(defaults, "--job-name=123", "--nice=10000", "--mem=239", "--cpus-per-task=2", "--tmp=0"))
+               c.Check(args, DeepEquals, append(defaults, "--job-name=123", "--nice=10000", "--no-requeue", "--mem=239", "--cpus-per-task=2", "--tmp=0"))
                c.Check(err, IsNil)
        }
 }
@@ -408,7 +409,7 @@ func (s *StubbedSuite) TestSbatchInstanceTypeConstraint(c *C) {
                args, err := s.disp.sbatchArgs(container)
                c.Check(err == nil, Equals, trial.err == nil)
                if trial.err == nil {
-                       c.Check(args, DeepEquals, append([]string{"--job-name=123", "--nice=10000"}, trial.sbatchArgs...))
+                       c.Check(args, DeepEquals, append([]string{"--job-name=123", "--nice=10000", "--no-requeue"}, trial.sbatchArgs...))
                } else {
                        c.Check(len(err.(dispatchcloud.ConstraintsNotSatisfiableError).AvailableTypes), Equals, len(trial.types))
                }
@@ -425,7 +426,7 @@ func (s *StubbedSuite) TestSbatchPartition(c *C) {
 
        args, err := s.disp.sbatchArgs(container)
        c.Check(args, DeepEquals, []string{
-               "--job-name=123", "--nice=10000",
+               "--job-name=123", "--nice=10000", "--no-requeue",
                "--mem=239", "--cpus-per-task=1", "--tmp=0",
                "--partition=blurb,b2",
        })
index af7b2e92ebeb0cb60697ac03dacc25e33553782b..93e0dd5d21f91c08f1f5e685d1f2de1aa6627918 100644 (file)
@@ -510,8 +510,8 @@ func GetRemoteGroups(cfg *ConfigParams, allUsers map[string]arvados.User) (remot
                                Operand:  group.UUID,
                        }, {
                                Attr:     "head_uuid",
-                               Operator: "like",
-                               Operand:  "%-tpzed-%",
+                               Operator: "is_a",
+                               Operand:  "arvados#user",
                        }},
                }
                // User -> Group filter
@@ -534,8 +534,8 @@ func GetRemoteGroups(cfg *ConfigParams, allUsers map[string]arvados.User) (remot
                                Operand:  group.UUID,
                        }, {
                                Attr:     "tail_uuid",
-                               Operator: "like",
-                               Operand:  "%-tpzed-%",
+                               Operator: "is_a",
+                               Operand:  "arvados#user",
                        }},
                }
                g2uLinks, err := GetAll(cfg.Client, "links", g2uFilter, &LinkList{})
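
Note: the switch from like '%-tpzed-%' to the new is_a operator matches links by the kind of object they point at rather than by UUID pattern. A rough equivalent of the group-to-user link query via the Python SDK might look like the sketch below; the group UUID is a placeholder and the other filters used by sync-groups are not reproduced here.

    import arvados

    api = arvados.api('v1')
    links = api.links().list(filters=[
        ['tail_uuid', '=', 'zzzzz-j7d0g-000000000000000'],   # placeholder group UUID
        ['head_uuid', 'is_a', 'arvados#user'],               # only links pointing at users
    ]).execute()
    for link in links['items']:
        print(link['head_uuid'])
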