3448: minor bugfixes
[arvados.git] / services / keepstore / volume.go
1 // A Volume is an interface representing a Keep back-end storage unit:
2 // for example, a single mounted disk, a RAID array, an Amazon S3 volume,
3 // etc.
4
5 package main
6
7 import (
8         "errors"
9         "fmt"
10         "os"
11         "strings"
12         "time"
13 )
14
15 type Volume interface {
16         Get(loc string) ([]byte, error)
17         Put(loc string, block []byte) error
18         Touch(loc string) error
19         Index(prefix string) string
20         Delete(loc string) error
21         Status() *VolumeStatus
22         String() string
23 }
24
25 // MockVolumes are Volumes used to test the Keep front end.
26 //
27 // If the Bad field is true, this volume should return an error
28 // on all writes and puts.
29 //
30 type MockVolume struct {
31         Store      map[string][]byte
32         Timestamps map[string]time.Time
33         Bad        bool
34 }
35
36 func CreateMockVolume() *MockVolume {
37         return &MockVolume{
38                 make(map[string][]byte),
39                 make(map[string]time.Time),
40                 false,
41         }
42 }
43
44 func (v *MockVolume) Get(loc string) ([]byte, error) {
45         if v.Bad {
46                 return nil, errors.New("Bad volume")
47         } else if block, ok := v.Store[loc]; ok {
48                 return block, nil
49         }
50         return nil, os.ErrNotExist
51 }
52
53 func (v *MockVolume) Put(loc string, block []byte) error {
54         if v.Bad {
55                 return errors.New("Bad volume")
56         }
57         v.Store[loc] = block
58         return v.Touch(loc)
59 }
60
61 func (v *MockVolume) Touch(loc string) error {
62         if v.Bad {
63                 return errors.New("Bad volume")
64         }
65         v.Timestamps[loc] = time.Now()
66         return nil
67 }
68
69 func (v *MockVolume) Index(prefix string) string {
70         var result string
71         for loc, block := range v.Store {
72                 if IsValidLocator(loc) && strings.HasPrefix(loc, prefix) {
73                         result = result + fmt.Sprintf("%s+%d %d\n",
74                                 loc, len(block), 123456789)
75                 }
76         }
77         return result
78 }
79
80 func (v *MockVolume) Delete(loc string) error {
81         if _, ok := v.Store[loc]; ok {
82                 delete(v.Store, loc)
83                 return nil
84         }
85         return os.ErrNotExist
86 }
87
88 func (v *MockVolume) Status() *VolumeStatus {
89         var used uint64
90         for _, block := range v.Store {
91                 used = used + uint64(len(block))
92         }
93         return &VolumeStatus{"/bogo", 123, 1000000 - used, used}
94 }
95
96 func (v *MockVolume) String() string {
97         return "[MockVolume]"
98 }
99
100 // A VolumeManager manages a collection of volumes.
101 //
102 // - Volumes is a slice of available Volumes.
103 // - Choose() returns a Volume suitable for writing to.
104 // - Quit() instructs the VolumeManager to shut down gracefully.
105 //
106 type VolumeManager interface {
107         Volumes() []Volume
108         Choose() Volume
109         Quit()
110 }
111
112 type RRVolumeManager struct {
113         volumes   []Volume
114         nextwrite chan Volume
115         quit      chan int
116 }
117
118 func MakeRRVolumeManager(vols []Volume) *RRVolumeManager {
119         // Create a new VolumeManager struct with the specified volumes,
120         // and with new Nextwrite and Quit channels.
121         // The Quit channel is buffered with a capacity of 1 so that
122         // another routine may write to it without blocking.
123         vm := &RRVolumeManager{vols, make(chan Volume), make(chan int, 1)}
124
125         // This goroutine implements round-robin volume selection.
126         // It sends each available Volume in turn to the Nextwrite
127         // channel, until receiving a notification on the Quit channel
128         // that it should terminate.
129         go func() {
130                 var i int = 0
131                 for {
132                         select {
133                         case <-vm.quit:
134                                 return
135                         case vm.nextwrite <- vm.volumes[i]:
136                                 i = (i + 1) % len(vm.volumes)
137                         }
138                 }
139         }()
140
141         return vm
142 }
143
144 func (vm *RRVolumeManager) Volumes() []Volume {
145         return vm.volumes
146 }
147
148 func (vm *RRVolumeManager) Choose() Volume {
149         return <-vm.nextwrite
150 }
151
152 func (vm *RRVolumeManager) Quit() {
153         vm.quit <- 1
154 }