Merge branch '2960-keepstore-streaming'
[arvados.git] / services / keepstore / putprogress.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "github.com/sirupsen/logrus"
9 )
10
11 type putProgress struct {
12         classNeeded      map[string]bool
13         classTodo        map[string]bool
14         mountUsed        map[*mount]bool
15         totalReplication int
16         classDone        map[string]int
17 }
18
19 func (pr *putProgress) Add(mnt *mount) {
20         if pr.mountUsed[mnt] {
21                 logrus.Warnf("BUG? superfluous extra write to mount %s", mnt.UUID)
22                 return
23         }
24         pr.mountUsed[mnt] = true
25         pr.totalReplication += mnt.Replication
26         for class := range mnt.StorageClasses {
27                 pr.classDone[class] += mnt.Replication
28                 delete(pr.classTodo, class)
29         }
30 }
31
32 func (pr *putProgress) Sub(mnt *mount) {
33         if !pr.mountUsed[mnt] {
34                 logrus.Warnf("BUG? Sub called with no prior matching Add: %s", mnt.UUID)
35                 return
36         }
37         pr.mountUsed[mnt] = false
38         pr.totalReplication -= mnt.Replication
39         for class := range mnt.StorageClasses {
40                 pr.classDone[class] -= mnt.Replication
41                 if pr.classNeeded[class] {
42                         pr.classTodo[class] = true
43                 }
44         }
45 }
46
47 func (pr *putProgress) Done() bool {
48         return len(pr.classTodo) == 0 && pr.totalReplication > 0
49 }
50
51 func (pr *putProgress) Want(mnt *mount) bool {
52         if pr.Done() || pr.mountUsed[mnt] {
53                 return false
54         }
55         if len(pr.classTodo) == 0 {
56                 // none specified == "any"
57                 return true
58         }
59         for class := range mnt.StorageClasses {
60                 if pr.classTodo[class] {
61                         return true
62                 }
63         }
64         return false
65 }
66
67 func (pr *putProgress) Copy() *putProgress {
68         cp := putProgress{
69                 classNeeded:      pr.classNeeded,
70                 classTodo:        make(map[string]bool, len(pr.classTodo)),
71                 classDone:        make(map[string]int, len(pr.classDone)),
72                 mountUsed:        make(map[*mount]bool, len(pr.mountUsed)),
73                 totalReplication: pr.totalReplication,
74         }
75         for k, v := range pr.classTodo {
76                 cp.classTodo[k] = v
77         }
78         for k, v := range pr.classDone {
79                 cp.classDone[k] = v
80         }
81         for k, v := range pr.mountUsed {
82                 cp.mountUsed[k] = v
83         }
84         return &cp
85 }
86
87 func newPutProgress(classes []string) putProgress {
88         pr := putProgress{
89                 classNeeded: make(map[string]bool, len(classes)),
90                 classTodo:   make(map[string]bool, len(classes)),
91                 classDone:   map[string]int{},
92                 mountUsed:   map[*mount]bool{},
93         }
94         for _, c := range classes {
95                 if c != "" {
96                         pr.classNeeded[c] = true
97                         pr.classTodo[c] = true
98                 }
99         }
100         return pr
101 }