1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
17 "git.arvados.org/arvados.git/sdk/go/arvados"
18 "github.com/sirupsen/logrus"
21 type BlockWriter interface {
22 // WriteBlock reads all data from r, writes it to a backing
23 // store as "loc", and returns the number of bytes written.
24 WriteBlock(ctx context.Context, loc string, r io.Reader) error
27 type BlockReader interface {
28 // ReadBlock retrieves data previously stored as "loc" and
30 ReadBlock(ctx context.Context, loc string, w io.Writer) error
33 var driver = map[string]func(*arvados.Cluster, arvados.Volume, logrus.FieldLogger, *volumeMetricsVecs) (Volume, error){}
35 // A Volume is an interface representing a Keep back-end storage unit:
36 // for example, a single mounted disk, a RAID array, an Amazon S3 volume,
38 type Volume interface {
39 // Get a block: copy the block data into buf, and return the
40 // number of bytes copied.
42 // loc is guaranteed to consist of 32 or more lowercase hex
45 // Get should not verify the integrity of the data: it should
46 // just return whatever was found in its backing
47 // store. (Integrity checking is the caller's responsibility.)
49 // If an error is encountered that prevents it from
50 // retrieving the data, that error should be returned so the
51 // caller can log (and send to the client) a more useful
54 // If the error is "not found", and there's no particular
55 // reason to expect the block to be found (other than that a
56 // caller is asking for it), the returned error should satisfy
57 // os.IsNotExist(err): this is a normal condition and will not
58 // be logged as an error (except that a 404 will appear in the
59 // access log if the block is not found on any other volumes
62 // If the data in the backing store is bigger than len(buf),
63 // then Get is permitted to return an error without reading
66 // len(buf) will not exceed BlockSize.
67 Get(ctx context.Context, loc string, buf []byte) (int, error)
69 // Compare the given data with the stored data (i.e., what Get
70 // would return). If equal, return nil. If not, return
71 // CollisionError or DiskHashError (depending on whether the
72 // data on disk matches the expected hash), or whatever error
73 // was encountered opening/reading the stored data.
74 Compare(ctx context.Context, loc string, data []byte) error
76 // Put writes a block to an underlying storage device.
78 // loc is as described in Get.
80 // len(block) is guaranteed to be between 0 and BlockSize.
82 // If a block is already stored under the same name (loc) with
83 // different content, Put must either overwrite the existing
84 // data with the new data or return a non-nil error. When
85 // overwriting existing data, it must never leave the storage
86 // device in an inconsistent state: a subsequent call to Get
87 // must return either the entire old block, the entire new
88 // block, or an error. (An implementation that cannot peform
89 // atomic updates must leave the old data alone and return an
92 // Put also sets the timestamp for the given locator to the
95 // Put must return a non-nil error unless it can guarantee
96 // that the entire block has been written and flushed to
97 // persistent storage, and that its timestamp is current. Of
98 // course, this guarantee is only as good as the underlying
99 // storage device, but it is Put's responsibility to at least
100 // get whatever guarantee is offered by the storage device.
102 // Put should not verify that loc==hash(block): this is the
103 // caller's responsibility.
104 Put(ctx context.Context, loc string, block []byte) error
106 // Touch sets the timestamp for the given locator to the
109 // loc is as described in Get.
111 // If invoked at time t0, Touch must guarantee that a
112 // subsequent call to Mtime will return a timestamp no older
113 // than {t0 minus one second}. For example, if Touch is called
114 // at 2015-07-07T01:23:45.67890123Z, it is acceptable for a
115 // subsequent Mtime to return any of the following:
117 // - 2015-07-07T01:23:45.00000000Z
118 // - 2015-07-07T01:23:45.67890123Z
119 // - 2015-07-07T01:23:46.67890123Z
120 // - 2015-07-08T00:00:00.00000000Z
122 // It is not acceptable for a subsequente Mtime to return
123 // either of the following:
125 // - 2015-07-07T00:00:00.00000000Z -- ERROR
126 // - 2015-07-07T01:23:44.00000000Z -- ERROR
128 // Touch must return a non-nil error if the timestamp cannot
130 Touch(loc string) error
132 // Mtime returns the stored timestamp for the given locator.
134 // loc is as described in Get.
136 // Mtime must return a non-nil error if the given block is not
137 // found or the timestamp could not be retrieved.
138 Mtime(loc string) (time.Time, error)
140 // IndexTo writes a complete list of locators with the given
141 // prefix for which Get() can retrieve data.
143 // prefix consists of zero or more lowercase hexadecimal
146 // Each locator must be written to the given writer using the
149 // loc "+" size " " timestamp "\n"
153 // - size is the number of bytes of content, given as a
154 // decimal number with one or more digits
156 // - timestamp is the timestamp stored for the locator,
157 // given as a decimal number of seconds after January 1,
160 // IndexTo must not write any other data to writer: for
161 // example, it must not write any blank lines.
163 // If an error makes it impossible to provide a complete
164 // index, IndexTo must return a non-nil error. It is
165 // acceptable to return a non-nil error after writing a
166 // partial index to writer.
168 // The resulting index is not expected to be sorted in any
170 IndexTo(prefix string, writer io.Writer) error
172 // Trash moves the block data from the underlying storage
173 // device to trash area. The block then stays in trash for
174 // BlobTrashLifetime before it is actually deleted.
176 // loc is as described in Get.
178 // If the timestamp for the given locator is newer than
179 // BlobSigningTTL, Trash must not trash the data.
181 // If a Trash operation overlaps with any Touch or Put
182 // operations on the same locator, the implementation must
183 // ensure one of the following outcomes:
185 // - Touch and Put return a non-nil error, or
186 // - Trash does not trash the block, or
187 // - Both of the above.
189 // If it is possible for the storage device to be accessed by
190 // a different process or host, the synchronization mechanism
191 // should also guard against races with other processes and
192 // hosts. If such a mechanism is not available, there must be
193 // a mechanism for detecting unsafe configurations, alerting
194 // the operator, and aborting or falling back to a read-only
195 // state. In other words, running multiple keepstore processes
196 // with the same underlying storage device must either work
197 // reliably or fail outright.
199 // Corollary: A successful Touch or Put guarantees a block
200 // will not be trashed for at least BlobSigningTTL seconds.
201 Trash(loc string) error
203 // Untrash moves block from trash back into store
204 Untrash(loc string) error
206 // Status returns a *VolumeStatus representing the current
207 // in-use and available storage capacity and an
208 // implementation-specific volume identifier (e.g., "mount
209 // point" for a UnixVolume).
210 Status() *VolumeStatus
212 // String returns an identifying label for this volume,
213 // suitable for including in log messages. It should contain
214 // enough information to uniquely identify the underlying
215 // storage device, but should not contain any credentials or
219 // EmptyTrash looks for trashed blocks that exceeded
220 // BlobTrashLifetime and deletes them from the volume.
223 // Return a globally unique ID of the underlying storage
224 // device if possible, otherwise "".
228 // A VolumeWithExamples provides example configs to display in the
230 type VolumeWithExamples interface {
235 // A VolumeManager tells callers which volumes can read, which volumes
236 // can write, and on which volume the next write should be attempted.
237 type VolumeManager interface {
238 // Mounts returns all mounts (volume attachments).
239 Mounts() []*VolumeMount
241 // Lookup returns the mount with the given UUID. Returns nil
242 // if the mount does not exist. If write==true, returns nil if
243 // the mount is not writable.
244 Lookup(uuid string, write bool) *VolumeMount
246 // AllReadable returns all mounts.
247 AllReadable() []*VolumeMount
249 // AllWritable returns all mounts that aren't known to be in
250 // a read-only state. (There is no guarantee that a write to
251 // one will succeed, though.)
252 AllWritable() []*VolumeMount
254 // NextWritable returns the volume where the next new block
255 // should be written. A VolumeManager can select a volume in
256 // order to distribute activity across spindles, fill up disks
257 // with more free space, etc.
258 NextWritable() *VolumeMount
260 // VolumeStats returns the ioStats used for tracking stats for
262 VolumeStats(Volume) *ioStats
264 // Close shuts down the volume manager cleanly.
268 // A VolumeMount is an attachment of a Volume to a VolumeManager.
269 type VolumeMount struct {
274 // Generate a UUID the way API server would for a "KeepVolumeMount"
276 func (*VolumeMount) generateUUID() string {
278 _, ok := max.SetString("zzzzzzzzzzzzzzz", 36)
280 panic("big.Int parse failed")
282 r, err := rand.Int(rand.Reader, &max)
286 return fmt.Sprintf("zzzzz-ivpuk-%015s", r.Text(36))
289 // RRVolumeManager is a round-robin VolumeManager: the Nth call to
290 // NextWritable returns the (N % len(writables))th writable Volume
291 // (where writables are all Volumes v where v.Writable()==true).
292 type RRVolumeManager struct {
293 mounts []*VolumeMount
294 mountMap map[string]*VolumeMount
295 readables []*VolumeMount
296 writables []*VolumeMount
298 iostats map[Volume]*ioStats
301 func makeRRVolumeManager(logger logrus.FieldLogger, cluster *arvados.Cluster, myURL arvados.URL, metrics *volumeMetricsVecs) (*RRVolumeManager, error) {
302 vm := &RRVolumeManager{
303 iostats: make(map[Volume]*ioStats),
305 vm.mountMap = make(map[string]*VolumeMount)
306 for uuid, cfgvol := range cluster.Volumes {
307 va, ok := cfgvol.AccessViaHosts[myURL]
308 if !ok && len(cfgvol.AccessViaHosts) > 0 {
311 dri, ok := driver[cfgvol.Driver]
313 return nil, fmt.Errorf("volume %s: invalid driver %q", uuid, cfgvol.Driver)
315 vol, err := dri(cluster, cfgvol, logger, metrics)
317 return nil, fmt.Errorf("error initializing volume %s: %s", uuid, err)
319 sc := cfgvol.StorageClasses
321 sc = map[string]bool{"default": true}
323 repl := cfgvol.Replication
328 KeepMount: arvados.KeepMount{
330 DeviceID: vol.GetDeviceID(),
331 AllowWrite: !va.ReadOnly && !cfgvol.ReadOnly,
332 AllowTrash: !va.ReadOnly && (!cfgvol.ReadOnly || cfgvol.AllowTrashWhenReadOnly),
338 vm.iostats[vol] = &ioStats{}
339 vm.mounts = append(vm.mounts, mnt)
340 vm.mountMap[uuid] = mnt
341 vm.readables = append(vm.readables, mnt)
342 if mnt.KeepMount.AllowWrite {
343 vm.writables = append(vm.writables, mnt)
345 logger.Printf("started volume %s (%s), AllowWrite=%v, AllowTrash=%v", uuid, vol, mnt.AllowWrite, mnt.AllowTrash)
347 // pri(mnt): return highest priority of any storage class
349 pri := func(mnt *VolumeMount) int {
350 any, best := false, 0
351 for class := range mnt.KeepMount.StorageClasses {
352 if p := cluster.StorageClasses[class].Priority; !any || best < p {
359 // less(a,b): sort first by highest priority of any offered
360 // storage class (highest->lowest), then by volume UUID
361 less := func(a, b *VolumeMount) bool {
362 if pa, pb := pri(a), pri(b); pa != pb {
365 return a.KeepMount.UUID < b.KeepMount.UUID
368 sort.Slice(vm.readables, func(i, j int) bool {
369 return less(vm.readables[i], vm.readables[j])
371 sort.Slice(vm.writables, func(i, j int) bool {
372 return less(vm.writables[i], vm.writables[j])
374 sort.Slice(vm.mounts, func(i, j int) bool {
375 return less(vm.mounts[i], vm.mounts[j])
380 func (vm *RRVolumeManager) Mounts() []*VolumeMount {
384 func (vm *RRVolumeManager) Lookup(uuid string, needWrite bool) *VolumeMount {
385 if mnt, ok := vm.mountMap[uuid]; ok && (!needWrite || mnt.AllowWrite) {
391 // AllReadable returns an array of all readable volumes
392 func (vm *RRVolumeManager) AllReadable() []*VolumeMount {
396 // AllWritable returns writable volumes, sorted by priority/uuid. Used
397 // by CompareAndTouch to ensure higher-priority volumes are checked
399 func (vm *RRVolumeManager) AllWritable() []*VolumeMount {
403 // NextWritable returns writable volumes, rotated by vm.counter so
404 // each volume gets a turn to be first. Used by PutBlock to distribute
405 // new data across available volumes.
406 func (vm *RRVolumeManager) NextWritable() []*VolumeMount {
407 if len(vm.writables) == 0 {
410 offset := (int(atomic.AddUint32(&vm.counter, 1)) - 1) % len(vm.writables)
411 return append(append([]*VolumeMount(nil), vm.writables[offset:]...), vm.writables[:offset]...)
414 // VolumeStats returns an ioStats for the given volume.
415 func (vm *RRVolumeManager) VolumeStats(v Volume) *ioStats {
419 // Close the RRVolumeManager
420 func (vm *RRVolumeManager) Close() {
423 // VolumeStatus describes the current condition of a volume
424 type VolumeStatus struct {
431 // ioStats tracks I/O statistics for a volume or server
432 type ioStats struct {
443 type InternalStatser interface {
444 InternalStats() interface{}