Cleaned up unit tests. (refs #2620)
[arvados.git] / services / keep / src / keep / volume.go
1 // A Volume is an interface representing a Keep back-end storage unit:
2 // for example, a single mounted disk, a RAID array, an Amazon S3 volume,
3 // etc.
4 //
5 // A UnixVolume is a Volume that is backed by a locally mounted disk.
6
7 package main
8
9 import (
10         "fmt"
11         "io/ioutil"
12         "log"
13         "os"
14         "path/filepath"
15         "strconv"
16         "strings"
17         "syscall"
18         "time"
19 )
20
21 type Volume interface {
22         Get(loc string) ([]byte, error)
23         Put(loc string, block []byte) error
24         Index(prefix string) string
25         Status() *VolumeStatus
26         String() string
27 }
28
29 // IORequests are encapsulated requests to perform I/O on a Keep volume.
30 // When running in serialized mode, the Keep front end sends IORequests
31 // on a channel to an IORunner, which handles them one at a time and
32 // returns an IOResponse.
33 //
34 type IOMethod int
35
36 const (
37         KeepGet IOMethod = iota
38         KeepPut
39 )
40
41 type IORequest struct {
42         method IOMethod
43         loc    string
44         data   []byte
45         reply  chan *IOResponse
46 }
47
48 type IOResponse struct {
49         data []byte
50         err  error
51 }
52
53 // A UnixVolume is configured with:
54 //
55 // * root: the path to the volume's root directory
56 // * queue: if non-nil, all I/O requests for this volume should be queued
57 //   on this channel. The response will be delivered on the IOResponse
58 //   channel included in the request.
59 //
60 type UnixVolume struct {
61         root  string // path to this volume
62         queue chan *IORequest
63 }
64
65 func (v *UnixVolume) IOHandler() {
66         for req := range v.queue {
67                 var result IOResponse
68                 switch req.method {
69                 case KeepGet:
70                         result.data, result.err = v.Read(req.loc)
71                 case KeepPut:
72                         result.err = v.Write(req.loc, req.data)
73                 }
74                 req.reply <- &result
75         }
76 }
77
78 func MakeUnixVolume(root string, queue chan *IORequest) UnixVolume {
79         v := UnixVolume{root, queue}
80         if queue != nil {
81                 go v.IOHandler()
82         }
83         return v
84 }
85
86 func (v *UnixVolume) Get(loc string) ([]byte, error) {
87         if v.queue == nil {
88                 return v.Read(loc)
89         }
90         reply := make(chan *IOResponse)
91         v.queue <- &IORequest{KeepGet, loc, nil, reply}
92         response := <-reply
93         return response.data, response.err
94 }
95
96 func (v *UnixVolume) Put(loc string, block []byte) error {
97         if v.queue == nil {
98                 return v.Write(loc, block)
99         }
100         reply := make(chan *IOResponse)
101         v.queue <- &IORequest{KeepPut, loc, block, reply}
102         response := <-reply
103         return response.err
104 }
105
106 // Read retrieves a block identified by the locator string "loc", and
107 // returns its contents as a byte slice.
108 //
109 // If the block could not be opened or read, Read returns a nil slice
110 // and the os.Error that was generated.
111 //
112 // If the block is present but its content hash does not match loc,
113 // Read returns the block and a CorruptError.  It is the caller's
114 // responsibility to decide what (if anything) to do with the
115 // corrupted data block.
116 //
117 func (v *UnixVolume) Read(loc string) ([]byte, error) {
118         var f *os.File
119         var err error
120         var nread int
121
122         blockFilename := fmt.Sprintf("%s/%s/%s", v.root, loc[0:3], loc)
123
124         f, err = os.Open(blockFilename)
125         if err != nil {
126                 return nil, err
127         }
128
129         var buf = make([]byte, BLOCKSIZE)
130         nread, err = f.Read(buf)
131         if err != nil {
132                 log.Printf("%s: reading %s: %s\n", v, blockFilename, err)
133                 return buf, err
134         }
135
136         // Success!
137         return buf[:nread], nil
138 }
139
140 // Write stores a block of data identified by the locator string
141 // "loc".  It returns nil on success.  If the volume is full, it
142 // returns a FullError.  If the write fails due to some other error,
143 // that error is returned.
144 //
145 func (v *UnixVolume) Write(loc string, block []byte) error {
146         if v.IsFull() {
147                 return FullError
148         }
149         blockDir := fmt.Sprintf("%s/%s", v.root, loc[0:3])
150         if err := os.MkdirAll(blockDir, 0755); err != nil {
151                 log.Printf("%s: could not create directory %s: %s",
152                         loc, blockDir, err)
153                 return err
154         }
155
156         tmpfile, tmperr := ioutil.TempFile(blockDir, "tmp"+loc)
157         if tmperr != nil {
158                 log.Printf("ioutil.TempFile(%s, tmp%s): %s", blockDir, loc, tmperr)
159                 return tmperr
160         }
161         blockFilename := fmt.Sprintf("%s/%s", blockDir, loc)
162
163         if _, err := tmpfile.Write(block); err != nil {
164                 log.Printf("%s: writing to %s: %s\n", v, blockFilename, err)
165                 return err
166         }
167         if err := tmpfile.Close(); err != nil {
168                 log.Printf("closing %s: %s\n", tmpfile.Name(), err)
169                 os.Remove(tmpfile.Name())
170                 return err
171         }
172         if err := os.Rename(tmpfile.Name(), blockFilename); err != nil {
173                 log.Printf("rename %s %s: %s\n", tmpfile.Name(), blockFilename, err)
174                 os.Remove(tmpfile.Name())
175                 return err
176         }
177         return nil
178 }
179
180 // Status returns a VolumeStatus struct describing the volume's
181 // current state.
182 //
183 func (v *UnixVolume) Status() *VolumeStatus {
184         var fs syscall.Statfs_t
185         var devnum uint64
186
187         if fi, err := os.Stat(v.root); err == nil {
188                 devnum = fi.Sys().(*syscall.Stat_t).Dev
189         } else {
190                 log.Printf("%s: os.Stat: %s\n", v, err)
191                 return nil
192         }
193
194         err := syscall.Statfs(v.root, &fs)
195         if err != nil {
196                 log.Printf("%s: statfs: %s\n", v, err)
197                 return nil
198         }
199         // These calculations match the way df calculates disk usage:
200         // "free" space is measured by fs.Bavail, but "used" space
201         // uses fs.Blocks - fs.Bfree.
202         free := fs.Bavail * uint64(fs.Bsize)
203         used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
204         return &VolumeStatus{v.root, devnum, free, used}
205 }
206
207 // Index returns a list of blocks found on this volume which begin with
208 // the specified prefix. If the prefix is an empty string, Index returns
209 // a complete list of blocks.
210 //
211 // The return value is a multiline string (separated by
212 // newlines). Each line is in the format
213 //
214 //     locator+size modification-time
215 //
216 // e.g.:
217 //
218 //     e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
219 //     e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
220 //     e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
221 //
222 func (v *UnixVolume) Index(prefix string) (output string) {
223         filepath.Walk(v.root,
224                 func(path string, info os.FileInfo, err error) error {
225                         // This WalkFunc inspects each path in the volume
226                         // and prints an index line for all files that begin
227                         // with prefix.
228                         if err != nil {
229                                 log.Printf("IndexHandler: %s: walking to %s: %s",
230                                         v, path, err)
231                                 return nil
232                         }
233                         locator := filepath.Base(path)
234                         // Skip directories that do not match prefix.
235                         // We know there is nothing interesting inside.
236                         if info.IsDir() &&
237                                 !strings.HasPrefix(locator, prefix) &&
238                                 !strings.HasPrefix(prefix, locator) {
239                                 return filepath.SkipDir
240                         }
241                         // Skip any file that is not apparently a locator, e.g. .meta files
242                         if !IsValidLocator(locator) {
243                                 return nil
244                         }
245                         // Print filenames beginning with prefix
246                         if !info.IsDir() && strings.HasPrefix(locator, prefix) {
247                                 output = output + fmt.Sprintf(
248                                         "%s+%d %d\n", locator, info.Size(), info.ModTime().Unix())
249                         }
250                         return nil
251                 })
252
253         return
254 }
255
256 // IsFull returns true if the free space on the volume is less than
257 // MIN_FREE_KILOBYTES.
258 //
259 func (v *UnixVolume) IsFull() (isFull bool) {
260         fullSymlink := v.root + "/full"
261
262         // Check if the volume has been marked as full in the last hour.
263         if link, err := os.Readlink(fullSymlink); err == nil {
264                 if ts, err := strconv.Atoi(link); err == nil {
265                         fulltime := time.Unix(int64(ts), 0)
266                         if time.Since(fulltime).Hours() < 1.0 {
267                                 return true
268                         }
269                 }
270         }
271
272         if avail, err := v.FreeDiskSpace(); err == nil {
273                 isFull = avail < MIN_FREE_KILOBYTES
274         } else {
275                 log.Printf("%s: FreeDiskSpace: %s\n", v, err)
276                 isFull = false
277         }
278
279         // If the volume is full, timestamp it.
280         if isFull {
281                 now := fmt.Sprintf("%d", time.Now().Unix())
282                 os.Symlink(now, fullSymlink)
283         }
284         return
285 }
286
287 // FreeDiskSpace returns the number of unused 1k blocks available on
288 // the volume.
289 //
290 func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
291         var fs syscall.Statfs_t
292         err = syscall.Statfs(v.root, &fs)
293         if err == nil {
294                 // Statfs output is not guaranteed to measure free
295                 // space in terms of 1K blocks.
296                 free = fs.Bavail * uint64(fs.Bsize) / 1024
297         }
298         return
299 }
300
301 func (v *UnixVolume) String() string {
302         return fmt.Sprintf("[UnixVolume %s]", v.root)
303 }