16736: Adds tests to confirm expires_at gets properly set on runtime tokens.
[arvados.git] / services / keepstore / volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "context"
9         "crypto/rand"
10         "fmt"
11         "io"
12         "math/big"
13         "sync/atomic"
14         "time"
15
16         "git.arvados.org/arvados.git/sdk/go/arvados"
17         "github.com/sirupsen/logrus"
18 )
19
20 type BlockWriter interface {
21         // WriteBlock reads all data from r, writes it to a backing
22         // store as "loc", and returns the number of bytes written.
23         WriteBlock(ctx context.Context, loc string, r io.Reader) error
24 }
25
26 type BlockReader interface {
27         // ReadBlock retrieves data previously stored as "loc" and
28         // writes it to w.
29         ReadBlock(ctx context.Context, loc string, w io.Writer) error
30 }
31
32 var driver = map[string]func(*arvados.Cluster, arvados.Volume, logrus.FieldLogger, *volumeMetricsVecs) (Volume, error){}
33
34 // A Volume is an interface representing a Keep back-end storage unit:
35 // for example, a single mounted disk, a RAID array, an Amazon S3 volume,
36 // etc.
37 type Volume interface {
38         // Get a block: copy the block data into buf, and return the
39         // number of bytes copied.
40         //
41         // loc is guaranteed to consist of 32 or more lowercase hex
42         // digits.
43         //
44         // Get should not verify the integrity of the data: it should
45         // just return whatever was found in its backing
46         // store. (Integrity checking is the caller's responsibility.)
47         //
48         // If an error is encountered that prevents it from
49         // retrieving the data, that error should be returned so the
50         // caller can log (and send to the client) a more useful
51         // message.
52         //
53         // If the error is "not found", and there's no particular
54         // reason to expect the block to be found (other than that a
55         // caller is asking for it), the returned error should satisfy
56         // os.IsNotExist(err): this is a normal condition and will not
57         // be logged as an error (except that a 404 will appear in the
58         // access log if the block is not found on any other volumes
59         // either).
60         //
61         // If the data in the backing store is bigger than len(buf),
62         // then Get is permitted to return an error without reading
63         // any of the data.
64         //
65         // len(buf) will not exceed BlockSize.
66         Get(ctx context.Context, loc string, buf []byte) (int, error)
67
68         // Compare the given data with the stored data (i.e., what Get
69         // would return). If equal, return nil. If not, return
70         // CollisionError or DiskHashError (depending on whether the
71         // data on disk matches the expected hash), or whatever error
72         // was encountered opening/reading the stored data.
73         Compare(ctx context.Context, loc string, data []byte) error
74
75         // Put writes a block to an underlying storage device.
76         //
77         // loc is as described in Get.
78         //
79         // len(block) is guaranteed to be between 0 and BlockSize.
80         //
81         // If a block is already stored under the same name (loc) with
82         // different content, Put must either overwrite the existing
83         // data with the new data or return a non-nil error. When
84         // overwriting existing data, it must never leave the storage
85         // device in an inconsistent state: a subsequent call to Get
86         // must return either the entire old block, the entire new
87         // block, or an error. (An implementation that cannot peform
88         // atomic updates must leave the old data alone and return an
89         // error.)
90         //
91         // Put also sets the timestamp for the given locator to the
92         // current time.
93         //
94         // Put must return a non-nil error unless it can guarantee
95         // that the entire block has been written and flushed to
96         // persistent storage, and that its timestamp is current. Of
97         // course, this guarantee is only as good as the underlying
98         // storage device, but it is Put's responsibility to at least
99         // get whatever guarantee is offered by the storage device.
100         //
101         // Put should not verify that loc==hash(block): this is the
102         // caller's responsibility.
103         Put(ctx context.Context, loc string, block []byte) error
104
105         // Touch sets the timestamp for the given locator to the
106         // current time.
107         //
108         // loc is as described in Get.
109         //
110         // If invoked at time t0, Touch must guarantee that a
111         // subsequent call to Mtime will return a timestamp no older
112         // than {t0 minus one second}. For example, if Touch is called
113         // at 2015-07-07T01:23:45.67890123Z, it is acceptable for a
114         // subsequent Mtime to return any of the following:
115         //
116         //   - 2015-07-07T01:23:45.00000000Z
117         //   - 2015-07-07T01:23:45.67890123Z
118         //   - 2015-07-07T01:23:46.67890123Z
119         //   - 2015-07-08T00:00:00.00000000Z
120         //
121         // It is not acceptable for a subsequente Mtime to return
122         // either of the following:
123         //
124         //   - 2015-07-07T00:00:00.00000000Z -- ERROR
125         //   - 2015-07-07T01:23:44.00000000Z -- ERROR
126         //
127         // Touch must return a non-nil error if the timestamp cannot
128         // be updated.
129         Touch(loc string) error
130
131         // Mtime returns the stored timestamp for the given locator.
132         //
133         // loc is as described in Get.
134         //
135         // Mtime must return a non-nil error if the given block is not
136         // found or the timestamp could not be retrieved.
137         Mtime(loc string) (time.Time, error)
138
139         // IndexTo writes a complete list of locators with the given
140         // prefix for which Get() can retrieve data.
141         //
142         // prefix consists of zero or more lowercase hexadecimal
143         // digits.
144         //
145         // Each locator must be written to the given writer using the
146         // following format:
147         //
148         //   loc "+" size " " timestamp "\n"
149         //
150         // where:
151         //
152         //   - size is the number of bytes of content, given as a
153         //     decimal number with one or more digits
154         //
155         //   - timestamp is the timestamp stored for the locator,
156         //     given as a decimal number of seconds after January 1,
157         //     1970 UTC.
158         //
159         // IndexTo must not write any other data to writer: for
160         // example, it must not write any blank lines.
161         //
162         // If an error makes it impossible to provide a complete
163         // index, IndexTo must return a non-nil error. It is
164         // acceptable to return a non-nil error after writing a
165         // partial index to writer.
166         //
167         // The resulting index is not expected to be sorted in any
168         // particular order.
169         IndexTo(prefix string, writer io.Writer) error
170
171         // Trash moves the block data from the underlying storage
172         // device to trash area. The block then stays in trash for
173         // BlobTrashLifetime before it is actually deleted.
174         //
175         // loc is as described in Get.
176         //
177         // If the timestamp for the given locator is newer than
178         // BlobSigningTTL, Trash must not trash the data.
179         //
180         // If a Trash operation overlaps with any Touch or Put
181         // operations on the same locator, the implementation must
182         // ensure one of the following outcomes:
183         //
184         //   - Touch and Put return a non-nil error, or
185         //   - Trash does not trash the block, or
186         //   - Both of the above.
187         //
188         // If it is possible for the storage device to be accessed by
189         // a different process or host, the synchronization mechanism
190         // should also guard against races with other processes and
191         // hosts. If such a mechanism is not available, there must be
192         // a mechanism for detecting unsafe configurations, alerting
193         // the operator, and aborting or falling back to a read-only
194         // state. In other words, running multiple keepstore processes
195         // with the same underlying storage device must either work
196         // reliably or fail outright.
197         //
198         // Corollary: A successful Touch or Put guarantees a block
199         // will not be trashed for at least BlobSigningTTL seconds.
200         Trash(loc string) error
201
202         // Untrash moves block from trash back into store
203         Untrash(loc string) error
204
205         // Status returns a *VolumeStatus representing the current
206         // in-use and available storage capacity and an
207         // implementation-specific volume identifier (e.g., "mount
208         // point" for a UnixVolume).
209         Status() *VolumeStatus
210
211         // String returns an identifying label for this volume,
212         // suitable for including in log messages. It should contain
213         // enough information to uniquely identify the underlying
214         // storage device, but should not contain any credentials or
215         // secrets.
216         String() string
217
218         // EmptyTrash looks for trashed blocks that exceeded
219         // BlobTrashLifetime and deletes them from the volume.
220         EmptyTrash()
221
222         // Return a globally unique ID of the underlying storage
223         // device if possible, otherwise "".
224         GetDeviceID() string
225 }
226
227 // A VolumeWithExamples provides example configs to display in the
228 // -help message.
229 type VolumeWithExamples interface {
230         Volume
231         Examples() []Volume
232 }
233
234 // A VolumeManager tells callers which volumes can read, which volumes
235 // can write, and on which volume the next write should be attempted.
236 type VolumeManager interface {
237         // Mounts returns all mounts (volume attachments).
238         Mounts() []*VolumeMount
239
240         // Lookup returns the mount with the given UUID. Returns nil
241         // if the mount does not exist. If write==true, returns nil if
242         // the mount is not writable.
243         Lookup(uuid string, write bool) *VolumeMount
244
245         // AllReadable returns all mounts.
246         AllReadable() []*VolumeMount
247
248         // AllWritable returns all mounts that aren't known to be in
249         // a read-only state. (There is no guarantee that a write to
250         // one will succeed, though.)
251         AllWritable() []*VolumeMount
252
253         // NextWritable returns the volume where the next new block
254         // should be written. A VolumeManager can select a volume in
255         // order to distribute activity across spindles, fill up disks
256         // with more free space, etc.
257         NextWritable() *VolumeMount
258
259         // VolumeStats returns the ioStats used for tracking stats for
260         // the given Volume.
261         VolumeStats(Volume) *ioStats
262
263         // Close shuts down the volume manager cleanly.
264         Close()
265 }
266
267 // A VolumeMount is an attachment of a Volume to a VolumeManager.
268 type VolumeMount struct {
269         arvados.KeepMount
270         Volume
271 }
272
273 // Generate a UUID the way API server would for a "KeepVolumeMount"
274 // object.
275 func (*VolumeMount) generateUUID() string {
276         var max big.Int
277         _, ok := max.SetString("zzzzzzzzzzzzzzz", 36)
278         if !ok {
279                 panic("big.Int parse failed")
280         }
281         r, err := rand.Int(rand.Reader, &max)
282         if err != nil {
283                 panic(err)
284         }
285         return fmt.Sprintf("zzzzz-ivpuk-%015s", r.Text(36))
286 }
287
288 // RRVolumeManager is a round-robin VolumeManager: the Nth call to
289 // NextWritable returns the (N % len(writables))th writable Volume
290 // (where writables are all Volumes v where v.Writable()==true).
291 type RRVolumeManager struct {
292         mounts    []*VolumeMount
293         mountMap  map[string]*VolumeMount
294         readables []*VolumeMount
295         writables []*VolumeMount
296         counter   uint32
297         iostats   map[Volume]*ioStats
298 }
299
300 func makeRRVolumeManager(logger logrus.FieldLogger, cluster *arvados.Cluster, myURL arvados.URL, metrics *volumeMetricsVecs) (*RRVolumeManager, error) {
301         vm := &RRVolumeManager{
302                 iostats: make(map[Volume]*ioStats),
303         }
304         vm.mountMap = make(map[string]*VolumeMount)
305         for uuid, cfgvol := range cluster.Volumes {
306                 va, ok := cfgvol.AccessViaHosts[myURL]
307                 if !ok && len(cfgvol.AccessViaHosts) > 0 {
308                         continue
309                 }
310                 dri, ok := driver[cfgvol.Driver]
311                 if !ok {
312                         return nil, fmt.Errorf("volume %s: invalid driver %q", uuid, cfgvol.Driver)
313                 }
314                 vol, err := dri(cluster, cfgvol, logger, metrics)
315                 if err != nil {
316                         return nil, fmt.Errorf("error initializing volume %s: %s", uuid, err)
317                 }
318                 logger.Printf("started volume %s (%s), ReadOnly=%v", uuid, vol, cfgvol.ReadOnly || va.ReadOnly)
319
320                 sc := cfgvol.StorageClasses
321                 if len(sc) == 0 {
322                         sc = map[string]bool{"default": true}
323                 }
324                 repl := cfgvol.Replication
325                 if repl < 1 {
326                         repl = 1
327                 }
328                 mnt := &VolumeMount{
329                         KeepMount: arvados.KeepMount{
330                                 UUID:           uuid,
331                                 DeviceID:       vol.GetDeviceID(),
332                                 ReadOnly:       cfgvol.ReadOnly || va.ReadOnly,
333                                 Replication:    repl,
334                                 StorageClasses: sc,
335                         },
336                         Volume: vol,
337                 }
338                 vm.iostats[vol] = &ioStats{}
339                 vm.mounts = append(vm.mounts, mnt)
340                 vm.mountMap[uuid] = mnt
341                 vm.readables = append(vm.readables, mnt)
342                 if !mnt.KeepMount.ReadOnly {
343                         vm.writables = append(vm.writables, mnt)
344                 }
345         }
346         return vm, nil
347 }
348
349 func (vm *RRVolumeManager) Mounts() []*VolumeMount {
350         return vm.mounts
351 }
352
353 func (vm *RRVolumeManager) Lookup(uuid string, needWrite bool) *VolumeMount {
354         if mnt, ok := vm.mountMap[uuid]; ok && (!needWrite || !mnt.ReadOnly) {
355                 return mnt
356         }
357         return nil
358 }
359
360 // AllReadable returns an array of all readable volumes
361 func (vm *RRVolumeManager) AllReadable() []*VolumeMount {
362         return vm.readables
363 }
364
365 // AllWritable returns an array of all writable volumes
366 func (vm *RRVolumeManager) AllWritable() []*VolumeMount {
367         return vm.writables
368 }
369
370 // NextWritable returns the next writable
371 func (vm *RRVolumeManager) NextWritable() *VolumeMount {
372         if len(vm.writables) == 0 {
373                 return nil
374         }
375         i := atomic.AddUint32(&vm.counter, 1)
376         return vm.writables[i%uint32(len(vm.writables))]
377 }
378
379 // VolumeStats returns an ioStats for the given volume.
380 func (vm *RRVolumeManager) VolumeStats(v Volume) *ioStats {
381         return vm.iostats[v]
382 }
383
384 // Close the RRVolumeManager
385 func (vm *RRVolumeManager) Close() {
386 }
387
388 // VolumeStatus describes the current condition of a volume
389 type VolumeStatus struct {
390         MountPoint string
391         DeviceNum  uint64
392         BytesFree  uint64
393         BytesUsed  uint64
394 }
395
396 // ioStats tracks I/O statistics for a volume or server
397 type ioStats struct {
398         Errors     uint64
399         Ops        uint64
400         CompareOps uint64
401         GetOps     uint64
402         PutOps     uint64
403         TouchOps   uint64
404         InBytes    uint64
405         OutBytes   uint64
406 }
407
408 type InternalStatser interface {
409         InternalStats() interface{}
410 }