18071: Use dblock to avoid concurrent keep-balance ops.
[arvados.git] / services / keep-balance / integration_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepbalance
6
7 import (
8         "bytes"
9         "context"
10         "io"
11         "os"
12         "strings"
13         "testing"
14         "time"
15
16         "git.arvados.org/arvados.git/lib/config"
17         "git.arvados.org/arvados.git/sdk/go/arvados"
18         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
19         "git.arvados.org/arvados.git/sdk/go/arvadostest"
20         "git.arvados.org/arvados.git/sdk/go/ctxlog"
21         "git.arvados.org/arvados.git/sdk/go/keepclient"
22         "github.com/jmoiron/sqlx"
23         "github.com/prometheus/client_golang/prometheus"
24         "github.com/sirupsen/logrus"
25         check "gopkg.in/check.v1"
26 )
27
28 var _ = check.Suite(&integrationSuite{})
29
30 type integrationSuite struct {
31         config     *arvados.Cluster
32         db         *sqlx.DB
33         client     *arvados.Client
34         keepClient *keepclient.KeepClient
35 }
36
37 func (s *integrationSuite) SetUpSuite(c *check.C) {
38         if testing.Short() {
39                 c.Skip("-short")
40         }
41         arvadostest.ResetEnv()
42         arvadostest.StartKeep(4, true)
43
44         arv, err := arvadosclient.MakeArvadosClient()
45         arv.ApiToken = arvadostest.SystemRootToken
46         c.Assert(err, check.IsNil)
47
48         s.keepClient, err = keepclient.MakeKeepClient(arv)
49         c.Assert(err, check.IsNil)
50         s.putReplicas(c, "foo", 4)
51         s.putReplicas(c, "bar", 1)
52 }
53
54 func (s *integrationSuite) putReplicas(c *check.C, data string, replicas int) {
55         s.keepClient.Want_replicas = replicas
56         _, _, err := s.keepClient.PutB([]byte(data))
57         c.Assert(err, check.IsNil)
58 }
59
60 func (s *integrationSuite) TearDownSuite(c *check.C) {
61         if testing.Short() {
62                 c.Skip("-short")
63         }
64         arvadostest.StopKeep(4)
65 }
66
67 func (s *integrationSuite) SetUpTest(c *check.C) {
68         cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
69         c.Assert(err, check.Equals, nil)
70         s.config, err = cfg.GetCluster("")
71         c.Assert(err, check.Equals, nil)
72         s.db, err = sqlx.Open("postgres", s.config.PostgreSQL.Connection.String())
73         c.Assert(err, check.IsNil)
74         s.config.Collections.BalancePeriod = arvados.Duration(time.Second)
75
76         s.client = &arvados.Client{
77                 APIHost:   os.Getenv("ARVADOS_API_HOST"),
78                 AuthToken: arvadostest.SystemRootToken,
79                 Insecure:  true,
80         }
81 }
82
83 func (s *integrationSuite) TestBalanceAPIFixtures(c *check.C) {
84         var logBuf bytes.Buffer
85         for iter := 0; iter < 20; iter++ {
86                 logBuf.Reset()
87                 logger := logrus.New()
88                 logger.Out = io.MultiWriter(&logBuf, os.Stderr)
89                 opts := RunOptions{
90                         CommitPulls:           true,
91                         CommitTrash:           true,
92                         CommitConfirmedFields: true,
93                         Logger:                logger,
94                 }
95
96                 bal := &Balancer{
97                         DB:      s.db,
98                         Logger:  logger,
99                         Metrics: newMetrics(prometheus.NewRegistry()),
100                 }
101                 nextOpts, err := bal.Run(context.Background(), s.client, s.config, opts)
102                 c.Check(err, check.IsNil)
103                 c.Check(nextOpts.SafeRendezvousState, check.Not(check.Equals), "")
104                 c.Check(nextOpts.CommitPulls, check.Equals, true)
105                 if iter == 0 {
106                         c.Check(logBuf.String(), check.Matches, `(?ms).*ChangeSet{Pulls:1.*`)
107                         c.Check(logBuf.String(), check.Not(check.Matches), `(?ms).*ChangeSet{.*Trashes:[^0]}*`)
108                 } else if strings.Contains(logBuf.String(), "ChangeSet{Pulls:0") {
109                         break
110                 }
111                 time.Sleep(200 * time.Millisecond)
112         }
113         c.Check(logBuf.String(), check.Not(check.Matches), `(?ms).*0 replicas (0 blocks, 0 bytes) underreplicated.*`)
114
115         for _, trial := range []struct {
116                 uuid    string
117                 repl    int
118                 classes []string
119         }{
120                 {arvadostest.EmptyCollectionUUID, 0, []string{}},
121                 {arvadostest.FooCollection, 2, []string{"default"}},                                // "foo" blk
122                 {arvadostest.StorageClassesDesiredDefaultConfirmedDefault, 2, []string{"default"}}, // "bar" blk
123                 {arvadostest.StorageClassesDesiredArchiveConfirmedDefault, 0, []string{}},          // "bar" blk
124         } {
125                 c.Logf("%#v", trial)
126                 var coll arvados.Collection
127                 s.client.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+trial.uuid, nil, nil)
128                 if c.Check(coll.ReplicationConfirmed, check.NotNil) {
129                         c.Check(*coll.ReplicationConfirmed, check.Equals, trial.repl)
130                 }
131                 c.Check(coll.StorageClassesConfirmed, check.DeepEquals, trial.classes)
132         }
133 }