3448: unit tests, bug fixes
[arvados.git] / services / keepstore / volume.go
1 // A Volume is an interface representing a Keep back-end storage unit:
2 // for example, a single mounted disk, a RAID array, an Amazon S3 volume,
3 // etc.
4
5 package main
6
7 import (
8         "errors"
9         "fmt"
10         "os"
11         "strings"
12         "time"
13 )
14
15 type Volume interface {
16         Get(loc string) ([]byte, error)
17         Put(loc string, block []byte) error
18         Touch(loc string) error
19         Mtime(loc string) (time.Time, error)
20         Index(prefix string) string
21         Delete(loc string) error
22         Status() *VolumeStatus
23         String() string
24 }
25
26 // MockVolumes are Volumes used to test the Keep front end.
27 //
28 // If the Bad field is true, this volume should return an error
29 // on all writes and puts.
30 //
31 type MockVolume struct {
32         Store      map[string][]byte
33         Timestamps map[string]time.Time
34         Bad        bool
35 }
36
37 func CreateMockVolume() *MockVolume {
38         return &MockVolume{
39                 make(map[string][]byte),
40                 make(map[string]time.Time),
41                 false,
42         }
43 }
44
45 func (v *MockVolume) Get(loc string) ([]byte, error) {
46         if v.Bad {
47                 return nil, errors.New("Bad volume")
48         } else if block, ok := v.Store[loc]; ok {
49                 return block, nil
50         }
51         return nil, os.ErrNotExist
52 }
53
54 func (v *MockVolume) Put(loc string, block []byte) error {
55         if v.Bad {
56                 return errors.New("Bad volume")
57         }
58         v.Store[loc] = block
59         return v.Touch(loc)
60 }
61
62 func (v *MockVolume) Touch(loc string) error {
63         if v.Bad {
64                 return errors.New("Bad volume")
65         }
66         v.Timestamps[loc] = time.Now()
67         return nil
68 }
69
70 func (v *MockVolume) Mtime(loc string) (time.Time, error) {
71         var mtime time.Time
72         var err error
73         if v.Bad {
74                 err = errors.New("Bad volume")
75         } else if t, ok := v.Timestamps[loc]; ok {
76                 mtime = t
77         } else {
78                 err = os.ErrNotExist
79         }
80         return mtime, err
81 }
82
83 func (v *MockVolume) Index(prefix string) string {
84         var result string
85         for loc, block := range v.Store {
86                 if IsValidLocator(loc) && strings.HasPrefix(loc, prefix) {
87                         result = result + fmt.Sprintf("%s+%d %d\n",
88                                 loc, len(block), 123456789)
89                 }
90         }
91         return result
92 }
93
94 func (v *MockVolume) Delete(loc string) error {
95         if _, ok := v.Store[loc]; ok {
96                 if time.Since(v.Timestamps[loc]) < permission_ttl {
97                         return nil
98                 }
99                 delete(v.Store, loc)
100                 return nil
101         }
102         return os.ErrNotExist
103 }
104
105 func (v *MockVolume) Status() *VolumeStatus {
106         var used uint64
107         for _, block := range v.Store {
108                 used = used + uint64(len(block))
109         }
110         return &VolumeStatus{"/bogo", 123, 1000000 - used, used}
111 }
112
113 func (v *MockVolume) String() string {
114         return "[MockVolume]"
115 }
116
117 // A VolumeManager manages a collection of volumes.
118 //
119 // - Volumes is a slice of available Volumes.
120 // - Choose() returns a Volume suitable for writing to.
121 // - Quit() instructs the VolumeManager to shut down gracefully.
122 //
123 type VolumeManager interface {
124         Volumes() []Volume
125         Choose() Volume
126         Quit()
127 }
128
129 type RRVolumeManager struct {
130         volumes   []Volume
131         nextwrite chan Volume
132         quit      chan int
133 }
134
135 func MakeRRVolumeManager(vols []Volume) *RRVolumeManager {
136         // Create a new VolumeManager struct with the specified volumes,
137         // and with new Nextwrite and Quit channels.
138         // The Quit channel is buffered with a capacity of 1 so that
139         // another routine may write to it without blocking.
140         vm := &RRVolumeManager{vols, make(chan Volume), make(chan int, 1)}
141
142         // This goroutine implements round-robin volume selection.
143         // It sends each available Volume in turn to the Nextwrite
144         // channel, until receiving a notification on the Quit channel
145         // that it should terminate.
146         go func() {
147                 var i int = 0
148                 for {
149                         select {
150                         case <-vm.quit:
151                                 return
152                         case vm.nextwrite <- vm.volumes[i]:
153                                 i = (i + 1) % len(vm.volumes)
154                         }
155                 }
156         }()
157
158         return vm
159 }
160
161 func (vm *RRVolumeManager) Volumes() []Volume {
162         return vm.volumes
163 }
164
165 func (vm *RRVolumeManager) Choose() Volume {
166         return <-vm.nextwrite
167 }
168
169 func (vm *RRVolumeManager) Quit() {
170         vm.quit <- 1
171 }