17574: Get collections directly from DB instead of controller.
[arvados.git] / services / keep-balance / collection.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "context"
9         "encoding/json"
10         "fmt"
11         "io"
12         "runtime"
13         "sync"
14         "sync/atomic"
15         "time"
16
17         "git.arvados.org/arvados.git/sdk/go/arvados"
18         "github.com/jmoiron/sqlx"
19 )
20
21 func countCollections(c *arvados.Client, params arvados.ResourceListParams) (int, error) {
22         var page arvados.CollectionList
23         var zero int
24         params.Limit = &zero
25         params.Count = "exact"
26         err := c.RequestAndDecode(&page, "GET", "arvados/v1/collections", nil, params)
27         return page.ItemsAvailable, err
28 }
29
30 // EachCollection calls f once for every readable
31 // collection. EachCollection stops if it encounters an error, such as
32 // f returning a non-nil error.
33 //
34 // The progress function is called periodically with done (number of
35 // times f has been called) and total (number of times f is expected
36 // to be called).
37 func EachCollection(ctx context.Context, db *sqlx.DB, c *arvados.Client, f func(arvados.Collection) error, progress func(done, total int)) error {
38         if progress == nil {
39                 progress = func(_, _ int) {}
40         }
41
42         expectCount, err := countCollections(c, arvados.ResourceListParams{
43                 IncludeTrash:       true,
44                 IncludeOldVersions: true,
45         })
46         if err != nil {
47                 return err
48         }
49         var newestModifiedAt time.Time
50
51         rows, err := db.QueryxContext(ctx, `SELECT uuid, manifest_text, modified_at, portable_data_hash, replication_desired, storage_classes_desired, is_trashed FROM collections`)
52         if err != nil {
53                 return err
54         }
55         callCount := 0
56         for rows.Next() {
57                 var coll arvados.Collection
58                 var classesDesired []byte
59                 err = rows.Scan(&coll.UUID, &coll.ManifestText, &coll.ModifiedAt, &coll.PortableDataHash, &coll.ReplicationDesired, &classesDesired, &coll.IsTrashed)
60                 if err != nil {
61                         rows.Close()
62                         return err
63                 }
64                 err = json.Unmarshal(classesDesired, &coll.StorageClassesDesired)
65                 if err != nil {
66                         rows.Close()
67                         return err
68                 }
69                 if newestModifiedAt.IsZero() || newestModifiedAt.Before(coll.ModifiedAt) {
70                         newestModifiedAt = coll.ModifiedAt
71                 }
72                 callCount++
73                 err = f(coll)
74                 if err != nil {
75                         return err
76                 }
77                 progress(callCount, expectCount)
78         }
79         rows.Close()
80         if err := rows.Err(); err != nil {
81                 return err
82         }
83         if checkCount, err := countCollections(c, arvados.ResourceListParams{
84                 Filters: []arvados.Filter{{
85                         Attr:     "modified_at",
86                         Operator: "<=",
87                         Operand:  newestModifiedAt}},
88                 IncludeTrash:       true,
89                 IncludeOldVersions: true,
90         }); err != nil {
91                 return err
92         } else if callCount < checkCount {
93                 return fmt.Errorf("Retrieved %d collections with modtime <= T=%q, but server now reports there are %d collections with modtime <= T", callCount, newestModifiedAt, checkCount)
94         }
95
96         return nil
97 }
98
99 func (bal *Balancer) updateCollections(ctx context.Context, c *arvados.Client, cluster *arvados.Cluster) error {
100         ctx, cancel := context.WithCancel(ctx)
101         defer cancel()
102
103         defer bal.time("update_collections", "wall clock time to update collections")()
104         threshold := time.Now()
105         thresholdStr := threshold.Format(time.RFC3339Nano)
106
107         var err error
108         collQ := make(chan arvados.Collection, cluster.Collections.BalanceCollectionBuffers)
109         go func() {
110                 defer close(collQ)
111                 err = EachCollection(ctx, bal.DB, c, func(coll arvados.Collection) error {
112                         if coll.ModifiedAt.After(threshold) {
113                                 return io.EOF
114                         }
115                         if coll.IsTrashed {
116                                 return nil
117                         }
118                         collQ <- coll
119                         return nil
120                 }, func(done, total int) {
121                         bal.logf("update collections: %d/%d", done, total)
122                 })
123                 if err == io.EOF {
124                         err = nil
125                 } else if err != nil {
126                         bal.logf("error updating collections: %s", err)
127                 }
128         }()
129
130         var updated int64
131         var wg sync.WaitGroup
132         for i := 0; i < runtime.NumCPU(); i++ {
133                 wg.Add(1)
134                 go func() {
135                         defer wg.Done()
136                         for coll := range collQ {
137                                 blkids, err := coll.SizedDigests()
138                                 if err != nil {
139                                         bal.logf("%s: %s", coll.UUID, err)
140                                         continue
141                                 }
142                                 repl := bal.BlockStateMap.GetConfirmedReplication(blkids, coll.StorageClassesDesired)
143                                 classes, err := json.Marshal(coll.StorageClassesDesired)
144                                 if err != nil {
145                                         bal.logf("BUG? json.Marshal(%v) failed: %s", classes, err)
146                                         continue
147                                 }
148                                 _, err = bal.DB.ExecContext(ctx, `update collections set
149                                         replication_confirmed=$1,
150                                         replication_confirmed_at=$2,
151                                         storage_classes_confirmed=$3,
152                                         storage_classes_confirmed_at=$2
153                                         where uuid=$4`,
154                                         repl, thresholdStr, classes, coll.UUID)
155                                 if err != nil {
156                                         bal.logf("%s: update failed: %s", coll.UUID, err)
157                                         continue
158                                 }
159                                 atomic.AddInt64(&updated, 1)
160                         }
161                 }()
162         }
163         wg.Wait()
164         bal.logf("updated %d collections", updated)
165         return err
166 }