13937: Adds tests for operation & I/O counters.
[arvados.git] / services / keepstore / volume_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "context"
10         "crypto/md5"
11         "errors"
12         "fmt"
13         "io"
14         "os"
15         "strings"
16         "sync"
17         "time"
18
19         "github.com/prometheus/client_golang/prometheus"
20 )
21
22 // A TestableVolume allows test suites to manipulate the state of an
23 // underlying Volume, in order to test behavior in cases that are
24 // impractical to achieve with a sequence of normal Volume operations.
25 type TestableVolume interface {
26         Volume
27
28         // Get prometheus metrics
29         GetMetricsVecs() (opsCounters, errCounters, ioBytes *prometheus.CounterVec)
30
31         // [Over]write content for a locator with the given data,
32         // bypassing all constraints like readonly and serialize.
33         PutRaw(locator string, data []byte)
34
35         // Specify the value Mtime() should return, until the next
36         // call to Touch, TouchWithDate, or Put.
37         TouchWithDate(locator string, lastPut time.Time)
38
39         // Clean up, delete temporary files.
40         Teardown()
41 }
42
43 // MockVolumes are test doubles for Volumes, used to test handlers.
44 type MockVolume struct {
45         Store      map[string][]byte
46         Timestamps map[string]time.Time
47
48         // Bad volumes return an error for every operation.
49         Bad            bool
50         BadVolumeError error
51
52         // Touchable volumes' Touch() method succeeds for a locator
53         // that has been Put().
54         Touchable bool
55
56         // Readonly volumes return an error for Put, Delete, and
57         // Touch.
58         Readonly bool
59
60         // Gate is a "starting gate", allowing test cases to pause
61         // volume operations long enough to inspect state. Every
62         // operation (except Status) starts by receiving from
63         // Gate. Sending one value unblocks one operation; closing the
64         // channel unblocks all operations. By default, Gate is a
65         // closed channel, so all operations proceed without
66         // blocking. See trash_worker_test.go for an example.
67         Gate chan struct{}
68
69         called map[string]int
70         mutex  sync.Mutex
71 }
72
73 // CreateMockVolume returns a non-Bad, non-Readonly, Touchable mock
74 // volume.
75 func CreateMockVolume() *MockVolume {
76         gate := make(chan struct{})
77         close(gate)
78         return &MockVolume{
79                 Store:      make(map[string][]byte),
80                 Timestamps: make(map[string]time.Time),
81                 Bad:        false,
82                 Touchable:  true,
83                 Readonly:   false,
84                 called:     map[string]int{},
85                 Gate:       gate,
86         }
87 }
88
89 // CallCount returns how many times the named method has been called.
90 func (v *MockVolume) CallCount(method string) int {
91         v.mutex.Lock()
92         defer v.mutex.Unlock()
93         c, ok := v.called[method]
94         if !ok {
95                 return 0
96         }
97         return c
98 }
99
100 func (v *MockVolume) gotCall(method string) {
101         v.mutex.Lock()
102         defer v.mutex.Unlock()
103         if _, ok := v.called[method]; !ok {
104                 v.called[method] = 1
105         } else {
106                 v.called[method]++
107         }
108 }
109
110 func (v *MockVolume) Compare(ctx context.Context, loc string, buf []byte) error {
111         v.gotCall("Compare")
112         <-v.Gate
113         if v.Bad {
114                 return v.BadVolumeError
115         } else if block, ok := v.Store[loc]; ok {
116                 if fmt.Sprintf("%x", md5.Sum(block)) != loc {
117                         return DiskHashError
118                 }
119                 if bytes.Compare(buf, block) != 0 {
120                         return CollisionError
121                 }
122                 return nil
123         } else {
124                 return NotFoundError
125         }
126 }
127
128 func (v *MockVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
129         v.gotCall("Get")
130         <-v.Gate
131         if v.Bad {
132                 return 0, v.BadVolumeError
133         } else if block, ok := v.Store[loc]; ok {
134                 copy(buf[:len(block)], block)
135                 return len(block), nil
136         }
137         return 0, os.ErrNotExist
138 }
139
140 func (v *MockVolume) Put(ctx context.Context, loc string, block []byte) error {
141         v.gotCall("Put")
142         <-v.Gate
143         if v.Bad {
144                 return v.BadVolumeError
145         }
146         if v.Readonly {
147                 return MethodDisabledError
148         }
149         v.Store[loc] = block
150         return v.Touch(loc)
151 }
152
153 func (v *MockVolume) Touch(loc string) error {
154         v.gotCall("Touch")
155         <-v.Gate
156         if v.Readonly {
157                 return MethodDisabledError
158         }
159         if v.Touchable {
160                 v.Timestamps[loc] = time.Now()
161                 return nil
162         }
163         return errors.New("Touch failed")
164 }
165
166 func (v *MockVolume) Mtime(loc string) (time.Time, error) {
167         v.gotCall("Mtime")
168         <-v.Gate
169         var mtime time.Time
170         var err error
171         if v.Bad {
172                 err = v.BadVolumeError
173         } else if t, ok := v.Timestamps[loc]; ok {
174                 mtime = t
175         } else {
176                 err = os.ErrNotExist
177         }
178         return mtime, err
179 }
180
181 func (v *MockVolume) IndexTo(prefix string, w io.Writer) error {
182         v.gotCall("IndexTo")
183         <-v.Gate
184         for loc, block := range v.Store {
185                 if !IsValidLocator(loc) || !strings.HasPrefix(loc, prefix) {
186                         continue
187                 }
188                 _, err := fmt.Fprintf(w, "%s+%d %d\n",
189                         loc, len(block), 123456789)
190                 if err != nil {
191                         return err
192                 }
193         }
194         return nil
195 }
196
197 func (v *MockVolume) Trash(loc string) error {
198         v.gotCall("Delete")
199         <-v.Gate
200         if v.Readonly {
201                 return MethodDisabledError
202         }
203         if _, ok := v.Store[loc]; ok {
204                 if time.Since(v.Timestamps[loc]) < time.Duration(theConfig.BlobSignatureTTL) {
205                         return nil
206                 }
207                 delete(v.Store, loc)
208                 return nil
209         }
210         return os.ErrNotExist
211 }
212
213 func (v *MockVolume) DeviceID() string {
214         return "mock-device-id"
215 }
216
217 func (v *MockVolume) Type() string {
218         return "Mock"
219 }
220
221 func (v *MockVolume) Start(vm *volumeMetricsVecs) error {
222         return nil
223 }
224
225 func (v *MockVolume) Untrash(loc string) error {
226         return nil
227 }
228
229 func (v *MockVolume) Status() *VolumeStatus {
230         var used uint64
231         for _, block := range v.Store {
232                 used = used + uint64(len(block))
233         }
234         return &VolumeStatus{"/bogo", 123, 1000000 - used, used}
235 }
236
237 func (v *MockVolume) String() string {
238         return "[MockVolume]"
239 }
240
241 func (v *MockVolume) Writable() bool {
242         return !v.Readonly
243 }
244
245 func (v *MockVolume) Replication() int {
246         return 1
247 }
248
249 func (v *MockVolume) EmptyTrash() {
250 }
251
252 func (v *MockVolume) GetStorageClasses() []string {
253         return nil
254 }