16480: Configurable timeout for entire keep-balance iteration.
authorTom Clegg <tom@tomclegg.ca>
Fri, 19 Jun 2020 14:34:01 +0000 (10:34 -0400)
committerTom Clegg <tom@tomclegg.ca>
Fri, 19 Jun 2020 14:34:01 +0000 (10:34 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@tomclegg.ca>

lib/config/config.default.yml
lib/config/export.go
lib/config/generated_config.go
lib/controller/federation_test.go
lib/controller/handler_test.go
lib/controller/proxy.go
sdk/go/arvados/config.go
services/keep-balance/balance.go
services/keep-balance/collection.go
services/keep-balance/collection_test.go
services/keep-balance/keep_service.go

index 219f6ef0ba91a1afb2e3311ca66b94f5a989020f..409d2ccfaab833a1f68854c2378f19d33a003e30 100644 (file)
@@ -440,6 +440,13 @@ Clusters:
       # or omitted, pages are processed serially.
       BalanceCollectionBuffers: 1000
 
+      # Maximum time for a rebalancing run. This ensures keep-balance
+      # eventually gives up and retries if, for example, a network
+      # error causes a hung connection that is never closed by the
+      # OS. It should be long enough that it doesn't interrupt a
+      # long-running balancing operation.
+      BalanceTimeout: 6h
+
       # Default lifetime for ephemeral collections: 2 weeks. This must not
       # be less than BlobSigningTTL.
       DefaultTrashLifetime: 336h
index fc4908c15929d7807d4c63033d61cbcef3b4bcab..1c0662470f9e842d7723c3e1238681c58dc0350d 100644 (file)
@@ -102,6 +102,7 @@ var whitelist = map[string]bool{
        "Collections.WebDAVCache":                      false,
        "Collections.BalanceCollectionBatch":           false,
        "Collections.BalancePeriod":                    false,
+       "Collections.BalanceTimeout":                   false,
        "Collections.BlobMissingReport":                false,
        "Collections.BalanceCollectionBuffers":         false,
        "Containers":                                   true,
index 6f8cab462bce2dc15118f31454b40bb35d06e3ff..30bc66fc1e15d972f6818a859d55495f40bd7f32 100644 (file)
@@ -446,6 +446,13 @@ Clusters:
       # or omitted, pages are processed serially.
       BalanceCollectionBuffers: 1000
 
+      # Maximum time for a rebalancing run. This ensures keep-balance
+      # eventually gives up and retries if, for example, a network
+      # error causes a hung connection that is never closed by the
+      # OS. It should be long enough that it doesn't interrupt a
+      # long-running balancing operation.
+      BalanceTimeout: 6h
+
       # Default lifetime for ephemeral collections: 2 weeks. This must not
       # be less than BlobSigningTTL.
       DefaultTrashLifetime: 336h
index 2b0cb22b04fbed0fedcb282c4269dbb008bff1a5..6a9ad8c15f3db2132bf5c122d8ae639764dbfff7 100644 (file)
@@ -64,6 +64,7 @@ func (s *FederationSuite) SetUpTest(c *check.C) {
        cluster.TLS.Insecure = true
        cluster.API.MaxItemsPerResponse = 1000
        cluster.API.MaxRequestAmplification = 4
+       cluster.API.RequestTimeout = arvados.Duration(5 * time.Minute)
        arvadostest.SetServiceURL(&cluster.Services.RailsAPI, "http://localhost:1/")
        arvadostest.SetServiceURL(&cluster.Services.Controller, "http://localhost:/")
        s.testHandler = &Handler{Cluster: cluster}
index c7bce97130bfb0e4b327d3d2233a41d9c9c3b73d..ef6b9195f10be05b1dd69bcbedda800df66dfdb3 100644 (file)
@@ -52,6 +52,7 @@ func (s *HandlerSuite) SetUpTest(c *check.C) {
                PostgreSQL:       integrationTestCluster().PostgreSQL,
                ForceLegacyAPI14: forceLegacyAPI14,
        }
+       s.cluster.API.RequestTimeout = arvados.Duration(5 * time.Minute)
        s.cluster.TLS.Insecure = true
        arvadostest.SetServiceURL(&s.cluster.Services.RailsAPI, "https://"+os.Getenv("ARVADOS_TEST_API_HOST"))
        arvadostest.SetServiceURL(&s.cluster.Services.Controller, "http://localhost:/")
index 939868a17b94f132644e3459292290294514e84f..d7381860ea422299406e0a38e726f6d09bb38481 100644 (file)
@@ -77,9 +77,7 @@ func (p *proxy) Do(
                Header: hdrOut,
                Body:   reqIn.Body,
        }).WithContext(reqIn.Context())
-
-       resp, err := client.Do(reqOut)
-       return resp, err
+       return client.Do(reqOut)
 }
 
 // Copy a response (or error) to the downstream client
index dbd9f71099619203bb38f4dd1118b865f5c2f662..636728f1fd71feaad420cd9a018ba8e6d7d9cbc7 100644 (file)
@@ -126,6 +126,7 @@ type Cluster struct {
                BalancePeriod            Duration
                BalanceCollectionBatch   int
                BalanceCollectionBuffers int
+               BalanceTimeout           Duration
 
                WebDAVCache WebDAVCacheConfig
        }
index 261714605666cb37829a4864d069c58ef8a4b32b..86423a2976b1e0909470bd563c486aee894743af 100644 (file)
@@ -72,6 +72,9 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp
 
        defer bal.time("sweep", "wall clock time to run one full sweep")()
 
+       ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
+       defer cancel()
+
        var lbFile *os.File
        if bal.LostBlocksFile != "" {
                tmpfn := bal.LostBlocksFile + ".tmp"
@@ -112,13 +115,21 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp
        if err = bal.CheckSanityEarly(client); err != nil {
                return
        }
+
+       // On a big site, indexing and sending trash/pull lists can
+       // take much longer than the usual 5 minute client
+       // timeout. From here on, we rely on the context deadline
+       // instead, aborting the entire operation if any part takes
+       // too long.
+       client.Timeout = 0
+
        rs := bal.rendezvousState()
        if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
                if runOptions.SafeRendezvousState != "" {
                        bal.logf("notice: KeepServices list has changed since last run")
                }
                bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
-               if err = bal.ClearTrashLists(client); err != nil {
+               if err = bal.ClearTrashLists(ctx, client); err != nil {
                        return
                }
                // The current rendezvous state becomes "safe" (i.e.,
@@ -128,13 +139,7 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp
                nextRunOptions.SafeRendezvousState = rs
        }
 
-       // Indexing and sending trash/pull lists can take a long time
-       // on a big site. Prefer a long timeout (causing slow recovery
-       // from undetected network problems) to a short timeout
-       // (causing starvation via perpetual timeout/restart cycle).
-       client.Timeout = 24 * time.Hour
-
-       if err = bal.GetCurrentState(client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
+       if err = bal.GetCurrentState(ctx, client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil {
                return
        }
        bal.ComputeChangeSets()
@@ -154,14 +159,14 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp
                lbFile = nil
        }
        if runOptions.CommitPulls {
-               err = bal.CommitPulls(client)
+               err = bal.CommitPulls(ctx, client)
                if err != nil {
                        // Skip trash if we can't pull. (Too cautious?)
                        return
                }
        }
        if runOptions.CommitTrash {
-               err = bal.CommitTrash(client)
+               err = bal.CommitTrash(ctx, client)
        }
        return
 }
@@ -294,11 +299,11 @@ func (bal *Balancer) rendezvousState() string {
 // We avoid this problem if we clear all trash lists before getting
 // indexes. (We also assume there is only one rebalancing process
 // running at a time.)
-func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
+func (bal *Balancer) ClearTrashLists(ctx context.Context, c *arvados.Client) error {
        for _, srv := range bal.KeepServices {
                srv.ChangeSet = &ChangeSet{}
        }
-       return bal.CommitTrash(c)
+       return bal.CommitTrash(ctx, c)
 }
 
 // GetCurrentState determines the current replication state, and the
@@ -312,8 +317,8 @@ func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
 // collection manifests in the database (API server).
 //
 // It encodes the resulting information in BlockStateMap.
-func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
-       ctx, cancel := context.WithCancel(context.Background())
+func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pageSize, bufs int) error {
+       ctx, cancel := context.WithCancel(ctx)
        defer cancel()
 
        defer bal.time("get_state", "wall clock time to get current state")()
@@ -415,7 +420,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro
        wg.Add(1)
        go func() {
                defer wg.Done()
-               err = EachCollection(c, pageSize,
+               err = EachCollection(ctx, c, pageSize,
                        func(coll arvados.Collection) error {
                                collQ <- coll
                                if len(errs) > 0 {
@@ -1098,22 +1103,22 @@ func (bal *Balancer) CheckSanityLate() error {
 // keepstore servers. This has the effect of increasing replication of
 // existing blocks that are either underreplicated or poorly
 // distributed according to rendezvous hashing.
-func (bal *Balancer) CommitPulls(c *arvados.Client) error {
+func (bal *Balancer) CommitPulls(ctx context.Context, c *arvados.Client) error {
        defer bal.time("send_pull_lists", "wall clock time to send pull lists")()
        return bal.commitAsync(c, "send pull list",
                func(srv *KeepService) error {
-                       return srv.CommitPulls(c)
+                       return srv.CommitPulls(ctx, c)
                })
 }
 
 // CommitTrash sends the computed lists of trash requests to the
 // keepstore servers. This has the effect of deleting blocks that are
 // overreplicated or unreferenced.
-func (bal *Balancer) CommitTrash(c *arvados.Client) error {
+func (bal *Balancer) CommitTrash(ctx context.Context, c *arvados.Client) error {
        defer bal.time("send_trash_lists", "wall clock time to send trash lists")()
        return bal.commitAsync(c, "send trash list",
                func(srv *KeepService) error {
-                       return srv.CommitTrash(c)
+                       return srv.CommitTrash(ctx, c)
                })
 }
 
index c4ddc90c419ae7e0d9c4d1b5cbadd9011f2a31fd..1659918cafe20c62162abfb1e841a40f80170c0a 100644 (file)
@@ -5,6 +5,7 @@
 package main
 
 import (
+       "context"
        "fmt"
        "time"
 
@@ -30,7 +31,7 @@ func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int
 //
 // If pageSize > 0 it is used as the maximum page size in each API
 // call; otherwise the maximum allowed page size is requested.
-func EachCollection(c *arvados.Client, pageSize int, f func(arvados.Collection) error, progress func(done, total int)) error {
+func EachCollection(ctx context.Context, c *arvados.Client, pageSize int, f func(arvados.Collection) error, progress func(done, total int)) error {
        if progress == nil {
                progress = func(_, _ int) {}
        }
@@ -75,7 +76,7 @@ func EachCollection(c *arvados.Client, pageSize int, f func(arvados.Collection)
        for {
                progress(callCount, expectCount)
                var page arvados.CollectionList
-               err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params)
+               err := c.RequestAndDecodeContext(ctx, &page, "GET", "arvados/v1/collections", nil, params)
                if err != nil {
                        return err
                }
index f8921c294afa075f290c2db6fd352b315d25e8ac..3ab9d07b2e2ed6bcc7220ae17aad4e6e7a665855 100644 (file)
@@ -5,6 +5,7 @@
 package main
 
 import (
+       "context"
        "sync"
        "time"
 
@@ -29,7 +30,7 @@ func (s *integrationSuite) TestIdenticalTimestamps(c *check.C) {
                        longestStreak := 0
                        var lastMod time.Time
                        sawUUID := make(map[string]bool)
-                       err := EachCollection(s.client, pageSize, func(c arvados.Collection) error {
+                       err := EachCollection(context.Background(), s.client, pageSize, func(c arvados.Collection) error {
                                if c.ModifiedAt.IsZero() {
                                        return nil
                                }
index e2adf1a4b79942b9457beb2ccd31df31abbb96b9..17f8418f622f992a7025db9b9214e60c5a39f2ca 100644 (file)
@@ -5,6 +5,7 @@
 package main
 
 import (
+       "context"
        "encoding/json"
        "fmt"
        "io"
@@ -35,19 +36,19 @@ func (srv *KeepService) URLBase() string {
 
 // CommitPulls sends the current list of pull requests to the storage
 // server (even if the list is empty).
-func (srv *KeepService) CommitPulls(c *arvados.Client) error {
-       return srv.put(c, "pull", srv.ChangeSet.Pulls)
+func (srv *KeepService) CommitPulls(ctx context.Context, c *arvados.Client) error {
+       return srv.put(ctx, c, "pull", srv.ChangeSet.Pulls)
 }
 
 // CommitTrash sends the current list of trash requests to the storage
 // server (even if the list is empty).
-func (srv *KeepService) CommitTrash(c *arvados.Client) error {
-       return srv.put(c, "trash", srv.ChangeSet.Trashes)
+func (srv *KeepService) CommitTrash(ctx context.Context, c *arvados.Client) error {
+       return srv.put(ctx, c, "trash", srv.ChangeSet.Trashes)
 }
 
 // Perform a PUT request at path, with data (as JSON) in the request
 // body.
-func (srv *KeepService) put(c *arvados.Client, path string, data interface{}) error {
+func (srv *KeepService) put(ctx context.Context, c *arvados.Client, path string, data interface{}) error {
        // We'll start a goroutine to do the JSON encoding, so we can
        // stream it to the http client through a Pipe, rather than
        // keeping the entire encoded version in memory.
@@ -64,7 +65,7 @@ func (srv *KeepService) put(c *arvados.Client, path string, data interface{}) er
        }()
 
        url := srv.URLBase() + "/" + path
-       req, err := http.NewRequest("PUT", url, ioutil.NopCloser(jsonR))
+       req, err := http.NewRequestWithContext(ctx, "PUT", url, ioutil.NopCloser(jsonR))
        if err != nil {
                return fmt.Errorf("building request for %s: %v", url, err)
        }