11002: Merge branch 'master' into 11002-arvput-crash-fix
[arvados.git] / services / keep-balance / block_state.go
1 package main
2
3 import (
4         "sync"
5
6         "git.curoverse.com/arvados.git/sdk/go/arvados"
7 )
8
9 // Replica is a file on disk (or object in an S3 bucket, or blob in an
10 // Azure storage container, etc.) as reported in a keepstore index
11 // response.
12 type Replica struct {
13         *KeepService
14         Mtime int64
15 }
16
17 // BlockState indicates the number of desired replicas (according to
18 // the collections we know about) and the replicas actually stored
19 // (according to the keepstore indexes we know about).
20 type BlockState struct {
21         Replicas []Replica
22         Desired  int
23 }
24
25 func (bs *BlockState) addReplica(r Replica) {
26         bs.Replicas = append(bs.Replicas, r)
27 }
28
29 func (bs *BlockState) increaseDesired(n int) {
30         if bs.Desired < n {
31                 bs.Desired = n
32         }
33 }
34
35 // BlockStateMap is a goroutine-safe wrapper around a
36 // map[arvados.SizedDigest]*BlockState.
37 type BlockStateMap struct {
38         entries map[arvados.SizedDigest]*BlockState
39         mutex   sync.Mutex
40 }
41
42 // NewBlockStateMap returns a newly allocated BlockStateMap.
43 func NewBlockStateMap() *BlockStateMap {
44         return &BlockStateMap{
45                 entries: make(map[arvados.SizedDigest]*BlockState),
46         }
47 }
48
49 // return a BlockState entry, allocating a new one if needed. (Private
50 // method: not goroutine-safe.)
51 func (bsm *BlockStateMap) get(blkid arvados.SizedDigest) *BlockState {
52         // TODO? Allocate BlockState structs a slice at a time,
53         // instead of one at a time.
54         blk := bsm.entries[blkid]
55         if blk == nil {
56                 blk = &BlockState{}
57                 bsm.entries[blkid] = blk
58         }
59         return blk
60 }
61
62 // Apply runs f on each entry in the map.
63 func (bsm *BlockStateMap) Apply(f func(arvados.SizedDigest, *BlockState)) {
64         bsm.mutex.Lock()
65         defer bsm.mutex.Unlock()
66
67         for blkid, blk := range bsm.entries {
68                 f(blkid, blk)
69         }
70 }
71
72 // AddReplicas updates the map to indicate srv has a replica of each
73 // block in idx.
74 func (bsm *BlockStateMap) AddReplicas(srv *KeepService, idx []arvados.KeepServiceIndexEntry) {
75         bsm.mutex.Lock()
76         defer bsm.mutex.Unlock()
77
78         for _, ent := range idx {
79                 bsm.get(ent.SizedDigest).addReplica(Replica{
80                         KeepService: srv,
81                         Mtime:       ent.Mtime,
82                 })
83         }
84 }
85
86 // IncreaseDesired updates the map to indicate the desired replication
87 // for the given blocks is at least n.
88 func (bsm *BlockStateMap) IncreaseDesired(n int, blocks []arvados.SizedDigest) {
89         bsm.mutex.Lock()
90         defer bsm.mutex.Unlock()
91
92         for _, blkid := range blocks {
93                 bsm.get(blkid).increaseDesired(n)
94         }
95 }