9187: If a container is reported Queued, but we are monitoring it, stop monitoring it.
[arvados.git] / services / keep-balance / balance.go
1 package main
2
3 import (
4         "fmt"
5         "log"
6         "math"
7         "os"
8         "runtime"
9         "strings"
10         "sync"
11         "time"
12
13         "git.curoverse.com/arvados.git/sdk/go/arvados"
14         "git.curoverse.com/arvados.git/sdk/go/keepclient"
15 )
16
17 // CheckConfig returns an error if anything is wrong with the given
18 // config and runOptions.
19 func CheckConfig(config Config, runOptions RunOptions) error {
20         if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil {
21                 return fmt.Errorf("cannot specify both KeepServiceList and KeepServiceTypes in config")
22         }
23         if !runOptions.Once && config.RunPeriod == arvados.Duration(0) {
24                 return fmt.Errorf("you must either use the -once flag, or specify RunPeriod in config")
25         }
26         return nil
27 }
28
29 // Balancer compares the contents of keepstore servers with the
30 // collections stored in Arvados, and issues pull/trash requests
31 // needed to get (closer to) the optimal data layout.
32 //
33 // In the optimal data layout: every data block referenced by a
34 // collection is replicated at least as many times as desired by the
35 // collection; there are no unreferenced data blocks older than
36 // BlobSignatureTTL; and all N existing replicas of a given data block
37 // are in the N best positions in rendezvous probe order.
38 type Balancer struct {
39         *BlockStateMap
40         KeepServices       map[string]*KeepService
41         DefaultReplication int
42         Logger             *log.Logger
43         Dumper             *log.Logger
44         MinMtime           int64
45
46         collScanned  int
47         serviceRoots map[string]string
48         errors       []error
49         mutex        sync.Mutex
50 }
51
52 // Run performs a balance operation using the given config and
53 // runOptions. It should only be called once on a given Balancer
54 // object. Typical usage:
55 //
56 //   err = (&Balancer{}).Run(config, runOptions)
57 func (bal *Balancer) Run(config Config, runOptions RunOptions) (err error) {
58         bal.Dumper = runOptions.Dumper
59         bal.Logger = runOptions.Logger
60         if bal.Logger == nil {
61                 bal.Logger = log.New(os.Stderr, "", log.LstdFlags)
62         }
63
64         defer timeMe(bal.Logger, "Run")()
65
66         if len(config.KeepServiceList.Items) > 0 {
67                 err = bal.SetKeepServices(config.KeepServiceList)
68         } else {
69                 err = bal.DiscoverKeepServices(&config.Client, config.KeepServiceTypes)
70         }
71         if err != nil {
72                 return
73         }
74
75         if err = bal.CheckSanityEarly(&config.Client); err != nil {
76                 return
77         }
78         if runOptions.CommitTrash {
79                 if err = bal.ClearTrashLists(&config.Client); err != nil {
80                         return
81                 }
82         }
83         if err = bal.GetCurrentState(&config.Client); err != nil {
84                 return
85         }
86         bal.ComputeChangeSets()
87         bal.PrintStatistics()
88         if err = bal.CheckSanityLate(); err != nil {
89                 return
90         }
91         if runOptions.CommitPulls {
92                 err = bal.CommitPulls(&config.Client)
93                 if err != nil {
94                         // Skip trash if we can't pull. (Too cautious?)
95                         return
96                 }
97         }
98         if runOptions.CommitTrash {
99                 err = bal.CommitTrash(&config.Client)
100         }
101         return
102 }
103
104 // SetKeepServices sets the list of KeepServices to operate on.
105 func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
106         bal.KeepServices = make(map[string]*KeepService)
107         for _, srv := range srvList.Items {
108                 bal.KeepServices[srv.UUID] = &KeepService{
109                         KeepService: srv,
110                         ChangeSet:   &ChangeSet{},
111                 }
112         }
113         return nil
114 }
115
116 // DiscoverKeepServices sets the list of KeepServices by calling the
117 // API to get a list of all services, and selecting the ones whose
118 // ServiceType is in okTypes.
119 func (bal *Balancer) DiscoverKeepServices(c *arvados.Client, okTypes []string) error {
120         bal.KeepServices = make(map[string]*KeepService)
121         ok := make(map[string]bool)
122         for _, t := range okTypes {
123                 ok[t] = true
124         }
125         return c.EachKeepService(func(srv arvados.KeepService) error {
126                 if ok[srv.ServiceType] {
127                         bal.KeepServices[srv.UUID] = &KeepService{
128                                 KeepService: srv,
129                                 ChangeSet:   &ChangeSet{},
130                         }
131                 } else {
132                         bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
133                 }
134                 return nil
135         })
136 }
137
138 // CheckSanityEarly checks for configuration and runtime errors that
139 // can be detected before GetCurrentState() and ComputeChangeSets()
140 // are called.
141 //
142 // If it returns an error, it is pointless to run GetCurrentState or
143 // ComputeChangeSets: after doing so, the statistics would be
144 // meaningless and it would be dangerous to run any Commit methods.
145 func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
146         u, err := c.CurrentUser()
147         if err != nil {
148                 return fmt.Errorf("CurrentUser(): %v", err)
149         }
150         if !u.IsActive || !u.IsAdmin {
151                 return fmt.Errorf("current user (%s) is not an active admin user", u.UUID)
152         }
153         for _, srv := range bal.KeepServices {
154                 if srv.ServiceType == "proxy" {
155                         return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
156                 }
157         }
158         return nil
159 }
160
161 // ClearTrashLists sends an empty trash list to each keep
162 // service. Calling this before GetCurrentState avoids races.
163 //
164 // When a block appears in an index, we assume that replica will still
165 // exist after we delete other replicas on other servers. However,
166 // it's possible that a previous rebalancing operation made different
167 // decisions (e.g., servers were added/removed, and rendezvous order
168 // changed). In this case, the replica might already be on that
169 // server's trash list, and it might be deleted before we send a
170 // replacement trash list.
171 //
172 // We avoid this problem if we clear all trash lists before getting
173 // indexes. (We also assume there is only one rebalancing process
174 // running at a time.)
175 func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
176         for _, srv := range bal.KeepServices {
177                 srv.ChangeSet = &ChangeSet{}
178         }
179         return bal.CommitTrash(c)
180 }
181
182 // GetCurrentState determines the current replication state, and the
183 // desired replication level, for every block that is either
184 // retrievable or referenced.
185 //
186 // It determines the current replication state by reading the block index
187 // from every known Keep service.
188 //
189 // It determines the desired replication level by retrieving all
190 // collection manifests in the database (API server).
191 //
192 // It encodes the resulting information in BlockStateMap.
193 func (bal *Balancer) GetCurrentState(c *arvados.Client) error {
194         defer timeMe(bal.Logger, "GetCurrentState")()
195         bal.BlockStateMap = NewBlockStateMap()
196
197         dd, err := c.DiscoveryDocument()
198         if err != nil {
199                 return err
200         }
201         bal.DefaultReplication = dd.DefaultCollectionReplication
202         bal.MinMtime = time.Now().Unix() - dd.BlobSignatureTTL
203
204         errs := make(chan error, 2+len(bal.KeepServices))
205         wg := sync.WaitGroup{}
206
207         // Start one goroutine for each KeepService: retrieve the
208         // index, and add the returned blocks to BlockStateMap.
209         for _, srv := range bal.KeepServices {
210                 wg.Add(1)
211                 go func(srv *KeepService) {
212                         defer wg.Done()
213                         bal.logf("%s: retrieve index", srv)
214                         idx, err := srv.Index(c, "")
215                         if err != nil {
216                                 errs <- fmt.Errorf("%s: %v", srv, err)
217                                 return
218                         }
219                         bal.logf("%s: add %d replicas to map", srv, len(idx))
220                         bal.BlockStateMap.AddReplicas(srv, idx)
221                         bal.logf("%s: done", srv)
222                 }(srv)
223         }
224
225         // collQ buffers incoming collections so we can start fetching
226         // the next page without waiting for the current page to
227         // finish processing. (1000 happens to match the page size
228         // used by (*arvados.Client)EachCollection(), but it's OK if
229         // they don't match.)
230         collQ := make(chan arvados.Collection, 1000)
231
232         // Start a goroutine to process collections. (We could use a
233         // worker pool here, but even with a single worker we already
234         // process collections much faster than we can retrieve them.)
235         wg.Add(1)
236         go func() {
237                 defer wg.Done()
238                 for coll := range collQ {
239                         err := bal.addCollection(coll)
240                         if err != nil {
241                                 errs <- err
242                                 for range collQ {
243                                 }
244                                 return
245                         }
246                         bal.collScanned++
247                 }
248         }()
249
250         // Start a goroutine to retrieve all collections from the
251         // Arvados database and send them to collQ for processing.
252         wg.Add(1)
253         go func() {
254                 defer wg.Done()
255                 err = EachCollection(c,
256                         func(coll arvados.Collection) error {
257                                 collQ <- coll
258                                 if len(errs) > 0 {
259                                         // some other GetCurrentState
260                                         // error happened: no point
261                                         // getting any more
262                                         // collections.
263                                         return fmt.Errorf("")
264                                 }
265                                 return nil
266                         }, func(done, total int) {
267                                 bal.logf("collections: %d/%d", done, total)
268                         })
269                 close(collQ)
270                 if err != nil {
271                         errs <- err
272                 }
273         }()
274
275         go func() {
276                 // Send a nil error when all goroutines finish. If
277                 // this is the first error sent to errs, then
278                 // everything worked.
279                 wg.Wait()
280                 errs <- nil
281         }()
282         return <-errs
283 }
284
285 func (bal *Balancer) addCollection(coll arvados.Collection) error {
286         blkids, err := coll.SizedDigests()
287         if err != nil {
288                 bal.mutex.Lock()
289                 bal.errors = append(bal.errors, fmt.Errorf("%v: %v", coll.UUID, err))
290                 bal.mutex.Unlock()
291                 return nil
292         }
293         repl := bal.DefaultReplication
294         if coll.ReplicationDesired != nil {
295                 repl = *coll.ReplicationDesired
296         }
297         debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
298         bal.BlockStateMap.IncreaseDesired(repl, blkids)
299         return nil
300 }
301
302 // ComputeChangeSets compares, for each known block, the current and
303 // desired replication states. If it is possible to get closer to the
304 // desired state by copying or deleting blocks, it adds those changes
305 // to the relevant KeepServices' ChangeSets.
306 //
307 // It does not actually apply any of the computed changes.
308 func (bal *Balancer) ComputeChangeSets() {
309         // This just calls balanceBlock() once for each block, using a
310         // pool of worker goroutines.
311         defer timeMe(bal.Logger, "ComputeChangeSets")()
312         bal.setupServiceRoots()
313
314         type balanceTask struct {
315                 blkid arvados.SizedDigest
316                 blk   *BlockState
317         }
318         nWorkers := 1 + runtime.NumCPU()
319         todo := make(chan balanceTask, nWorkers)
320         var wg sync.WaitGroup
321         for i := 0; i < nWorkers; i++ {
322                 wg.Add(1)
323                 go func() {
324                         for work := range todo {
325                                 bal.balanceBlock(work.blkid, work.blk)
326                         }
327                         wg.Done()
328                 }()
329         }
330         bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
331                 todo <- balanceTask{
332                         blkid: blkid,
333                         blk:   blk,
334                 }
335         })
336         close(todo)
337         wg.Wait()
338 }
339
340 func (bal *Balancer) setupServiceRoots() {
341         bal.serviceRoots = make(map[string]string)
342         for _, srv := range bal.KeepServices {
343                 bal.serviceRoots[srv.UUID] = srv.UUID
344         }
345 }
346
347 const (
348         changeStay = iota
349         changePull
350         changeTrash
351         changeNone
352 )
353
354 var changeName = map[int]string{
355         changeStay:  "stay",
356         changePull:  "pull",
357         changeTrash: "trash",
358         changeNone:  "none",
359 }
360
361 // balanceBlock compares current state to desired state for a single
362 // block, and makes the appropriate ChangeSet calls.
363 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
364         debugf("balanceBlock: %v %+v", blkid, blk)
365         uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
366         hasRepl := make(map[string]Replica, len(bal.serviceRoots))
367         for _, repl := range blk.Replicas {
368                 hasRepl[repl.UUID] = repl
369                 // TODO: when multiple copies are on one server, use
370                 // the oldest one that doesn't have a timestamp
371                 // collision with other replicas.
372         }
373         // number of replicas already found in positions better than
374         // the position we're contemplating now.
375         reportedBestRepl := 0
376         // To be safe we assume two replicas with the same Mtime are
377         // in fact the same replica being reported more than
378         // once. len(uniqueBestRepl) is the number of distinct
379         // replicas in the best rendezvous positions we've considered
380         // so far.
381         uniqueBestRepl := make(map[int64]bool, len(bal.serviceRoots))
382         // pulls is the number of Pull changes we have already
383         // requested. (For purposes of deciding whether to Pull to
384         // rendezvous position N, we should assume all pulls we have
385         // requested on rendezvous positions M<N will be successful.)
386         pulls := 0
387         var changes []string
388         for _, uuid := range uuids {
389                 change := changeNone
390                 srv := bal.KeepServices[uuid]
391                 // TODO: request a Touch if Mtime is duplicated.
392                 repl, ok := hasRepl[srv.UUID]
393                 if ok {
394                         // This service has a replica. We should
395                         // delete it if [1] we already have enough
396                         // distinct replicas in better rendezvous
397                         // positions and [2] this replica's Mtime is
398                         // distinct from all of the better replicas'
399                         // Mtimes.
400                         if !srv.ReadOnly &&
401                                 repl.Mtime < bal.MinMtime &&
402                                 len(uniqueBestRepl) >= blk.Desired &&
403                                 !uniqueBestRepl[repl.Mtime] {
404                                 srv.AddTrash(Trash{
405                                         SizedDigest: blkid,
406                                         Mtime:       repl.Mtime,
407                                 })
408                                 change = changeTrash
409                         } else {
410                                 change = changeStay
411                         }
412                         uniqueBestRepl[repl.Mtime] = true
413                         reportedBestRepl++
414                 } else if pulls+reportedBestRepl < blk.Desired &&
415                         len(blk.Replicas) > 0 &&
416                         !srv.ReadOnly {
417                         // This service doesn't have a replica. We
418                         // should pull one to this server if we don't
419                         // already have enough (existing+requested)
420                         // replicas in better rendezvous positions.
421                         srv.AddPull(Pull{
422                                 SizedDigest: blkid,
423                                 Source:      blk.Replicas[0].KeepService,
424                         })
425                         pulls++
426                         change = changePull
427                 }
428                 if bal.Dumper != nil {
429                         changes = append(changes, fmt.Sprintf("%s:%d=%s,%d", srv.ServiceHost, srv.ServicePort, changeName[change], repl.Mtime))
430                 }
431         }
432         if bal.Dumper != nil {
433                 bal.Dumper.Printf("%s have=%d want=%d %s", blkid, len(blk.Replicas), blk.Desired, strings.Join(changes, " "))
434         }
435 }
436
437 type blocksNBytes struct {
438         replicas int
439         blocks   int
440         bytes    int64
441 }
442
443 func (bb blocksNBytes) String() string {
444         return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
445 }
446
447 type balancerStats struct {
448         lost, overrep, unref, garbage, underrep, justright blocksNBytes
449         desired, current                                   blocksNBytes
450         pulls, trashes                                     int
451         replHistogram                                      []int
452 }
453
454 func (bal *Balancer) getStatistics() (s balancerStats) {
455         s.replHistogram = make([]int, 2)
456         bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
457                 surplus := len(blk.Replicas) - blk.Desired
458                 bytes := blkid.Size()
459                 switch {
460                 case len(blk.Replicas) == 0 && blk.Desired > 0:
461                         s.lost.replicas -= surplus
462                         s.lost.blocks++
463                         s.lost.bytes += bytes * int64(-surplus)
464                 case len(blk.Replicas) < blk.Desired:
465                         s.underrep.replicas -= surplus
466                         s.underrep.blocks++
467                         s.underrep.bytes += bytes * int64(-surplus)
468                 case len(blk.Replicas) > 0 && blk.Desired == 0:
469                         counter := &s.garbage
470                         for _, r := range blk.Replicas {
471                                 if r.Mtime >= bal.MinMtime {
472                                         counter = &s.unref
473                                         break
474                                 }
475                         }
476                         counter.replicas += surplus
477                         counter.blocks++
478                         counter.bytes += bytes * int64(surplus)
479                 case len(blk.Replicas) > blk.Desired:
480                         s.overrep.replicas += surplus
481                         s.overrep.blocks++
482                         s.overrep.bytes += bytes * int64(len(blk.Replicas)-blk.Desired)
483                 default:
484                         s.justright.replicas += blk.Desired
485                         s.justright.blocks++
486                         s.justright.bytes += bytes * int64(blk.Desired)
487                 }
488
489                 if blk.Desired > 0 {
490                         s.desired.replicas += blk.Desired
491                         s.desired.blocks++
492                         s.desired.bytes += bytes * int64(blk.Desired)
493                 }
494                 if len(blk.Replicas) > 0 {
495                         s.current.replicas += len(blk.Replicas)
496                         s.current.blocks++
497                         s.current.bytes += bytes * int64(len(blk.Replicas))
498                 }
499
500                 for len(s.replHistogram) <= len(blk.Replicas) {
501                         s.replHistogram = append(s.replHistogram, 0)
502                 }
503                 s.replHistogram[len(blk.Replicas)]++
504         })
505         for _, srv := range bal.KeepServices {
506                 s.pulls += len(srv.ChangeSet.Pulls)
507                 s.trashes += len(srv.ChangeSet.Trashes)
508         }
509         return
510 }
511
512 // PrintStatistics writes statistics about the computed changes to
513 // bal.Logger. It should not be called until ComputeChangeSets has
514 // finished.
515 func (bal *Balancer) PrintStatistics() {
516         s := bal.getStatistics()
517         bal.logf("===")
518         bal.logf("%s lost (0=have<want)", s.lost)
519         bal.logf("%s underreplicated (0<have<want)", s.underrep)
520         bal.logf("%s just right (have=want)", s.justright)
521         bal.logf("%s overreplicated (have>want>0)", s.overrep)
522         bal.logf("%s unreferenced (have>want=0, new)", s.unref)
523         bal.logf("%s garbage (have>want=0, old)", s.garbage)
524         bal.logf("===")
525         bal.logf("%s total commitment (excluding unreferenced)", s.desired)
526         bal.logf("%s total usage", s.current)
527         bal.logf("===")
528         for _, srv := range bal.KeepServices {
529                 bal.logf("%s: %v\n", srv, srv.ChangeSet)
530         }
531         bal.logf("===")
532         bal.printHistogram(s, 60)
533         bal.logf("===")
534 }
535
536 func (bal *Balancer) printHistogram(s balancerStats, hashColumns int) {
537         bal.logf("Replication level distribution (counting N replicas on a single server as N):")
538         maxCount := 0
539         for _, count := range s.replHistogram {
540                 if maxCount < count {
541                         maxCount = count
542                 }
543         }
544         hashes := strings.Repeat("#", hashColumns)
545         countWidth := 1 + int(math.Log10(float64(maxCount+1)))
546         scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
547         for repl, count := range s.replHistogram {
548                 nHashes := int(scaleCount * math.Log10(float64(count+1)))
549                 bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes])
550         }
551 }
552
553 // CheckSanityLate checks for configuration and runtime errors after
554 // GetCurrentState() and ComputeChangeSets() have finished.
555 //
556 // If it returns an error, it is dangerous to run any Commit methods.
557 func (bal *Balancer) CheckSanityLate() error {
558         if bal.errors != nil {
559                 for _, err := range bal.errors {
560                         bal.logf("deferred error: %v", err)
561                 }
562                 return fmt.Errorf("cannot proceed safely after deferred errors")
563         }
564
565         if bal.collScanned == 0 {
566                 return fmt.Errorf("received zero collections")
567         }
568
569         anyDesired := false
570         bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
571                 if blk.Desired > 0 {
572                         anyDesired = true
573                 }
574         })
575         if !anyDesired {
576                 return fmt.Errorf("zero blocks have desired replication>0")
577         }
578
579         if dr := bal.DefaultReplication; dr < 1 {
580                 return fmt.Errorf("Default replication (%d) is less than 1", dr)
581         }
582
583         // TODO: no two services have identical indexes
584         // TODO: no collisions (same md5, different size)
585         return nil
586 }
587
588 // CommitPulls sends the computed lists of pull requests to the
589 // keepstore servers. This has the effect of increasing replication of
590 // existing blocks that are either underreplicated or poorly
591 // distributed according to rendezvous hashing.
592 func (bal *Balancer) CommitPulls(c *arvados.Client) error {
593         return bal.commitAsync(c, "send pull list",
594                 func(srv *KeepService) error {
595                         return srv.CommitPulls(c)
596                 })
597 }
598
599 // CommitTrash sends the computed lists of trash requests to the
600 // keepstore servers. This has the effect of deleting blocks that are
601 // overreplicated or unreferenced.
602 func (bal *Balancer) CommitTrash(c *arvados.Client) error {
603         return bal.commitAsync(c, "send trash list",
604                 func(srv *KeepService) error {
605                         return srv.CommitTrash(c)
606                 })
607 }
608
609 func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
610         errs := make(chan error)
611         for _, srv := range bal.KeepServices {
612                 go func(srv *KeepService) {
613                         var err error
614                         defer func() { errs <- err }()
615                         label := fmt.Sprintf("%s: %v", srv, label)
616                         defer timeMe(bal.Logger, label)()
617                         err = f(srv)
618                         if err != nil {
619                                 err = fmt.Errorf("%s: %v", label, err)
620                         }
621                 }(srv)
622         }
623         var lastErr error
624         for _ = range bal.KeepServices {
625                 if err := <-errs; err != nil {
626                         bal.logf("%v", err)
627                         lastErr = err
628                 }
629         }
630         close(errs)
631         return lastErr
632 }
633
634 func (bal *Balancer) logf(f string, args ...interface{}) {
635         if bal.Logger != nil {
636                 bal.Logger.Printf(f, args...)
637         }
638 }