10516: set finished_at to updated_at on pipeline_instances if the pipeline is finishe...
[arvados.git] / services / keep-balance / balance.go
1 package main
2
3 import (
4         "fmt"
5         "log"
6         "math"
7         "os"
8         "runtime"
9         "sort"
10         "strings"
11         "sync"
12         "time"
13
14         "git.curoverse.com/arvados.git/sdk/go/arvados"
15         "git.curoverse.com/arvados.git/sdk/go/keepclient"
16 )
17
18 // CheckConfig returns an error if anything is wrong with the given
19 // config and runOptions.
20 func CheckConfig(config Config, runOptions RunOptions) error {
21         if len(config.KeepServiceList.Items) > 0 && config.KeepServiceTypes != nil {
22                 return fmt.Errorf("cannot specify both KeepServiceList and KeepServiceTypes in config")
23         }
24         if !runOptions.Once && config.RunPeriod == arvados.Duration(0) {
25                 return fmt.Errorf("you must either use the -once flag, or specify RunPeriod in config")
26         }
27         return nil
28 }
29
30 // Balancer compares the contents of keepstore servers with the
31 // collections stored in Arvados, and issues pull/trash requests
32 // needed to get (closer to) the optimal data layout.
33 //
34 // In the optimal data layout: every data block referenced by a
35 // collection is replicated at least as many times as desired by the
36 // collection; there are no unreferenced data blocks older than
37 // BlobSignatureTTL; and all N existing replicas of a given data block
38 // are in the N best positions in rendezvous probe order.
39 type Balancer struct {
40         *BlockStateMap
41         KeepServices       map[string]*KeepService
42         DefaultReplication int
43         Logger             *log.Logger
44         Dumper             *log.Logger
45         MinMtime           int64
46
47         collScanned  int
48         serviceRoots map[string]string
49         errors       []error
50         mutex        sync.Mutex
51 }
52
53 // Run performs a balance operation using the given config and
54 // runOptions, and returns RunOptions suitable for passing to a
55 // subsequent balance operation.
56 //
57 // Run should only be called once on a given Balancer object.
58 //
59 // Typical usage:
60 //
61 //   runOptions, err = (&Balancer{}).Run(config, runOptions)
62 func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
63         nextRunOptions = runOptions
64
65         bal.Dumper = runOptions.Dumper
66         bal.Logger = runOptions.Logger
67         if bal.Logger == nil {
68                 bal.Logger = log.New(os.Stderr, "", log.LstdFlags)
69         }
70
71         defer timeMe(bal.Logger, "Run")()
72
73         if len(config.KeepServiceList.Items) > 0 {
74                 err = bal.SetKeepServices(config.KeepServiceList)
75         } else {
76                 err = bal.DiscoverKeepServices(&config.Client, config.KeepServiceTypes)
77         }
78         if err != nil {
79                 return
80         }
81
82         if err = bal.CheckSanityEarly(&config.Client); err != nil {
83                 return
84         }
85         rs := bal.rendezvousState()
86         if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
87                 if runOptions.SafeRendezvousState != "" {
88                         bal.logf("notice: KeepServices list has changed since last run")
89                 }
90                 bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
91                 if err = bal.ClearTrashLists(&config.Client); err != nil {
92                         return
93                 }
94                 // The current rendezvous state becomes "safe" (i.e.,
95                 // OK to compute changes for that state without
96                 // clearing existing trash lists) only now, after we
97                 // succeed in clearing existing trash lists.
98                 nextRunOptions.SafeRendezvousState = rs
99         }
100         if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize, config.CollectionBuffers); err != nil {
101                 return
102         }
103         bal.ComputeChangeSets()
104         bal.PrintStatistics()
105         if err = bal.CheckSanityLate(); err != nil {
106                 return
107         }
108         if runOptions.CommitPulls {
109                 err = bal.CommitPulls(&config.Client)
110                 if err != nil {
111                         // Skip trash if we can't pull. (Too cautious?)
112                         return
113                 }
114         }
115         if runOptions.CommitTrash {
116                 err = bal.CommitTrash(&config.Client)
117         }
118         return
119 }
120
121 // SetKeepServices sets the list of KeepServices to operate on.
122 func (bal *Balancer) SetKeepServices(srvList arvados.KeepServiceList) error {
123         bal.KeepServices = make(map[string]*KeepService)
124         for _, srv := range srvList.Items {
125                 bal.KeepServices[srv.UUID] = &KeepService{
126                         KeepService: srv,
127                         ChangeSet:   &ChangeSet{},
128                 }
129         }
130         return nil
131 }
132
133 // DiscoverKeepServices sets the list of KeepServices by calling the
134 // API to get a list of all services, and selecting the ones whose
135 // ServiceType is in okTypes.
136 func (bal *Balancer) DiscoverKeepServices(c *arvados.Client, okTypes []string) error {
137         bal.KeepServices = make(map[string]*KeepService)
138         ok := make(map[string]bool)
139         for _, t := range okTypes {
140                 ok[t] = true
141         }
142         return c.EachKeepService(func(srv arvados.KeepService) error {
143                 if ok[srv.ServiceType] {
144                         bal.KeepServices[srv.UUID] = &KeepService{
145                                 KeepService: srv,
146                                 ChangeSet:   &ChangeSet{},
147                         }
148                 } else {
149                         bal.logf("skipping %v with service type %q", srv.UUID, srv.ServiceType)
150                 }
151                 return nil
152         })
153 }
154
155 // CheckSanityEarly checks for configuration and runtime errors that
156 // can be detected before GetCurrentState() and ComputeChangeSets()
157 // are called.
158 //
159 // If it returns an error, it is pointless to run GetCurrentState or
160 // ComputeChangeSets: after doing so, the statistics would be
161 // meaningless and it would be dangerous to run any Commit methods.
162 func (bal *Balancer) CheckSanityEarly(c *arvados.Client) error {
163         u, err := c.CurrentUser()
164         if err != nil {
165                 return fmt.Errorf("CurrentUser(): %v", err)
166         }
167         if !u.IsActive || !u.IsAdmin {
168                 return fmt.Errorf("current user (%s) is not an active admin user", u.UUID)
169         }
170         for _, srv := range bal.KeepServices {
171                 if srv.ServiceType == "proxy" {
172                         return fmt.Errorf("config error: %s: proxy servers cannot be balanced", srv)
173                 }
174         }
175         return nil
176 }
177
178 // rendezvousState returns a fingerprint (e.g., a sorted list of
179 // UUID+host+port) of the current set of keep services.
180 func (bal *Balancer) rendezvousState() string {
181         srvs := make([]string, 0, len(bal.KeepServices))
182         for _, srv := range bal.KeepServices {
183                 srvs = append(srvs, srv.String())
184         }
185         sort.Strings(srvs)
186         return strings.Join(srvs, "; ")
187 }
188
189 // ClearTrashLists sends an empty trash list to each keep
190 // service. Calling this before GetCurrentState avoids races.
191 //
192 // When a block appears in an index, we assume that replica will still
193 // exist after we delete other replicas on other servers. However,
194 // it's possible that a previous rebalancing operation made different
195 // decisions (e.g., servers were added/removed, and rendezvous order
196 // changed). In this case, the replica might already be on that
197 // server's trash list, and it might be deleted before we send a
198 // replacement trash list.
199 //
200 // We avoid this problem if we clear all trash lists before getting
201 // indexes. (We also assume there is only one rebalancing process
202 // running at a time.)
203 func (bal *Balancer) ClearTrashLists(c *arvados.Client) error {
204         for _, srv := range bal.KeepServices {
205                 srv.ChangeSet = &ChangeSet{}
206         }
207         return bal.CommitTrash(c)
208 }
209
210 // GetCurrentState determines the current replication state, and the
211 // desired replication level, for every block that is either
212 // retrievable or referenced.
213 //
214 // It determines the current replication state by reading the block index
215 // from every known Keep service.
216 //
217 // It determines the desired replication level by retrieving all
218 // collection manifests in the database (API server).
219 //
220 // It encodes the resulting information in BlockStateMap.
221 func (bal *Balancer) GetCurrentState(c *arvados.Client, pageSize, bufs int) error {
222         defer timeMe(bal.Logger, "GetCurrentState")()
223         bal.BlockStateMap = NewBlockStateMap()
224
225         dd, err := c.DiscoveryDocument()
226         if err != nil {
227                 return err
228         }
229         bal.DefaultReplication = dd.DefaultCollectionReplication
230         bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9
231
232         errs := make(chan error, 2+len(bal.KeepServices))
233         wg := sync.WaitGroup{}
234
235         // Start one goroutine for each KeepService: retrieve the
236         // index, and add the returned blocks to BlockStateMap.
237         for _, srv := range bal.KeepServices {
238                 wg.Add(1)
239                 go func(srv *KeepService) {
240                         defer wg.Done()
241                         bal.logf("%s: retrieve index", srv)
242                         idx, err := srv.Index(c, "")
243                         if err != nil {
244                                 errs <- fmt.Errorf("%s: %v", srv, err)
245                                 return
246                         }
247                         if len(errs) > 0 {
248                                 // Some other goroutine encountered an
249                                 // error -- any further effort here
250                                 // will be wasted.
251                                 return
252                         }
253                         bal.logf("%s: add %d replicas to map", srv, len(idx))
254                         bal.BlockStateMap.AddReplicas(srv, idx)
255                         bal.logf("%s: done", srv)
256                 }(srv)
257         }
258
259         // collQ buffers incoming collections so we can start fetching
260         // the next page without waiting for the current page to
261         // finish processing.
262         collQ := make(chan arvados.Collection, bufs)
263
264         // Start a goroutine to process collections. (We could use a
265         // worker pool here, but even with a single worker we already
266         // process collections much faster than we can retrieve them.)
267         wg.Add(1)
268         go func() {
269                 defer wg.Done()
270                 for coll := range collQ {
271                         err := bal.addCollection(coll)
272                         if err != nil {
273                                 errs <- err
274                                 for range collQ {
275                                 }
276                                 return
277                         }
278                         bal.collScanned++
279                 }
280         }()
281
282         // Start a goroutine to retrieve all collections from the
283         // Arvados database and send them to collQ for processing.
284         wg.Add(1)
285         go func() {
286                 defer wg.Done()
287                 err = EachCollection(c, pageSize,
288                         func(coll arvados.Collection) error {
289                                 collQ <- coll
290                                 if len(errs) > 0 {
291                                         // some other GetCurrentState
292                                         // error happened: no point
293                                         // getting any more
294                                         // collections.
295                                         return fmt.Errorf("")
296                                 }
297                                 return nil
298                         }, func(done, total int) {
299                                 bal.logf("collections: %d/%d", done, total)
300                         })
301                 close(collQ)
302                 if err != nil {
303                         errs <- err
304                 }
305         }()
306
307         wg.Wait()
308         if len(errs) > 0 {
309                 return <-errs
310         }
311         return nil
312 }
313
314 func (bal *Balancer) addCollection(coll arvados.Collection) error {
315         blkids, err := coll.SizedDigests()
316         if err != nil {
317                 bal.mutex.Lock()
318                 bal.errors = append(bal.errors, fmt.Errorf("%v: %v", coll.UUID, err))
319                 bal.mutex.Unlock()
320                 return nil
321         }
322         repl := bal.DefaultReplication
323         if coll.ReplicationDesired != nil {
324                 repl = *coll.ReplicationDesired
325         }
326         debugf("%v: %d block x%d", coll.UUID, len(blkids), repl)
327         bal.BlockStateMap.IncreaseDesired(repl, blkids)
328         return nil
329 }
330
331 // ComputeChangeSets compares, for each known block, the current and
332 // desired replication states. If it is possible to get closer to the
333 // desired state by copying or deleting blocks, it adds those changes
334 // to the relevant KeepServices' ChangeSets.
335 //
336 // It does not actually apply any of the computed changes.
337 func (bal *Balancer) ComputeChangeSets() {
338         // This just calls balanceBlock() once for each block, using a
339         // pool of worker goroutines.
340         defer timeMe(bal.Logger, "ComputeChangeSets")()
341         bal.setupServiceRoots()
342
343         type balanceTask struct {
344                 blkid arvados.SizedDigest
345                 blk   *BlockState
346         }
347         nWorkers := 1 + runtime.NumCPU()
348         todo := make(chan balanceTask, nWorkers)
349         var wg sync.WaitGroup
350         for i := 0; i < nWorkers; i++ {
351                 wg.Add(1)
352                 go func() {
353                         for work := range todo {
354                                 bal.balanceBlock(work.blkid, work.blk)
355                         }
356                         wg.Done()
357                 }()
358         }
359         bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
360                 todo <- balanceTask{
361                         blkid: blkid,
362                         blk:   blk,
363                 }
364         })
365         close(todo)
366         wg.Wait()
367 }
368
369 func (bal *Balancer) setupServiceRoots() {
370         bal.serviceRoots = make(map[string]string)
371         for _, srv := range bal.KeepServices {
372                 bal.serviceRoots[srv.UUID] = srv.UUID
373         }
374 }
375
376 const (
377         changeStay = iota
378         changePull
379         changeTrash
380         changeNone
381 )
382
383 var changeName = map[int]string{
384         changeStay:  "stay",
385         changePull:  "pull",
386         changeTrash: "trash",
387         changeNone:  "none",
388 }
389
390 // balanceBlock compares current state to desired state for a single
391 // block, and makes the appropriate ChangeSet calls.
392 func (bal *Balancer) balanceBlock(blkid arvados.SizedDigest, blk *BlockState) {
393         debugf("balanceBlock: %v %+v", blkid, blk)
394         uuids := keepclient.NewRootSorter(bal.serviceRoots, string(blkid[:32])).GetSortedRoots()
395         hasRepl := make(map[string]Replica, len(bal.serviceRoots))
396         for _, repl := range blk.Replicas {
397                 hasRepl[repl.UUID] = repl
398                 // TODO: when multiple copies are on one server, use
399                 // the oldest one that doesn't have a timestamp
400                 // collision with other replicas.
401         }
402         // number of replicas already found in positions better than
403         // the position we're contemplating now.
404         reportedBestRepl := 0
405         // To be safe we assume two replicas with the same Mtime are
406         // in fact the same replica being reported more than
407         // once. len(uniqueBestRepl) is the number of distinct
408         // replicas in the best rendezvous positions we've considered
409         // so far.
410         uniqueBestRepl := make(map[int64]bool, len(bal.serviceRoots))
411         // pulls is the number of Pull changes we have already
412         // requested. (For purposes of deciding whether to Pull to
413         // rendezvous position N, we should assume all pulls we have
414         // requested on rendezvous positions M<N will be successful.)
415         pulls := 0
416         var changes []string
417         for _, uuid := range uuids {
418                 change := changeNone
419                 srv := bal.KeepServices[uuid]
420                 // TODO: request a Touch if Mtime is duplicated.
421                 repl, ok := hasRepl[srv.UUID]
422                 if ok {
423                         // This service has a replica. We should
424                         // delete it if [1] we already have enough
425                         // distinct replicas in better rendezvous
426                         // positions and [2] this replica's Mtime is
427                         // distinct from all of the better replicas'
428                         // Mtimes.
429                         if !srv.ReadOnly &&
430                                 repl.Mtime < bal.MinMtime &&
431                                 len(uniqueBestRepl) >= blk.Desired &&
432                                 !uniqueBestRepl[repl.Mtime] {
433                                 srv.AddTrash(Trash{
434                                         SizedDigest: blkid,
435                                         Mtime:       repl.Mtime,
436                                 })
437                                 change = changeTrash
438                         } else {
439                                 change = changeStay
440                         }
441                         uniqueBestRepl[repl.Mtime] = true
442                         reportedBestRepl++
443                 } else if pulls+reportedBestRepl < blk.Desired &&
444                         len(blk.Replicas) > 0 &&
445                         !srv.ReadOnly {
446                         // This service doesn't have a replica. We
447                         // should pull one to this server if we don't
448                         // already have enough (existing+requested)
449                         // replicas in better rendezvous positions.
450                         srv.AddPull(Pull{
451                                 SizedDigest: blkid,
452                                 Source:      blk.Replicas[0].KeepService,
453                         })
454                         pulls++
455                         change = changePull
456                 }
457                 if bal.Dumper != nil {
458                         changes = append(changes, fmt.Sprintf("%s:%d=%s,%d", srv.ServiceHost, srv.ServicePort, changeName[change], repl.Mtime))
459                 }
460         }
461         if bal.Dumper != nil {
462                 bal.Dumper.Printf("%s have=%d want=%d %s", blkid, len(blk.Replicas), blk.Desired, strings.Join(changes, " "))
463         }
464 }
465
466 type blocksNBytes struct {
467         replicas int
468         blocks   int
469         bytes    int64
470 }
471
472 func (bb blocksNBytes) String() string {
473         return fmt.Sprintf("%d replicas (%d blocks, %d bytes)", bb.replicas, bb.blocks, bb.bytes)
474 }
475
476 type balancerStats struct {
477         lost, overrep, unref, garbage, underrep, justright blocksNBytes
478         desired, current                                   blocksNBytes
479         pulls, trashes                                     int
480         replHistogram                                      []int
481 }
482
483 func (bal *Balancer) getStatistics() (s balancerStats) {
484         s.replHistogram = make([]int, 2)
485         bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
486                 surplus := len(blk.Replicas) - blk.Desired
487                 bytes := blkid.Size()
488                 switch {
489                 case len(blk.Replicas) == 0 && blk.Desired > 0:
490                         s.lost.replicas -= surplus
491                         s.lost.blocks++
492                         s.lost.bytes += bytes * int64(-surplus)
493                 case len(blk.Replicas) < blk.Desired:
494                         s.underrep.replicas -= surplus
495                         s.underrep.blocks++
496                         s.underrep.bytes += bytes * int64(-surplus)
497                 case len(blk.Replicas) > 0 && blk.Desired == 0:
498                         counter := &s.garbage
499                         for _, r := range blk.Replicas {
500                                 if r.Mtime >= bal.MinMtime {
501                                         counter = &s.unref
502                                         break
503                                 }
504                         }
505                         counter.replicas += surplus
506                         counter.blocks++
507                         counter.bytes += bytes * int64(surplus)
508                 case len(blk.Replicas) > blk.Desired:
509                         s.overrep.replicas += surplus
510                         s.overrep.blocks++
511                         s.overrep.bytes += bytes * int64(len(blk.Replicas)-blk.Desired)
512                 default:
513                         s.justright.replicas += blk.Desired
514                         s.justright.blocks++
515                         s.justright.bytes += bytes * int64(blk.Desired)
516                 }
517
518                 if blk.Desired > 0 {
519                         s.desired.replicas += blk.Desired
520                         s.desired.blocks++
521                         s.desired.bytes += bytes * int64(blk.Desired)
522                 }
523                 if len(blk.Replicas) > 0 {
524                         s.current.replicas += len(blk.Replicas)
525                         s.current.blocks++
526                         s.current.bytes += bytes * int64(len(blk.Replicas))
527                 }
528
529                 for len(s.replHistogram) <= len(blk.Replicas) {
530                         s.replHistogram = append(s.replHistogram, 0)
531                 }
532                 s.replHistogram[len(blk.Replicas)]++
533         })
534         for _, srv := range bal.KeepServices {
535                 s.pulls += len(srv.ChangeSet.Pulls)
536                 s.trashes += len(srv.ChangeSet.Trashes)
537         }
538         return
539 }
540
541 // PrintStatistics writes statistics about the computed changes to
542 // bal.Logger. It should not be called until ComputeChangeSets has
543 // finished.
544 func (bal *Balancer) PrintStatistics() {
545         s := bal.getStatistics()
546         bal.logf("===")
547         bal.logf("%s lost (0=have<want)", s.lost)
548         bal.logf("%s underreplicated (0<have<want)", s.underrep)
549         bal.logf("%s just right (have=want)", s.justright)
550         bal.logf("%s overreplicated (have>want>0)", s.overrep)
551         bal.logf("%s unreferenced (have>want=0, new)", s.unref)
552         bal.logf("%s garbage (have>want=0, old)", s.garbage)
553         bal.logf("===")
554         bal.logf("%s total commitment (excluding unreferenced)", s.desired)
555         bal.logf("%s total usage", s.current)
556         bal.logf("===")
557         for _, srv := range bal.KeepServices {
558                 bal.logf("%s: %v\n", srv, srv.ChangeSet)
559         }
560         bal.logf("===")
561         bal.printHistogram(s, 60)
562         bal.logf("===")
563 }
564
565 func (bal *Balancer) printHistogram(s balancerStats, hashColumns int) {
566         bal.logf("Replication level distribution (counting N replicas on a single server as N):")
567         maxCount := 0
568         for _, count := range s.replHistogram {
569                 if maxCount < count {
570                         maxCount = count
571                 }
572         }
573         hashes := strings.Repeat("#", hashColumns)
574         countWidth := 1 + int(math.Log10(float64(maxCount+1)))
575         scaleCount := 10 * float64(hashColumns) / math.Floor(1+10*math.Log10(float64(maxCount+1)))
576         for repl, count := range s.replHistogram {
577                 nHashes := int(scaleCount * math.Log10(float64(count+1)))
578                 bal.logf("%2d: %*d %s", repl, countWidth, count, hashes[:nHashes])
579         }
580 }
581
582 // CheckSanityLate checks for configuration and runtime errors after
583 // GetCurrentState() and ComputeChangeSets() have finished.
584 //
585 // If it returns an error, it is dangerous to run any Commit methods.
586 func (bal *Balancer) CheckSanityLate() error {
587         if bal.errors != nil {
588                 for _, err := range bal.errors {
589                         bal.logf("deferred error: %v", err)
590                 }
591                 return fmt.Errorf("cannot proceed safely after deferred errors")
592         }
593
594         if bal.collScanned == 0 {
595                 return fmt.Errorf("received zero collections")
596         }
597
598         anyDesired := false
599         bal.BlockStateMap.Apply(func(_ arvados.SizedDigest, blk *BlockState) {
600                 if blk.Desired > 0 {
601                         anyDesired = true
602                 }
603         })
604         if !anyDesired {
605                 return fmt.Errorf("zero blocks have desired replication>0")
606         }
607
608         if dr := bal.DefaultReplication; dr < 1 {
609                 return fmt.Errorf("Default replication (%d) is less than 1", dr)
610         }
611
612         // TODO: no two services have identical indexes
613         // TODO: no collisions (same md5, different size)
614         return nil
615 }
616
617 // CommitPulls sends the computed lists of pull requests to the
618 // keepstore servers. This has the effect of increasing replication of
619 // existing blocks that are either underreplicated or poorly
620 // distributed according to rendezvous hashing.
621 func (bal *Balancer) CommitPulls(c *arvados.Client) error {
622         return bal.commitAsync(c, "send pull list",
623                 func(srv *KeepService) error {
624                         return srv.CommitPulls(c)
625                 })
626 }
627
628 // CommitTrash sends the computed lists of trash requests to the
629 // keepstore servers. This has the effect of deleting blocks that are
630 // overreplicated or unreferenced.
631 func (bal *Balancer) CommitTrash(c *arvados.Client) error {
632         return bal.commitAsync(c, "send trash list",
633                 func(srv *KeepService) error {
634                         return srv.CommitTrash(c)
635                 })
636 }
637
638 func (bal *Balancer) commitAsync(c *arvados.Client, label string, f func(srv *KeepService) error) error {
639         errs := make(chan error)
640         for _, srv := range bal.KeepServices {
641                 go func(srv *KeepService) {
642                         var err error
643                         defer func() { errs <- err }()
644                         label := fmt.Sprintf("%s: %v", srv, label)
645                         defer timeMe(bal.Logger, label)()
646                         err = f(srv)
647                         if err != nil {
648                                 err = fmt.Errorf("%s: %v", label, err)
649                         }
650                 }(srv)
651         }
652         var lastErr error
653         for range bal.KeepServices {
654                 if err := <-errs; err != nil {
655                         bal.logf("%v", err)
656                         lastErr = err
657                 }
658         }
659         close(errs)
660         return lastErr
661 }
662
663 func (bal *Balancer) logf(f string, args ...interface{}) {
664         if bal.Logger != nil {
665                 bal.Logger.Printf(f, args...)
666         }
667 }