Merge branch '19688-cwl-fast-path' refs #19688
[arvados.git] / services / keepstore / volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "context"
9         "crypto/rand"
10         "fmt"
11         "io"
12         "math/big"
13         "sort"
14         "sync/atomic"
15         "time"
16
17         "git.arvados.org/arvados.git/sdk/go/arvados"
18         "github.com/sirupsen/logrus"
19 )
20
21 type BlockWriter interface {
22         // WriteBlock reads all data from r, writes it to a backing
23         // store as "loc", and returns the number of bytes written.
24         WriteBlock(ctx context.Context, loc string, r io.Reader) error
25 }
26
27 type BlockReader interface {
28         // ReadBlock retrieves data previously stored as "loc" and
29         // writes it to w.
30         ReadBlock(ctx context.Context, loc string, w io.Writer) error
31 }
32
33 var driver = map[string]func(*arvados.Cluster, arvados.Volume, logrus.FieldLogger, *volumeMetricsVecs) (Volume, error){}
34
35 // A Volume is an interface representing a Keep back-end storage unit:
36 // for example, a single mounted disk, a RAID array, an Amazon S3 volume,
37 // etc.
38 type Volume interface {
39         // Get a block: copy the block data into buf, and return the
40         // number of bytes copied.
41         //
42         // loc is guaranteed to consist of 32 or more lowercase hex
43         // digits.
44         //
45         // Get should not verify the integrity of the data: it should
46         // just return whatever was found in its backing
47         // store. (Integrity checking is the caller's responsibility.)
48         //
49         // If an error is encountered that prevents it from
50         // retrieving the data, that error should be returned so the
51         // caller can log (and send to the client) a more useful
52         // message.
53         //
54         // If the error is "not found", and there's no particular
55         // reason to expect the block to be found (other than that a
56         // caller is asking for it), the returned error should satisfy
57         // os.IsNotExist(err): this is a normal condition and will not
58         // be logged as an error (except that a 404 will appear in the
59         // access log if the block is not found on any other volumes
60         // either).
61         //
62         // If the data in the backing store is bigger than len(buf),
63         // then Get is permitted to return an error without reading
64         // any of the data.
65         //
66         // len(buf) will not exceed BlockSize.
67         Get(ctx context.Context, loc string, buf []byte) (int, error)
68
69         // Compare the given data with the stored data (i.e., what Get
70         // would return). If equal, return nil. If not, return
71         // CollisionError or DiskHashError (depending on whether the
72         // data on disk matches the expected hash), or whatever error
73         // was encountered opening/reading the stored data.
74         Compare(ctx context.Context, loc string, data []byte) error
75
76         // Put writes a block to an underlying storage device.
77         //
78         // loc is as described in Get.
79         //
80         // len(block) is guaranteed to be between 0 and BlockSize.
81         //
82         // If a block is already stored under the same name (loc) with
83         // different content, Put must either overwrite the existing
84         // data with the new data or return a non-nil error. When
85         // overwriting existing data, it must never leave the storage
86         // device in an inconsistent state: a subsequent call to Get
87         // must return either the entire old block, the entire new
88         // block, or an error. (An implementation that cannot peform
89         // atomic updates must leave the old data alone and return an
90         // error.)
91         //
92         // Put also sets the timestamp for the given locator to the
93         // current time.
94         //
95         // Put must return a non-nil error unless it can guarantee
96         // that the entire block has been written and flushed to
97         // persistent storage, and that its timestamp is current. Of
98         // course, this guarantee is only as good as the underlying
99         // storage device, but it is Put's responsibility to at least
100         // get whatever guarantee is offered by the storage device.
101         //
102         // Put should not verify that loc==hash(block): this is the
103         // caller's responsibility.
104         Put(ctx context.Context, loc string, block []byte) error
105
106         // Touch sets the timestamp for the given locator to the
107         // current time.
108         //
109         // loc is as described in Get.
110         //
111         // If invoked at time t0, Touch must guarantee that a
112         // subsequent call to Mtime will return a timestamp no older
113         // than {t0 minus one second}. For example, if Touch is called
114         // at 2015-07-07T01:23:45.67890123Z, it is acceptable for a
115         // subsequent Mtime to return any of the following:
116         //
117         //   - 2015-07-07T01:23:45.00000000Z
118         //   - 2015-07-07T01:23:45.67890123Z
119         //   - 2015-07-07T01:23:46.67890123Z
120         //   - 2015-07-08T00:00:00.00000000Z
121         //
122         // It is not acceptable for a subsequente Mtime to return
123         // either of the following:
124         //
125         //   - 2015-07-07T00:00:00.00000000Z -- ERROR
126         //   - 2015-07-07T01:23:44.00000000Z -- ERROR
127         //
128         // Touch must return a non-nil error if the timestamp cannot
129         // be updated.
130         Touch(loc string) error
131
132         // Mtime returns the stored timestamp for the given locator.
133         //
134         // loc is as described in Get.
135         //
136         // Mtime must return a non-nil error if the given block is not
137         // found or the timestamp could not be retrieved.
138         Mtime(loc string) (time.Time, error)
139
140         // IndexTo writes a complete list of locators with the given
141         // prefix for which Get() can retrieve data.
142         //
143         // prefix consists of zero or more lowercase hexadecimal
144         // digits.
145         //
146         // Each locator must be written to the given writer using the
147         // following format:
148         //
149         //   loc "+" size " " timestamp "\n"
150         //
151         // where:
152         //
153         //   - size is the number of bytes of content, given as a
154         //     decimal number with one or more digits
155         //
156         //   - timestamp is the timestamp stored for the locator,
157         //     given as a decimal number of seconds after January 1,
158         //     1970 UTC.
159         //
160         // IndexTo must not write any other data to writer: for
161         // example, it must not write any blank lines.
162         //
163         // If an error makes it impossible to provide a complete
164         // index, IndexTo must return a non-nil error. It is
165         // acceptable to return a non-nil error after writing a
166         // partial index to writer.
167         //
168         // The resulting index is not expected to be sorted in any
169         // particular order.
170         IndexTo(prefix string, writer io.Writer) error
171
172         // Trash moves the block data from the underlying storage
173         // device to trash area. The block then stays in trash for
174         // BlobTrashLifetime before it is actually deleted.
175         //
176         // loc is as described in Get.
177         //
178         // If the timestamp for the given locator is newer than
179         // BlobSigningTTL, Trash must not trash the data.
180         //
181         // If a Trash operation overlaps with any Touch or Put
182         // operations on the same locator, the implementation must
183         // ensure one of the following outcomes:
184         //
185         //   - Touch and Put return a non-nil error, or
186         //   - Trash does not trash the block, or
187         //   - Both of the above.
188         //
189         // If it is possible for the storage device to be accessed by
190         // a different process or host, the synchronization mechanism
191         // should also guard against races with other processes and
192         // hosts. If such a mechanism is not available, there must be
193         // a mechanism for detecting unsafe configurations, alerting
194         // the operator, and aborting or falling back to a read-only
195         // state. In other words, running multiple keepstore processes
196         // with the same underlying storage device must either work
197         // reliably or fail outright.
198         //
199         // Corollary: A successful Touch or Put guarantees a block
200         // will not be trashed for at least BlobSigningTTL seconds.
201         Trash(loc string) error
202
203         // Untrash moves block from trash back into store
204         Untrash(loc string) error
205
206         // Status returns a *VolumeStatus representing the current
207         // in-use and available storage capacity and an
208         // implementation-specific volume identifier (e.g., "mount
209         // point" for a UnixVolume).
210         Status() *VolumeStatus
211
212         // String returns an identifying label for this volume,
213         // suitable for including in log messages. It should contain
214         // enough information to uniquely identify the underlying
215         // storage device, but should not contain any credentials or
216         // secrets.
217         String() string
218
219         // EmptyTrash looks for trashed blocks that exceeded
220         // BlobTrashLifetime and deletes them from the volume.
221         EmptyTrash()
222
223         // Return a globally unique ID of the underlying storage
224         // device if possible, otherwise "".
225         GetDeviceID() string
226 }
227
228 // A VolumeWithExamples provides example configs to display in the
229 // -help message.
230 type VolumeWithExamples interface {
231         Volume
232         Examples() []Volume
233 }
234
235 // A VolumeManager tells callers which volumes can read, which volumes
236 // can write, and on which volume the next write should be attempted.
237 type VolumeManager interface {
238         // Mounts returns all mounts (volume attachments).
239         Mounts() []*VolumeMount
240
241         // Lookup returns the mount with the given UUID. Returns nil
242         // if the mount does not exist. If write==true, returns nil if
243         // the mount is not writable.
244         Lookup(uuid string, write bool) *VolumeMount
245
246         // AllReadable returns all mounts.
247         AllReadable() []*VolumeMount
248
249         // AllWritable returns all mounts that aren't known to be in
250         // a read-only state. (There is no guarantee that a write to
251         // one will succeed, though.)
252         AllWritable() []*VolumeMount
253
254         // NextWritable returns the volume where the next new block
255         // should be written. A VolumeManager can select a volume in
256         // order to distribute activity across spindles, fill up disks
257         // with more free space, etc.
258         NextWritable() *VolumeMount
259
260         // VolumeStats returns the ioStats used for tracking stats for
261         // the given Volume.
262         VolumeStats(Volume) *ioStats
263
264         // Close shuts down the volume manager cleanly.
265         Close()
266 }
267
268 // A VolumeMount is an attachment of a Volume to a VolumeManager.
269 type VolumeMount struct {
270         arvados.KeepMount
271         Volume
272 }
273
274 // Generate a UUID the way API server would for a "KeepVolumeMount"
275 // object.
276 func (*VolumeMount) generateUUID() string {
277         var max big.Int
278         _, ok := max.SetString("zzzzzzzzzzzzzzz", 36)
279         if !ok {
280                 panic("big.Int parse failed")
281         }
282         r, err := rand.Int(rand.Reader, &max)
283         if err != nil {
284                 panic(err)
285         }
286         return fmt.Sprintf("zzzzz-ivpuk-%015s", r.Text(36))
287 }
288
289 // RRVolumeManager is a round-robin VolumeManager: the Nth call to
290 // NextWritable returns the (N % len(writables))th writable Volume
291 // (where writables are all Volumes v where v.Writable()==true).
292 type RRVolumeManager struct {
293         mounts    []*VolumeMount
294         mountMap  map[string]*VolumeMount
295         readables []*VolumeMount
296         writables []*VolumeMount
297         counter   uint32
298         iostats   map[Volume]*ioStats
299 }
300
301 func makeRRVolumeManager(logger logrus.FieldLogger, cluster *arvados.Cluster, myURL arvados.URL, metrics *volumeMetricsVecs) (*RRVolumeManager, error) {
302         vm := &RRVolumeManager{
303                 iostats: make(map[Volume]*ioStats),
304         }
305         vm.mountMap = make(map[string]*VolumeMount)
306         for uuid, cfgvol := range cluster.Volumes {
307                 va, ok := cfgvol.AccessViaHosts[myURL]
308                 if !ok && len(cfgvol.AccessViaHosts) > 0 {
309                         continue
310                 }
311                 dri, ok := driver[cfgvol.Driver]
312                 if !ok {
313                         return nil, fmt.Errorf("volume %s: invalid driver %q", uuid, cfgvol.Driver)
314                 }
315                 vol, err := dri(cluster, cfgvol, logger, metrics)
316                 if err != nil {
317                         return nil, fmt.Errorf("error initializing volume %s: %s", uuid, err)
318                 }
319                 logger.Printf("started volume %s (%s), ReadOnly=%v", uuid, vol, cfgvol.ReadOnly || va.ReadOnly)
320
321                 sc := cfgvol.StorageClasses
322                 if len(sc) == 0 {
323                         sc = map[string]bool{"default": true}
324                 }
325                 repl := cfgvol.Replication
326                 if repl < 1 {
327                         repl = 1
328                 }
329                 mnt := &VolumeMount{
330                         KeepMount: arvados.KeepMount{
331                                 UUID:           uuid,
332                                 DeviceID:       vol.GetDeviceID(),
333                                 ReadOnly:       cfgvol.ReadOnly || va.ReadOnly,
334                                 Replication:    repl,
335                                 StorageClasses: sc,
336                         },
337                         Volume: vol,
338                 }
339                 vm.iostats[vol] = &ioStats{}
340                 vm.mounts = append(vm.mounts, mnt)
341                 vm.mountMap[uuid] = mnt
342                 vm.readables = append(vm.readables, mnt)
343                 if !mnt.KeepMount.ReadOnly {
344                         vm.writables = append(vm.writables, mnt)
345                 }
346         }
347         // pri(mnt): return highest priority of any storage class
348         // offered by mnt
349         pri := func(mnt *VolumeMount) int {
350                 any, best := false, 0
351                 for class := range mnt.KeepMount.StorageClasses {
352                         if p := cluster.StorageClasses[class].Priority; !any || best < p {
353                                 best = p
354                                 any = true
355                         }
356                 }
357                 return best
358         }
359         // less(a,b): sort first by highest priority of any offered
360         // storage class (highest->lowest), then by volume UUID
361         less := func(a, b *VolumeMount) bool {
362                 if pa, pb := pri(a), pri(b); pa != pb {
363                         return pa > pb
364                 } else {
365                         return a.KeepMount.UUID < b.KeepMount.UUID
366                 }
367         }
368         sort.Slice(vm.readables, func(i, j int) bool {
369                 return less(vm.readables[i], vm.readables[j])
370         })
371         sort.Slice(vm.writables, func(i, j int) bool {
372                 return less(vm.writables[i], vm.writables[j])
373         })
374         sort.Slice(vm.mounts, func(i, j int) bool {
375                 return less(vm.mounts[i], vm.mounts[j])
376         })
377         return vm, nil
378 }
379
380 func (vm *RRVolumeManager) Mounts() []*VolumeMount {
381         return vm.mounts
382 }
383
384 func (vm *RRVolumeManager) Lookup(uuid string, needWrite bool) *VolumeMount {
385         if mnt, ok := vm.mountMap[uuid]; ok && (!needWrite || !mnt.ReadOnly) {
386                 return mnt
387         }
388         return nil
389 }
390
391 // AllReadable returns an array of all readable volumes
392 func (vm *RRVolumeManager) AllReadable() []*VolumeMount {
393         return vm.readables
394 }
395
396 // AllWritable returns writable volumes, sorted by priority/uuid. Used
397 // by CompareAndTouch to ensure higher-priority volumes are checked
398 // first.
399 func (vm *RRVolumeManager) AllWritable() []*VolumeMount {
400         return vm.writables
401 }
402
403 // NextWritable returns writable volumes, rotated by vm.counter so
404 // each volume gets a turn to be first. Used by PutBlock to distribute
405 // new data across available volumes.
406 func (vm *RRVolumeManager) NextWritable() []*VolumeMount {
407         if len(vm.writables) == 0 {
408                 return nil
409         }
410         offset := (int(atomic.AddUint32(&vm.counter, 1)) - 1) % len(vm.writables)
411         return append(append([]*VolumeMount(nil), vm.writables[offset:]...), vm.writables[:offset]...)
412 }
413
414 // VolumeStats returns an ioStats for the given volume.
415 func (vm *RRVolumeManager) VolumeStats(v Volume) *ioStats {
416         return vm.iostats[v]
417 }
418
419 // Close the RRVolumeManager
420 func (vm *RRVolumeManager) Close() {
421 }
422
423 // VolumeStatus describes the current condition of a volume
424 type VolumeStatus struct {
425         MountPoint string
426         DeviceNum  uint64
427         BytesFree  uint64
428         BytesUsed  uint64
429 }
430
431 // ioStats tracks I/O statistics for a volume or server
432 type ioStats struct {
433         Errors     uint64
434         Ops        uint64
435         CompareOps uint64
436         GetOps     uint64
437         PutOps     uint64
438         TouchOps   uint64
439         InBytes    uint64
440         OutBytes   uint64
441 }
442
443 type InternalStatser interface {
444         InternalStats() interface{}
445 }