6e0a066e869261a35ee60212d77090f1e9432d3c
[arvados.git] / services / keep-balance / collection.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "context"
9         "encoding/json"
10         "fmt"
11         "runtime"
12         "sync"
13         "sync/atomic"
14         "time"
15
16         "git.arvados.org/arvados.git/sdk/go/arvados"
17         "github.com/jmoiron/sqlx"
18 )
19
20 func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int, error) {
21         var page arvados.CollectionList
22         var zero int
23         params.Limit = &zero
24         params.Count = "exact"
25         err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params)
26         return page.ItemsAvailable, err
27 }
28
29 // EachCollection calls f once for every readable
30 // collection. EachCollection stops if it encounters an error, such as
31 // f returning a non-nil error.
32 //
33 // The progress function is called periodically with done (number of
34 // times f has been called) and total (number of times f is expected
35 // to be called).
36 func EachCollection(ctx context.Context, db *sqlx.DB, c *arvados.Client, f func(arvados.Collection) error, progress func(done, total int)) error {
37         if progress == nil {
38                 progress = func(_, _ int) {}
39         }
40
41         expectCount, err := countCollections(c, arvados.ResourceListParams{
42                 IncludeTrash:       true,
43                 IncludeOldVersions: true,
44         })
45         if err != nil {
46                 return err
47         }
48         var newestModifiedAt time.Time
49
50         rows, err := db.QueryxContext(ctx, `SELECT uuid, manifest_text, modified_at, portable_data_hash, replication_desired, storage_classes_desired, is_trashed FROM collections`)
51         if err != nil {
52                 return err
53         }
54         progressTicker := time.NewTicker(10 * time.Second)
55         defer progressTicker.Stop()
56         callCount := 0
57         for rows.Next() {
58                 var coll arvados.Collection
59                 var classesDesired []byte
60                 err = rows.Scan(&coll.UUID, &coll.ManifestText, &coll.ModifiedAt, &coll.PortableDataHash, &coll.ReplicationDesired, &classesDesired, &coll.IsTrashed)
61                 if err != nil {
62                         rows.Close()
63                         return err
64                 }
65                 err = json.Unmarshal(classesDesired, &coll.StorageClassesDesired)
66                 if err != nil {
67                         rows.Close()
68                         return err
69                 }
70                 if newestModifiedAt.IsZero() || newestModifiedAt.Before(coll.ModifiedAt) {
71                         newestModifiedAt = coll.ModifiedAt
72                 }
73                 callCount++
74                 err = f(coll)
75                 if err != nil {
76                         return err
77                 }
78                 select {
79                 case <-progressTicker.C:
80                         progress(callCount, expectCount)
81                 default:
82                 }
83         }
84         progress(callCount, expectCount)
85         rows.Close()
86         if err := rows.Err(); err != nil {
87                 return err
88         }
89         if checkCount, err := countCollections(c, arvados.ResourceListParams{
90                 Filters: []arvados.Filter{{
91                         Attr:     "modified_at",
92                         Operator: "<=",
93                         Operand:  newestModifiedAt}},
94                 IncludeTrash:       true,
95                 IncludeOldVersions: true,
96         }); err != nil {
97                 return err
98         } else if callCount < checkCount {
99                 return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, newestModifiedAt, checkCount)
100         }
101
102         return nil
103 }
104
105 func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, cluster *arvados.Cluster) error {
106         ctx, cancel := context.WithCancel(ctx)
107         defer cancel()
108
109         defer bal.time("update_collections", "wall clock time to update collections")()
110         threshold := time.Now()
111         thresholdStr := threshold.Format(time.RFC3339Nano)
112
113         updated := int64(0)
114
115         var err error
116         collQ := make(chan arvados.Collection, cluster.Collections.BalanceCollectionBuffers)
117         go func() {
118                 defer close(collQ)
119                 err := EachCollection(ctx, bal.DB, c, func(coll arvados.Collection) error {
120                         if atomic.LoadInt64(&updated) >= int64(cluster.Collections.BalanceUpdateLimit) {
121                                 bal.logf("reached BalanceUpdateLimit (%d)", cluster.Collections.BalanceUpdateLimit)
122                                 cancel()
123                                 return context.Canceled
124                         }
125                         collQ <- coll
126                         return nil
127                 }, func(done, total int) {
128                         bal.logf("update collections: %d/%d", done, total)
129                 })
130                 if err != nil && err != context.Canceled {
131                         bal.logf("error updating collections: %s", err)
132                 }
133         }()
134
135         var wg sync.WaitGroup
136         for i := 0; i < runtime.NumCPU(); i++ {
137                 wg.Add(1)
138                 go func() {
139                         defer wg.Done()
140                         for coll := range collQ {
141                                 if ctx.Err() != nil {
142                                         continue
143                                 }
144                                 blkids, err := coll.SizedDigests()
145                                 if err != nil {
146                                         bal.logf("%s: %s", coll.UUID, err)
147                                         continue
148                                 }
149                                 repl := bal.BlockStateMap.GetConfirmedReplication(blkids, coll.StorageClassesDesired)
150                                 classes, err := json.Marshal(coll.StorageClassesDesired)
151                                 if err != nil {
152                                         bal.logf("BUG? json.Marshal(%v) failed: %s", classes, err)
153                                         continue
154                                 }
155                                 _, err = bal.DB.ExecContext(ctx, `update collections set
156                                         replication_confirmed=$1,
157                                         replication_confirmed_at=$2,
158                                         storage_classes_confirmed=$3,
159                                         storage_classes_confirmed_at=$2
160                                         where uuid=$4`,
161                                         repl, thresholdStr, classes, coll.UUID)
162                                 if err != nil {
163                                         if err != context.Canceled {
164                                                 bal.logf("%s: update failed: %s", coll.UUID, err)
165                                         }
166                                         continue
167                                 }
168                                 atomic.AddInt64(&updated, 1)
169                         }
170                 }()
171         }
172         wg.Wait()
173         bal.logf("updated %d collections", updated)
174         return err
175 }