7471aa6cb06d084416615c8a9eb975d231098dc5
[arvados.git] / services / keep-balance / balance.go
1 package main
2
3 import (
4         "fmt"
5         "log"
6         "os"
7         "runtime"
8         "strings"
9         "sync"
10         "time"
11
12         "git.curoverse.com/arvados.git/sdk/go/arvados"
13         "git.curoverse.com/arvados.git/sdk/go/keepclient"
14 )
15
16 // CheckConfig returns an error if anything is wrong with the given
17 // config and runOptions.
18 func CheckConfig(config Config, runOptions RunOptions) error {
19         if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil {
20                 return fmt.Errorf("cannot specify both KeepServiceList and KeepServiceTypes in config")
21         }
22         if !runOptions.Once && config.RunPeriod == arvados.Duration(0) {
23                 return fmt.Errorf("you must either use the -once flag, or specify RunPeriod in config")
24         }
25         return nil
26 }
27
28 // Balancer compares the contents of keepstore servers with the
29 // collections stored in Arvados, and issues pull/trash requests
30 // needed to get (closer to) the optimal data layout.
31 //
32 // In the optimal data layout: every data block referenced by a
33 // collection is replicated at least as many times as desired by the
34 // collection; there are no unreferenced data blocks older than
35 // BlobSignatureTTL; and all N existing replicas of a given data block
36 // are in the N best positions in rendezvous probe order.
37 type Balancer struct {
38         *BlockStateMap
39         KeepServices       map[string]*KeepService
40         DefaultReplication int
41         Logger             *log.Logger
42         Dumper             *log.Logger
43         MinMtime           int64
44
45         collScanned  int
46         serviceRoots map[string]string
47         errors       []error
48         mutex        sync.Mutex
49 }
50
51 // Run performs a balance operation using the given config and
52 // runOptions. It should only be called once on a given Balancer
53 // object. Typical usage:
54 //
55 //   err = (&Balancer{}).Run(config, runOptions)
56 func (bal *Balancer) Run(config Config, runOptions RunOptions) (err error) {
57         bal.Dumper = runOptions.Dumper
58         bal.Logger = runOptions.Logger
59         if bal.Logger == nil {
60                 bal.Logger = log.New(os.Stderr, "", log.LstdFlags)
61         }
62
63         defer timeMe(bal.Logger, "Run")()
64
65         if len(config.KeepServiceList.Items) > 0 {
66                 err = bal.SetKeepServices(config.KeepServiceList)
67         } else {
68                 err = bal.DiscoverKeepServices(&config.Client, config.KeepServiceTypes)
69         }
70         if err != nil {
71                 return
72         }
73
74         if err = bal.CheckSanityEarly(&config.Client); err != nil {
75                 return
76         }
77         if runOptions.CommitTrash {
78                 if err = bal.ClearTrashLists(&config.Client); err != nil {
79                         return
80                 }
81         }
82         if err = bal.GetCurrentState(&config.Client); err != nil {
83                 return
84         }
85         bal.ComputeChangeSets()
86         bal.PrintStatistics()
87         if err = bal.CheckSanityLate(); err != nil {
88                 return
89         }
90         if runOptions.CommitPulls {
91                 err = bal.CommitPulls(&config.Client)
92                 if err != nil {
93                         // Skip trash if we can't pull. (Too cautious?)
94                         return
95                 }
96         }
97         if runOptions.CommitTrash {
98                 err = bal.CommitTrash(&config.Client)
99         }
100         return
101 }
102
103 // SetKeepServices sets the list of KeepServices to operate on.
104 func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
105         bal.KeepServices = make(map[string]*KeepService)
106         for _, srv := range srvList.Items {
107                 bal.KeepServices[srv.UUID] = &KeepService{
108                         KeepService: srv,
109                         ChangeSet:   &ChangeSet{},
110                 }
111         }
112         return nil
113 }
114
115 // DiscoverKeepServices sets the list of KeepServices by calling the
116 // API to get a list of all services, and selecting the ones whose
117 // ServiceType is in okTypes.
118 func (bal *Balancer) DiscoverKeepServices(c *arvados.Client, okTypes []string) error {
119         bal.KeepServices = make(map[string]*KeepService)
120         ok := make(map[string]bool)
121         for _, t := range okTypes {
122                 ok[t] = true
123         }
124         return c.EachKeepService(func(srv arvados.KeepService) error {
125                 if ok[srv.ServiceType] {
126                         bal.KeepServices[srv.UUID] = &KeepService{
127                                 KeepService: srv,
128                                 ChangeSet:   &ChangeSet{},
129                         }
130                 } else {
131                         bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
132                 }
133                 return nil
134         })
135 }
136
137 // CheckSanityEarly checks for configuration and runtime errors that
138 // can be detected before GetCurrentState() and ComputeChangeSets()
139 // are called.
140 //
141 // If it returns an error, it is pointless to run GetCurrentState or
142 // ComputeChangeSets: after doing so, the statistics would be
143 // meaningless and it would be dangerous to run any Commit methods.
144 func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
145         u, err := c.CurrentUser()
146         if err != nil {
147                 return fmt.Errorf("CurrentUser(): %v", err)
148         }
149         if !u.IsActive || !u.IsAdmin {
150                 return fmt.Errorf("current user (%s) is not an active admin user", u.UUID)
151         }
152         for _, srv := range bal.KeepServices {
153                 if srv.ServiceType == "proxy" {
154                         return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
155                 }
156         }
157         return nil
158 }
159
160 // ClearTrashLists sends an empty trash list to each keep
161 // service. Calling this before GetCurrentState avoids races.
162 //
163 // When a block appears in an index, we assume that replica will still
164 // exist after we delete other replicas on other servers. However,
165 // it's possible that a previous rebalancing operation made different
166 // decisions (e.g., servers were added/removed, and rendezvous order
167 // changed). In this case, the replica might already be on that
168 // server's trash list, and it might be deleted before we send a
169 // replacement trash list.
170 //
171 // We avoid this problem if we clear all trash lists before getting
172 // indexes. (We also assume there is only one rebalancing process
173 // running at a time.)
174 func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
175         for _, srv := range bal.KeepServices {
176                 srv.ChangeSet = &ChangeSet{}
177         }
178         return bal.CommitTrash(c)
179 }
180
181 // GetCurrentState determines the current replication state, and the
182 // desired replication level, for every block that is either
183 // retrievable or referenced.
184 //
185 // It determines the current replication state by reading the block index
186 // from every known Keep service.
187 //
188 // It determines the desired replication level by retrieving all
189 // collection manifests in the database (API server).
190 //
191 // It encodes the resulting information in BlockStateMap.
192 func (bal *Balancer) GetCurrentState(c *arvados.Client) error {
193         defer timeMe(bal.Logger, "GetCurrentState")()
194         bal.BlockStateMap = NewBlockStateMap()
195
196         dd, err := c.DiscoveryDocument()
197         if err != nil {
198                 return err
199         }
200         bal.DefaultReplication = dd.DefaultCollectionReplication
201         bal.MinMtime = time.Now().Unix() - dd.BlobSignatureTTL
202
203         errs := make(chan error, 2+len(bal.KeepServices))
204         wg := sync.WaitGroup{}
205
206         // Start one goroutine for each KeepService: retrieve the
207         // index, and add the returned blocks to BlockStateMap.
208         for _, srv := range bal.KeepServices {
209                 wg.Add(1)
210                 go func(srv *KeepService) {
211                         defer wg.Done()
212                         bal.logf("%s: retrieve index", srv)
213                         idx, err := srv.Index(c, "")
214                         if err != nil {
215                                 errs <- fmt.Errorf("%s: %v", srv, err)
216                                 return
217                         }
218                         bal.logf("%s: add %d replicas to map", srv, len(idx))
219                         bal.BlockStateMap.AddReplicas(srv, idx)
220                         bal.logf("%s: done", srv)
221                 }(srv)
222         }
223
224         // collQ buffers incoming collections so we can start fetching
225         // the next page without waiting for the current page to
226         // finish processing. (1000 happens to match the page size
227         // used by (*arvados.Client)EachCollection(), but it's OK if
228         // they don't match.)
229         collQ := make(chan arvados.Collection, 1000)
230
231         // Start a goroutine to process collections. (We could use a
232         // worker pool here, but even with a single worker we already
233         // process collections much faster than we can retrieve them.)
234         wg.Add(1)
235         go func() {
236                 defer wg.Done()
237                 for coll := range collQ {
238                         err := bal.addCollection(coll)
239                         if err != nil {
240                                 errs <- err
241                                 for range collQ {
242                                 }
243                                 return
244                         }
245                         bal.collScanned++
246                 }
247         }()
248
249         // Start a goroutine to retrieve all collections from the
250         // Arvados database and send them to collQ for processing.
251         wg.Add(1)
252         go func() {
253                 defer wg.Done()
254                 err = EachCollection(c,
255                         func(coll arvados.Collection) error {
256                                 collQ <- coll
257                                 if len(errs) > 0 {
258                                         // some other GetCurrentState
259                                         // error happened: no point
260                                         // getting any more
261                                         // collections.
262                                         return fmt.Errorf("")
263                                 }
264                                 return nil
265                         }, func(done, total int) {
266                                 bal.logf("collections: %d/%d", done, total)
267                         })
268                 close(collQ)
269                 if err != nil {
270                         errs <- err
271                 }
272         }()
273
274         go func() {
275                 // Send a nil error when all goroutines finish. If
276                 // this is the first error sent to errs, then
277                 // everything worked.
278                 wg.Wait()
279                 errs <- nil
280         }()
281         return <-errs
282 }
283
284 func (bal *Balancer) addCollection(coll arvados.Collection) error {
285         blkids, err := coll.SizedDigests()
286         if err != nil {
287                 bal.mutex.Lock()
288                 bal.errors = append(bal.errors, fmt.Errorf("%v: %v", coll.UUID, err))
289                 bal.mutex.Unlock()
290                 return nil
291         }
292         repl := bal.DefaultReplication
293         if coll.ReplicationDesired != nil {
294                 repl = *coll.ReplicationDesired
295         }
296         debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
297         bal.BlockStateMap.IncreaseDesired(repl, blkids)
298         return nil
299 }
300
301 // ComputeChangeSets compares, for each known block, the current and
302 // desired replication states. If it is possible to get closer to the
303 // desired state by copying or deleting blocks, it adds those changes
304 // to the relevant KeepServices' ChangeSets.
305 //
306 // It does not actually apply any of the computed changes.
307 func (bal *Balancer) ComputeChangeSets() {
308         // This just calls balanceBlock() once for each block, using a
309         // pool of worker goroutines.
310         defer timeMe(bal.Logger, "ComputeChangeSets")()
311         bal.setupServiceRoots()
312
313         type balanceTask struct {
314                 blkid arvados.SizedDigest
315                 blk   *BlockState
316         }
317         nWorkers := 1 + runtime.NumCPU()
318         todo := make(chan balanceTask, nWorkers)
319         var wg sync.WaitGroup
320         for i := 0; i < nWorkers; i++ {
321                 wg.Add(1)
322                 go func() {
323                         for work := range todo {
324                                 bal.balanceBlock(work.blkid, work.blk)
325                         }
326                         wg.Done()
327                 }()
328         }
329         bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
330                 todo <- balanceTask{
331                         blkid: blkid,
332                         blk:   blk,
333                 }
334         })
335         close(todo)
336         wg.Wait()
337 }
338
339 func (bal *Balancer) setupServiceRoots() {
340         bal.serviceRoots = make(map[string]string)
341         for _, srv := range bal.KeepServices {
342                 bal.serviceRoots[srv.UUID] = srv.UUID
343         }
344 }
345
346 const (
347         changeStay = iota
348         changePull
349         changeTrash
350         changeNone
351 )
352
353 var changeName = map[int]string{
354         changeStay:  "stay",
355         changePull:  "pull",
356         changeTrash: "trash",
357         changeNone:  "none",
358 }
359
360 // balanceBlock compares current state to desired state for a single
361 // block, and makes the appropriate ChangeSet calls.
362 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
363         debugf("balanceBlock: %v %+v", blkid, blk)
364         uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
365         hasRepl := make(map[string]Replica, len(bal.serviceRoots))
366         for _, repl := range blk.Replicas {
367                 hasRepl[repl.UUID] = repl
368                 // TODO: when multiple copies are on one server, use
369                 // the oldest one that doesn't have a timestamp
370                 // collision with other replicas.
371         }
372         // number of replicas already found in positions better than
373         // the position we're contemplating now.
374         reportedBestRepl := 0
375         // To be safe we assume two replicas with the same Mtime are
376         // in fact the same replica being reported more than
377         // once. len(uniqueBestRepl) is the number of distinct
378         // replicas in the best rendezvous positions we've considered
379         // so far.
380         uniqueBestRepl := make(map[int64]bool, len(bal.serviceRoots))
381         // pulls is the number of Pull changes we have already
382         // requested. (For purposes of deciding whether to Pull to
383         // rendezvous position N, we should assume all pulls we have
384         // requested on rendezvous positions M<N will be successful.)
385         pulls := 0
386         var changes []string
387         for _, uuid := range uuids {
388                 change := changeNone
389                 srv := bal.KeepServices[uuid]
390                 // TODO: request a Touch if Mtime is duplicated.
391                 repl, ok := hasRepl[srv.UUID]
392                 if ok {
393                         // This service has a replica. We should
394                         // delete it if [1] we already have enough
395                         // distinct replicas in better rendezvous
396                         // positions and [2] this replica's Mtime is
397                         // distinct from all of the better replicas'
398                         // Mtimes.
399                         if !srv.ReadOnly &&
400                                 repl.Mtime < bal.MinMtime &&
401                                 len(uniqueBestRepl) >= blk.Desired &&
402                                 !uniqueBestRepl[repl.Mtime] {
403                                 srv.AddTrash(Trash{
404                                         SizedDigest: blkid,
405                                         Mtime:       repl.Mtime,
406                                 })
407                                 change = changeTrash
408                         } else {
409                                 change = changeStay
410                         }
411                         uniqueBestRepl[repl.Mtime] = true
412                         reportedBestRepl++
413                 } else if pulls+reportedBestRepl < blk.Desired &&
414                         len(blk.Replicas) > 0 &&
415                         !srv.ReadOnly {
416                         // This service doesn't have a replica. We
417                         // should pull one to this server if we don't
418                         // already have enough (existing+requested)
419                         // replicas in better rendezvous positions.
420                         srv.AddPull(Pull{
421                                 SizedDigest: blkid,
422                                 Source:      blk.Replicas[0].KeepService,
423                         })
424                         pulls++
425                         change = changePull
426                 }
427                 if bal.Dumper != nil {
428                         changes = append(changes, fmt.Sprintf("%s:%d=%s,%d", srv.ServiceHost, srv.ServicePort, changeName[change], repl.Mtime))
429                 }
430         }
431         if bal.Dumper != nil {
432                 bal.Dumper.Printf("%s have=%d want=%d %s", blkid, len(blk.Replicas), blk.Desired, strings.Join(changes, " "))
433         }
434 }
435
436 type blocksNBytes struct {
437         replicas int
438         blocks   int
439         bytes    int64
440 }
441
442 func (bb blocksNBytes) String() string {
443         return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
444 }
445
446 type balancerStats struct {
447         lost, overrep, unref, garbage, underrep, justright blocksNBytes
448         desired, current                                   blocksNBytes
449         pulls, trashes                                     int
450 }
451
452 func (bal *Balancer) getStatistics() (s balancerStats) {
453         bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
454                 surplus := len(blk.Replicas) - blk.Desired
455                 bytes := blkid.Size()
456                 switch {
457                 case len(blk.Replicas) == 0 && blk.Desired > 0:
458                         s.lost.replicas -= surplus
459                         s.lost.blocks++
460                         s.lost.bytes += bytes * int64(-surplus)
461                 case len(blk.Replicas) < blk.Desired:
462                         s.underrep.replicas -= surplus
463                         s.underrep.blocks++
464                         s.underrep.bytes += bytes * int64(-surplus)
465                 case len(blk.Replicas) > 0 && blk.Desired == 0:
466                         counter := &s.garbage
467                         for _, r := range blk.Replicas {
468                                 if r.Mtime >= bal.MinMtime {
469                                         counter = &s.unref
470                                         break
471                                 }
472                         }
473                         counter.replicas += surplus
474                         counter.blocks++
475                         counter.bytes += bytes * int64(surplus)
476                 case len(blk.Replicas) > blk.Desired:
477                         s.overrep.replicas += surplus
478                         s.overrep.blocks++
479                         s.overrep.bytes += bytes * int64(len(blk.Replicas)-blk.Desired)
480                 default:
481                         s.justright.replicas += blk.Desired
482                         s.justright.blocks++
483                         s.justright.bytes += bytes * int64(blk.Desired)
484                 }
485
486                 if blk.Desired > 0 {
487                         s.desired.replicas += blk.Desired
488                         s.desired.blocks++
489                         s.desired.bytes += bytes * int64(blk.Desired)
490                 }
491                 if len(blk.Replicas) > 0 {
492                         s.current.replicas += len(blk.Replicas)
493                         s.current.blocks++
494                         s.current.bytes += bytes * int64(len(blk.Replicas))
495                 }
496         })
497         for _, srv := range bal.KeepServices {
498                 s.pulls += len(srv.ChangeSet.Pulls)
499                 s.trashes += len(srv.ChangeSet.Trashes)
500         }
501         return
502 }
503
504 // PrintStatistics writes statistics about the computed changes to
505 // bal.Logger. It should not be called until ComputeChangeSets has
506 // finished.
507 func (bal *Balancer) PrintStatistics() {
508         s := bal.getStatistics()
509         bal.logf("===")
510         bal.logf("%s lost (0=have<want)", s.lost)
511         bal.logf("%s underreplicated (0<have<want)", s.underrep)
512         bal.logf("%s just right (have=want)", s.justright)
513         bal.logf("%s overreplicated (have>want>0)", s.overrep)
514         bal.logf("%s unreferenced (have>want=0, new)", s.unref)
515         bal.logf("%s garbage (have>want=0, old)", s.garbage)
516         bal.logf("===")
517         bal.logf("%s total commitment (excluding unreferenced)", s.desired)
518         bal.logf("%s total usage", s.current)
519         bal.logf("===")
520         for _, srv := range bal.KeepServices {
521                 bal.logf("%s: %v\n", srv, srv.ChangeSet)
522         }
523         bal.logf("===")
524 }
525
526 // CheckSanityLate checks for configuration and runtime errors after
527 // GetCurrentState() and ComputeChangeSets() have finished.
528 //
529 // If it returns an error, it is dangerous to run any Commit methods.
530 func (bal *Balancer) CheckSanityLate() error {
531         if bal.errors != nil {
532                 for _, err := range bal.errors {
533                         bal.logf("deferred error: %v", err)
534                 }
535                 return fmt.Errorf("cannot proceed safely after deferred errors")
536         }
537
538         if bal.collScanned == 0 {
539                 return fmt.Errorf("received zero collections")
540         }
541
542         anyDesired := false
543         bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
544                 if blk.Desired > 0 {
545                         anyDesired = true
546                 }
547         })
548         if !anyDesired {
549                 return fmt.Errorf("zero blocks have desired replication>0")
550         }
551
552         if dr := bal.DefaultReplication; dr < 1 {
553                 return fmt.Errorf("Default replication (%d) is less than 1", dr)
554         }
555
556         // TODO: no two services have identical indexes
557         // TODO: no collisions (same md5, different size)
558         return nil
559 }
560
561 // CommitPulls sends the computed lists of pull requests to the
562 // keepstore servers. This has the effect of increasing replication of
563 // existing blocks that are either underreplicated or poorly
564 // distributed according to rendezvous hashing.
565 func (bal *Balancer) CommitPulls(c *arvados.Client) error {
566         return bal.commitAsync(c, "send pull list",
567                 func(srv *KeepService) error {
568                         return srv.CommitPulls(c)
569                 })
570 }
571
572 // CommitTrash sends the computed lists of trash requests to the
573 // keepstore servers. This has the effect of deleting blocks that are
574 // overreplicated or unreferenced.
575 func (bal *Balancer) CommitTrash(c *arvados.Client) error {
576         return bal.commitAsync(c, "send trash list",
577                 func(srv *KeepService) error {
578                         return srv.CommitTrash(c)
579                 })
580 }
581
582 func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
583         errs := make(chan error)
584         for _, srv := range bal.KeepServices {
585                 go func(srv *KeepService) {
586                         var err error
587                         defer func() { errs <- err }()
588                         label := fmt.Sprintf("%s: %v", srv, label)
589                         defer timeMe(bal.Logger, label)()
590                         err = f(srv)
591                         if err != nil {
592                                 err = fmt.Errorf("%s: %v", label, err)
593                         }
594                 }(srv)
595         }
596         var lastErr error
597         for _ = range bal.KeepServices {
598                 if err := <-errs; err != nil {
599                         bal.logf("%v", err)
600                         lastErr = err
601                 }
602         }
603         close(errs)
604         return lastErr
605 }
606
607 func (bal *Balancer) logf(f string, args ...interface{}) {
608         if bal.Logger != nil {
609                 bal.Logger.Printf(f, args...)
610         }
611 }