From fd080b34a321cbd6593d69f427b9eaeab890712f Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Fri, 19 Jun 2020 10:34:01 -0400 Subject: [PATCH] 16480: Configurable timeout for entire keep-balance iteration. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- lib/config/config.default.yml | 7 ++++ lib/config/export.go | 1 + lib/config/generated_config.go | 7 ++++ lib/controller/federation_test.go | 1 + lib/controller/handler_test.go | 1 + lib/controller/proxy.go | 4 +-- sdk/go/arvados/config.go | 1 + services/keep-balance/balance.go | 43 +++++++++++++----------- services/keep-balance/collection.go | 5 +-- services/keep-balance/collection_test.go | 3 +- services/keep-balance/keep_service.go | 13 +++---- 11 files changed, 55 insertions(+), 31 deletions(-) diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml index 219f6ef0ba..409d2ccfaa 100644 --- a/lib/config/config.default.yml +++ b/lib/config/config.default.yml @@ -440,6 +440,13 @@ Clusters: # or omitted, pages are processed serially. BalanceCollectionBuffers: 1000 + # Maximum time for a rebalancing run. This ensures keep-balance + # eventually gives up and retries if, for example, a network + # error causes a hung connection that is never closed by the + # OS. It should be long enough that it doesn't interrupt a + # long-running balancing operation. + BalanceTimeout: 6h + # Default lifetime for ephemeral collections: 2 weeks. This must not # be less than BlobSigningTTL. DefaultTrashLifetime: 336h diff --git a/lib/config/export.go b/lib/config/export.go index fc4908c159..1c0662470f 100644 --- a/lib/config/export.go +++ b/lib/config/export.go @@ -102,6 +102,7 @@ var whitelist = map[string]bool{ "Collections.WebDAVCache": false, "Collections.BalanceCollectionBatch": false, "Collections.BalancePeriod": false, + "Collections.BalanceTimeout": false, "Collections.BlobMissingReport": false, "Collections.BalanceCollectionBuffers": false, "Containers": true, diff --git a/lib/config/generated_config.go b/lib/config/generated_config.go index 6f8cab462b..30bc66fc1e 100644 --- a/lib/config/generated_config.go +++ b/lib/config/generated_config.go @@ -446,6 +446,13 @@ Clusters: # or omitted, pages are processed serially. BalanceCollectionBuffers: 1000 + # Maximum time for a rebalancing run. This ensures keep-balance + # eventually gives up and retries if, for example, a network + # error causes a hung connection that is never closed by the + # OS. It should be long enough that it doesn't interrupt a + # long-running balancing operation. + BalanceTimeout: 6h + # Default lifetime for ephemeral collections: 2 weeks. This must not # be less than BlobSigningTTL. DefaultTrashLifetime: 336h diff --git a/lib/controller/federation_test.go b/lib/controller/federation_test.go index 2b0cb22b04..6a9ad8c15f 100644 --- a/lib/controller/federation_test.go +++ b/lib/controller/federation_test.go @@ -64,6 +64,7 @@ func (s *FederationSuite) SetUpTest(c *check.C) { cluster.TLS.Insecure = true cluster.API.MaxItemsPerResponse = 1000 cluster.API.MaxRequestAmplification = 4 + cluster.API.RequestTimeout = arvados.Duration(5 * time.Minute) arvadostest.SetServiceURL(&cluster.Services.RailsAPI, "http://localhost:1/") arvadostest.SetServiceURL(&cluster.Services.Controller, "http://localhost:/") s.testHandler = &Handler{Cluster: cluster} diff --git a/lib/controller/handler_test.go b/lib/controller/handler_test.go index c7bce97130..ef6b9195f1 100644 --- a/lib/controller/handler_test.go +++ b/lib/controller/handler_test.go @@ -52,6 +52,7 @@ func (s *HandlerSuite) SetUpTest(c *check.C) { PostgreSQL: integrationTestCluster().PostgreSQL, ForceLegacyAPI14: forceLegacyAPI14, } + s.cluster.API.RequestTimeout = arvados.Duration(5 * time.Minute) s.cluster.TLS.Insecure = true arvadostest.SetServiceURL(&s.cluster.Services.RailsAPI, "https://"+os.Getenv("ARVADOS_TEST_API_HOST")) arvadostest.SetServiceURL(&s.cluster.Services.Controller, "http://localhost:/") diff --git a/lib/controller/proxy.go b/lib/controller/proxy.go index 939868a17b..d7381860ea 100644 --- a/lib/controller/proxy.go +++ b/lib/controller/proxy.go @@ -77,9 +77,7 @@ func (p *proxy) Do( Header: hdrOut, Body: reqIn.Body, }).WithContext(reqIn.Context()) - - resp, err := client.Do(reqOut) - return resp, err + return client.Do(reqOut) } // Copy a response (or error) to the downstream client diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go index dbd9f71099..636728f1fd 100644 --- a/sdk/go/arvados/config.go +++ b/sdk/go/arvados/config.go @@ -126,6 +126,7 @@ type Cluster struct { BalancePeriod Duration BalanceCollectionBatch int BalanceCollectionBuffers int + BalanceTimeout Duration WebDAVCache WebDAVCacheConfig } diff --git a/services/keep-balance/balance.go b/services/keep-balance/balance.go index 2617146056..86423a2976 100644 --- a/services/keep-balance/balance.go +++ b/services/keep-balance/balance.go @@ -72,6 +72,9 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp defer bal.time("sweep", "wall clock time to run one full sweep")() + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(cluster.Collections.BalanceTimeout.Duration())) + defer cancel() + var lbFile *os.File if bal.LostBlocksFile != "" { tmpfn := bal.LostBlocksFile + ".tmp" @@ -112,13 +115,21 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp if err = bal.CheckSanityEarly(client); err != nil { return } + + // On a big site, indexing and sending trash/pull lists can + // take much longer than the usual 5 minute client + // timeout. From here on, we rely on the context deadline + // instead, aborting the entire operation if any part takes + // too long. + client.Timeout = 0 + rs := bal.rendezvousState() if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState { if runOptions.SafeRendezvousState != "" { bal.logf("notice: KeepServices list has changed since last run") } bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run") - if err = bal.ClearTrashLists(client); err != nil { + if err = bal.ClearTrashLists(ctx, client); err != nil { return } // The current rendezvous state becomes "safe" (i.e., @@ -128,13 +139,7 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp nextRunOptions.SafeRendezvousState = rs } - // Indexing and sending trash/pull lists can take a long time - // on a big site. Prefer a long timeout (causing slow recovery - // from undetected network problems) to a short timeout - // (causing starvation via perpetual timeout/restart cycle). - client.Timeout = 24 * time.Hour - - if err = bal.GetCurrentState(client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil { + if err = bal.GetCurrentState(ctx, client, cluster.Collections.BalanceCollectionBatch, cluster.Collections.BalanceCollectionBuffers); err != nil { return } bal.ComputeChangeSets() @@ -154,14 +159,14 @@ func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOp lbFile = nil } if runOptions.CommitPulls { - err = bal.CommitPulls(client) + err = bal.CommitPulls(ctx, client) if err != nil { // Skip trash if we can't pull. (Too cautious?) return } } if runOptions.CommitTrash { - err = bal.CommitTrash(client) + err = bal.CommitTrash(ctx, client) } return } @@ -294,11 +299,11 @@ func (bal *Balancer) rendezvousState() string { // We avoid this problem if we clear all trash lists before getting // indexes. (We also assume there is only one rebalancing process // running at a time.) -func (bal *Balancer) ClearTrashLists(c *arvados.Client) error { +func (bal *Balancer) ClearTrashLists(ctx context.Context, c *arvados.Client) error { for _, srv := range bal.KeepServices { srv.ChangeSet = &ChangeSet{} } - return bal.CommitTrash(c) + return bal.CommitTrash(ctx, c) } // GetCurrentState determines the current replication state, and the @@ -312,8 +317,8 @@ func (bal *Balancer) ClearTrashLists(c *arvados.Client) error { // collection manifests in the database (API server). // // It encodes the resulting information in BlockStateMap. -func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error { - ctx, cancel := context.WithCancel(context.Background()) +func (bal *Balancer) GetCurrentState(ctx context.Context, c *arvados.Client, pageSize, bufs int) error { + ctx, cancel := context.WithCancel(ctx) defer cancel() defer bal.time("get_state", "wall clock time to get current state")() @@ -415,7 +420,7 @@ func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) erro wg.Add(1) go func() { defer wg.Done() - err = EachCollection(c, pageSize, + err = EachCollection(ctx, c, pageSize, func(coll arvados.Collection) error { collQ <- coll if len(errs) > 0 { @@ -1098,22 +1103,22 @@ func (bal *Balancer) CheckSanityLate() error { // keepstore servers. This has the effect of increasing replication of // existing blocks that are either underreplicated or poorly // distributed according to rendezvous hashing. -func (bal *Balancer) CommitPulls(c *arvados.Client) error { +func (bal *Balancer) CommitPulls(ctx context.Context, c *arvados.Client) error { defer bal.time("send_pull_lists", "wall clock time to send pull lists")() return bal.commitAsync(c, "send pull list", func(srv *KeepService) error { - return srv.CommitPulls(c) + return srv.CommitPulls(ctx, c) }) } // CommitTrash sends the computed lists of trash requests to the // keepstore servers. This has the effect of deleting blocks that are // overreplicated or unreferenced. -func (bal *Balancer) CommitTrash(c *arvados.Client) error { +func (bal *Balancer) CommitTrash(ctx context.Context, c *arvados.Client) error { defer bal.time("send_trash_lists", "wall clock time to send trash lists")() return bal.commitAsync(c, "send trash list", func(srv *KeepService) error { - return srv.CommitTrash(c) + return srv.CommitTrash(ctx, c) }) } diff --git a/services/keep-balance/collection.go b/services/keep-balance/collection.go index c4ddc90c41..1659918caf 100644 --- a/services/keep-balance/collection.go +++ b/services/keep-balance/collection.go @@ -5,6 +5,7 @@ package main import ( + "context" "fmt" "time" @@ -30,7 +31,7 @@ func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int // // If pageSize > 0 it is used as the maximum page size in each API // call; otherwise the maximum allowed page size is requested. -func EachCollection(c *arvados.Client, pageSize int, f func(arvados.Collection) error, progress func(done, total int)) error { +func EachCollection(ctx context.Context, c *arvados.Client, pageSize int, f func(arvados.Collection) error, progress func(done, total int)) error { if progress == nil { progress = func(_, _ int) {} } @@ -75,7 +76,7 @@ func EachCollection(c *arvados.Client, pageSize int, f func(arvados.Collection) for { progress(callCount, expectCount) var page arvados.CollectionList - err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params) + err := c.RequestAndDecodeContext(ctx, &page, "GET", "arvados/v1/collections", nil, params) if err != nil { return err } diff --git a/services/keep-balance/collection_test.go b/services/keep-balance/collection_test.go index f8921c294a..3ab9d07b2e 100644 --- a/services/keep-balance/collection_test.go +++ b/services/keep-balance/collection_test.go @@ -5,6 +5,7 @@ package main import ( + "context" "sync" "time" @@ -29,7 +30,7 @@ func (s *integrationSuite) TestIdenticalTimestamps(c *check.C) { longestStreak := 0 var lastMod time.Time sawUUID := make(map[string]bool) - err := EachCollection(s.client, pageSize, func(c arvados.Collection) error { + err := EachCollection(context.Background(), s.client, pageSize, func(c arvados.Collection) error { if c.ModifiedAt.IsZero() { return nil } diff --git a/services/keep-balance/keep_service.go b/services/keep-balance/keep_service.go index e2adf1a4b7..17f8418f62 100644 --- a/services/keep-balance/keep_service.go +++ b/services/keep-balance/keep_service.go @@ -5,6 +5,7 @@ package main import ( + "context" "encoding/json" "fmt" "io" @@ -35,19 +36,19 @@ func (srv *KeepService) URLBase() string { // CommitPulls sends the current list of pull requests to the storage // server (even if the list is empty). -func (srv *KeepService) CommitPulls(c *arvados.Client) error { - return srv.put(c, "pull", srv.ChangeSet.Pulls) +func (srv *KeepService) CommitPulls(ctx context.Context, c *arvados.Client) error { + return srv.put(ctx, c, "pull", srv.ChangeSet.Pulls) } // CommitTrash sends the current list of trash requests to the storage // server (even if the list is empty). -func (srv *KeepService) CommitTrash(c *arvados.Client) error { - return srv.put(c, "trash", srv.ChangeSet.Trashes) +func (srv *KeepService) CommitTrash(ctx context.Context, c *arvados.Client) error { + return srv.put(ctx, c, "trash", srv.ChangeSet.Trashes) } // Perform a PUT request at path, with data (as JSON) in the request // body. -func (srv *KeepService) put(c *arvados.Client, path string, data interface{}) error { +func (srv *KeepService) put(ctx context.Context, c *arvados.Client, path string, data interface{}) error { // We'll start a goroutine to do the JSON encoding, so we can // stream it to the http client through a Pipe, rather than // keeping the entire encoded version in memory. @@ -64,7 +65,7 @@ func (srv *KeepService) put(c *arvados.Client, path string, data interface{}) er }() url := srv.URLBase() + "/" + path - req, err := http.NewRequest("PUT", url, ioutil.NopCloser(jsonR)) + req, err := http.NewRequestWithContext(ctx, "PUT", url, ioutil.NopCloser(jsonR)) if err != nil { return fmt.Errorf("building request for %s: %v", url, err) } -- 2.30.2