3414: fix log typo
[arvados.git] / services / keepstore / volume.go
1 // A Volume is an interface representing a Keep back-end storage unit:
2 // for example, a single mounted disk, a RAID array, an Amazon S3 volume,
3 // etc.
4
5 package main
6
7 import (
8         "errors"
9         "fmt"
10         "os"
11         "strings"
12         "time"
13 )
14
15 type Volume interface {
16         Get(loc string) ([]byte, error)
17         Put(loc string, block []byte) error
18         Touch(loc string) error
19         Mtime(loc string) (time.Time, error)
20         Index(prefix string) string
21         Delete(loc string) error
22         Status() *VolumeStatus
23         String() string
24 }
25
26 // MockVolumes are Volumes used to test the Keep front end.
27 //
28 // If the Bad field is true, this volume should return an error
29 // on all writes and puts.
30 //
31 // The Touchable field signifies whether the Touch method will
32 // succeed.  Defaults to true.  Note that Bad and Touchable are
33 // independent: a MockVolume may be set up so that Put fails but Touch
34 // works or vice versa.
35 //
36 // TODO(twp): rename Bad to something more descriptive, e.g. Writable,
37 // and make sure that the tests that rely on it are testing the right
38 // thing.  We may need to simulate Writable, Touchable and Corrupt
39 // volumes in different ways.
40 //
41 type MockVolume struct {
42         Store      map[string][]byte
43         Timestamps map[string]time.Time
44         Bad        bool
45         Touchable  bool
46 }
47
48 func CreateMockVolume() *MockVolume {
49         return &MockVolume{
50                 Store:      make(map[string][]byte),
51                 Timestamps: make(map[string]time.Time),
52                 Bad:        false,
53                 Touchable:  true,
54         }
55 }
56
57 func (v *MockVolume) Get(loc string) ([]byte, error) {
58         if v.Bad {
59                 return nil, errors.New("Bad volume")
60         } else if block, ok := v.Store[loc]; ok {
61                 return block, nil
62         }
63         return nil, os.ErrNotExist
64 }
65
66 func (v *MockVolume) Put(loc string, block []byte) error {
67         if v.Bad {
68                 return errors.New("Bad volume")
69         }
70         v.Store[loc] = block
71         return v.Touch(loc)
72 }
73
74 func (v *MockVolume) Touch(loc string) error {
75         if v.Touchable {
76                 v.Timestamps[loc] = time.Now()
77                 return nil
78         }
79         return errors.New("Touch failed")
80 }
81
82 func (v *MockVolume) Mtime(loc string) (time.Time, error) {
83         var mtime time.Time
84         var err error
85         if v.Bad {
86                 err = errors.New("Bad volume")
87         } else if t, ok := v.Timestamps[loc]; ok {
88                 mtime = t
89         } else {
90                 err = os.ErrNotExist
91         }
92         return mtime, err
93 }
94
95 func (v *MockVolume) Index(prefix string) string {
96         var result string
97         for loc, block := range v.Store {
98                 if IsValidLocator(loc) && strings.HasPrefix(loc, prefix) {
99                         result = result + fmt.Sprintf("%s+%d %d\n",
100                                 loc, len(block), 123456789)
101                 }
102         }
103         return result
104 }
105
106 func (v *MockVolume) Delete(loc string) error {
107         if _, ok := v.Store[loc]; ok {
108                 if time.Since(v.Timestamps[loc]) < permission_ttl {
109                         return nil
110                 }
111                 delete(v.Store, loc)
112                 return nil
113         }
114         return os.ErrNotExist
115 }
116
117 func (v *MockVolume) Status() *VolumeStatus {
118         var used uint64
119         for _, block := range v.Store {
120                 used = used + uint64(len(block))
121         }
122         return &VolumeStatus{"/bogo", 123, 1000000 - used, used}
123 }
124
125 func (v *MockVolume) String() string {
126         return "[MockVolume]"
127 }
128
129 // A VolumeManager manages a collection of volumes.
130 //
131 // - Volumes is a slice of available Volumes.
132 // - Choose() returns a Volume suitable for writing to.
133 // - Quit() instructs the VolumeManager to shut down gracefully.
134 //
135 type VolumeManager interface {
136         Volumes() []Volume
137         Choose() Volume
138         Quit()
139 }
140
141 type RRVolumeManager struct {
142         volumes   []Volume
143         nextwrite chan Volume
144         quit      chan int
145 }
146
147 func MakeRRVolumeManager(vols []Volume) *RRVolumeManager {
148         // Create a new VolumeManager struct with the specified volumes,
149         // and with new Nextwrite and Quit channels.
150         // The Quit channel is buffered with a capacity of 1 so that
151         // another routine may write to it without blocking.
152         vm := &RRVolumeManager{vols, make(chan Volume), make(chan int, 1)}
153
154         // This goroutine implements round-robin volume selection.
155         // It sends each available Volume in turn to the Nextwrite
156         // channel, until receiving a notification on the Quit channel
157         // that it should terminate.
158         go func() {
159                 var i int = 0
160                 for {
161                         select {
162                         case <-vm.quit:
163                                 return
164                         case vm.nextwrite <- vm.volumes[i]:
165                                 i = (i + 1) % len(vm.volumes)
166                         }
167                 }
168         }()
169
170         return vm
171 }
172
173 func (vm *RRVolumeManager) Volumes() []Volume {
174         return vm.volumes
175 }
176
177 func (vm *RRVolumeManager) Choose() Volume {
178         return <-vm.nextwrite
179 }
180
181 func (vm *RRVolumeManager) Quit() {
182         vm.quit <- 1
183 }