9998: Handle timestamp collisions in collection index.
author Tom Clegg <tom@curoverse.com>
Mon, 23 Jan 2017 20:31:23 +0000 (15:31 -0500)
committer Tom Clegg <tom@curoverse.com>
Mon, 23 Jan 2017 20:31:23 +0000 (15:31 -0500)
services/keep-balance/balance_run_test.go
services/keep-balance/collection.go
services/keep-balance/collection_test.go [new file with mode: 0644]

index 30683b4228957386dcea27d709ded7d2cdbcfee7..02080329ec2baf1d9dd17feac8c6dd9719bff110 100644 (file)
@@ -138,7 +138,9 @@ func (s *stubServer) serveCollectionsButSkipOne() *reqTracker {
                rt.Add(r)
                if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003c="`) {
                        io.WriteString(w, `{"items_available":3,"items":[]}`)
-               } else if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003e="`) {
+               } else if strings.Contains(r.Form.Get("filters"), `"modified_at","\u003e`) {
+                       io.WriteString(w, `{"items_available":0,"items":[]}`)
+               } else if strings.Contains(r.Form.Get("filters"), `"modified_at","="`) && strings.Contains(r.Form.Get("filters"), `"uuid","\u003e"`) {
                        io.WriteString(w, `{"items_available":0,"items":[]}`)
                } else {
                        io.WriteString(w, `{"items_available":2,"items":[
index e8ac30bd7acea506508cbdd0d9036d763aac89d3..a7a484672a0c3c073717ac1e65e77c44af21cdf5 100644 (file)
@@ -53,6 +53,7 @@ func EachCollection(c *arvados.Client, pageSize int, f func(arvados.Collection)
        var last arvados.Collection
        var filterTime time.Time
        callCount := 0
+       gettingExactTimestamp := false
        for {
                progress(callCount, expectCount)
                var page arvados.CollectionList
@@ -71,26 +72,62 @@ func EachCollection(c *arvados.Client, pageSize int, f func(arvados.Collection)
                        }
                        last = coll
                }
-               if last.ModifiedAt == nil || *last.ModifiedAt == filterTime {
-                       if page.ItemsAvailable > len(page.Items) {
-                               // TODO: use "mtime=X && UUID>Y"
-                               // filters to get all collections with
-                               // this timestamp, then use "mtime>X"
-                               // to get the next timestamp.
-                               return fmt.Errorf("BUG: Received an entire page with the same modified_at timestamp (%v), cannot make progress", filterTime)
-                       }
+               if len(page.Items) == 0 && !gettingExactTimestamp {
                        break
+               } else if last.ModifiedAt == nil {
+                       return fmt.Errorf("BUG: Last collection on the page (%s) has no modified_at timestamp; cannot make progress", last.UUID)
+               } else if len(page.Items) > 0 && *last.ModifiedAt == filterTime {
+                       // If we requested time>=X and never got a
+                       // time>X then we might not have received all
+                       // items with time==X yet. Switch to
+                       // gettingExactTimestamp mode (if we're not
+                       // there already), advancing our UUID
+                       // threshold with each request, until we get
+                       // an empty page.
+                       gettingExactTimestamp = true
+                       params.Filters = []arvados.Filter{{
+                               Attr:     "modified_at",
+                               Operator: "=",
+                               Operand:  filterTime,
+                       }, {
+                               Attr:     "uuid",
+                               Operator: ">",
+                               Operand:  last.UUID,
+                       }}
+               } else if gettingExactTimestamp {
+                       // This must be an empty page (in this mode,
+                       // an unequal timestamp is impossible) so we
+                       // can start getting pages of newer
+                       // collections.
+                       gettingExactTimestamp = false
+                       params.Filters = []arvados.Filter{{
+                               Attr:     "modified_at",
+                               Operator: ">",
+                               Operand:  filterTime,
+                       }}
+               } else {
+                       // In the normal case, we know we have seen
+                       // all collections with modtime<filterTime,
+                       // but we might not have seen all that have
+                       // modtime=filterTime. Hence we use >= instead
+                       // of > and skip the obvious overlapping item,
+                       // i.e., the last item on the previous
+                       // page. In some edge cases this can return
+                       // collections we have already seen, but
+                       // avoiding that would add overhead in the
+                       // overwhelmingly common cases, so we don't
+                       // bother.
+                       filterTime = *last.ModifiedAt
+                       params.Filters = []arvados.Filter{{
+                               Attr:     "modified_at",
+                               Operator: ">=",
+                               Operand:  filterTime,
+                       }, {
+                               Attr:     "uuid",
+                               Operator: "!=",
+                               Operand:  last.UUID,
+                       }}
                }
-               filterTime = *last.ModifiedAt
-               params.Filters = []arvados.Filter{{
-                       Attr:     "modified_at",
-                       Operator: ">=",
-                       Operand:  filterTime,
-               }, {
-                       Attr:     "uuid",
-                       Operator: "!=",
-                       Operand:  last.UUID,
-               }}
        }
        progress(callCount, expectCount)
 
diff --git a/services/keep-balance/collection_test.go b/services/keep-balance/collection_test.go
new file mode 100644 (file)
index 0000000..629ff3a
--- /dev/null
@@ -0,0 +1,57 @@
+package main
+
+import (
+       "sync"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       check "gopkg.in/check.v1"
+)
+
+//  TestIdenticalTimestamps ensures EachCollection returns the same
+//  set of collections for various page sizes -- even page sizes so
+//  small that we get entire pages full of collections with identical
+//  timestamps and exercise our gettingExactTimestamp cases.
+func (s *integrationSuite) TestIdenticalTimestamps(c *check.C) {
+       // pageSize==0 uses the default (large) page size.
+       pageSizes := []int{0, 2, 3, 4, 5}
+       got := make([][]string, len(pageSizes))
+       var wg sync.WaitGroup
+       for trial, pageSize := range pageSizes {
+               wg.Add(1)
+               go func(trial, pageSize int) {
+                       defer wg.Done()
+                       streak := 0
+                       longestStreak := 0
+                       var lastMod time.Time
+                       sawUUID := make(map[string]bool)
+                       err := EachCollection(&s.config.Client, pageSize, func(c arvados.Collection) error {
+                               got[trial] = append(got[trial], c.UUID)
+                               if c.ModifiedAt == nil {
+                                       return nil
+                               }
+                               if sawUUID[c.UUID] {
+                                       // dup
+                                       return nil
+                               }
+                               sawUUID[c.UUID] = true
+                               if lastMod == *c.ModifiedAt {
+                                       streak++
+                                       if streak > longestStreak {
+                                               longestStreak = streak
+                                       }
+                               } else {
+                                       streak = 0
+                                       lastMod = *c.ModifiedAt
+                               }
+                               return nil
+                       }, nil)
+                       c.Check(err, check.IsNil)
+                       c.Check(longestStreak > 25, check.Equals, true)
+               }(trial, pageSize)
+       }
+       wg.Wait()
+       for trial := 1; trial < len(pageSizes); trial++ {
+               c.Check(got[trial], check.DeepEquals, got[0])
+       }
+}