Merge branch 'master' into 3112-report-bug
[arvados.git] / services / keepstore / volume.go
1 // A Volume is an interface representing a Keep back-end storage unit:
2 // for example, a single mounted disk, a RAID array, an Amazon S3 volume,
3 // etc.
4
5 package main
6
7 import (
8         "errors"
9         "fmt"
10         "os"
11         "strings"
12 )
13
14 type Volume interface {
15         Get(loc string) ([]byte, error)
16         Put(loc string, block []byte) error
17         Index(prefix string) string
18         Delete(loc string) error
19         Status() *VolumeStatus
20         String() string
21 }
22
23 // MockVolumes are Volumes used to test the Keep front end.
24 //
25 // If the Bad field is true, this volume should return an error
26 // on all writes and puts.
27 //
28 type MockVolume struct {
29         Store map[string][]byte
30         Bad   bool
31 }
32
33 func CreateMockVolume() *MockVolume {
34         return &MockVolume{make(map[string][]byte), false}
35 }
36
37 func (v *MockVolume) Get(loc string) ([]byte, error) {
38         if v.Bad {
39                 return nil, errors.New("Bad volume")
40         } else if block, ok := v.Store[loc]; ok {
41                 return block, nil
42         }
43         return nil, os.ErrNotExist
44 }
45
46 func (v *MockVolume) Put(loc string, block []byte) error {
47         if v.Bad {
48                 return errors.New("Bad volume")
49         }
50         v.Store[loc] = block
51         return nil
52 }
53
54 func (v *MockVolume) Index(prefix string) string {
55         var result string
56         for loc, block := range v.Store {
57                 if IsValidLocator(loc) && strings.HasPrefix(loc, prefix) {
58                         result = result + fmt.Sprintf("%s+%d %d\n",
59                                 loc, len(block), 123456789)
60                 }
61         }
62         return result
63 }
64
65 func (v *MockVolume) Delete(loc string) error {
66         if _, ok := v.Store[loc]; ok {
67                 delete(v.Store, loc)
68                 return nil
69         }
70         return os.ErrNotExist
71 }
72
73 func (v *MockVolume) Status() *VolumeStatus {
74         var used uint64
75         for _, block := range v.Store {
76                 used = used + uint64(len(block))
77         }
78         return &VolumeStatus{"/bogo", 123, 1000000 - used, used}
79 }
80
81 func (v *MockVolume) String() string {
82         return "[MockVolume]"
83 }
84
85 // A VolumeManager manages a collection of volumes.
86 //
87 // - Volumes is a slice of available Volumes.
88 // - Choose() returns a Volume suitable for writing to.
89 // - Quit() instructs the VolumeManager to shut down gracefully.
90 //
91 type VolumeManager interface {
92         Volumes() []Volume
93         Choose() Volume
94         Quit()
95 }
96
97 type RRVolumeManager struct {
98         volumes   []Volume
99         nextwrite chan Volume
100         quit      chan int
101 }
102
103 func MakeRRVolumeManager(vols []Volume) *RRVolumeManager {
104         // Create a new VolumeManager struct with the specified volumes,
105         // and with new Nextwrite and Quit channels.
106         // The Quit channel is buffered with a capacity of 1 so that
107         // another routine may write to it without blocking.
108         vm := &RRVolumeManager{vols, make(chan Volume), make(chan int, 1)}
109
110         // This goroutine implements round-robin volume selection.
111         // It sends each available Volume in turn to the Nextwrite
112         // channel, until receiving a notification on the Quit channel
113         // that it should terminate.
114         go func() {
115                 var i int = 0
116                 for {
117                         select {
118                         case <-vm.quit:
119                                 return
120                         case vm.nextwrite <- vm.volumes[i]:
121                                 i = (i + 1) % len(vm.volumes)
122                         }
123                 }
124         }()
125
126         return vm
127 }
128
129 func (vm *RRVolumeManager) Volumes() []Volume {
130         return vm.volumes
131 }
132
133 func (vm *RRVolumeManager) Choose() Volume {
134         return <-vm.nextwrite
135 }
136
137 func (vm *RRVolumeManager) Quit() {
138         vm.quit <- 1
139 }