Make sure an IOHandler is running when -serialize is on.
[arvados.git] / services / keep / src / keep / volume.go
1 // A Volume is an interface representing a Keep back-end storage unit:
2 // for example, a single mounted disk, a RAID array, an Amazon S3 volume,
3 // etc.
4 //
5 // A UnixVolume is a Volume that is backed by a locally mounted disk.
6
7 package main
8
9 import (
10         "fmt"
11         "io/ioutil"
12         "log"
13         "os"
14         "path/filepath"
15         "strconv"
16         "strings"
17         "syscall"
18         "time"
19 )
20
21 type Volume interface {
22         Get(loc string) ([]byte, error)
23         Put(loc string, block []byte) error
24         Index(prefix string) string
25         Status() *VolumeStatus
26         String() string
27 }
28
29 // IORequests are encapsulated requests to perform I/O on a Keep volume.
30 // When running in serialized mode, the Keep front end sends IORequests
31 // on a channel to an IORunner, which handles them one at a time and
32 // returns an IOResponse.
33 //
34 type IOMethod int
35
36 const (
37         KeepGet IOMethod = iota
38         KeepPut
39 )
40
41 type IORequest struct {
42         method IOMethod
43         loc    string
44         data   []byte
45         reply  chan *IOResponse
46 }
47
48 type IOResponse struct {
49         data []byte
50         err  error
51 }
52
53 // A UnixVolume is configured with:
54 //
55 // * root: the path to the volume's root directory
56 // * queue: if non-nil, all I/O requests for this volume should be queued
57 //   on this channel. The response will be delivered on the IOResponse
58 //   channel included in the request.
59 //
60 type UnixVolume struct {
61         root  string // path to this volume
62         queue chan *IORequest
63 }
64
65 func (v *UnixVolume) IOHandler() {
66         for req := range v.queue {
67                 var result IOResponse
68                 switch req.method {
69                 case KeepGet:
70                         result.data, result.err = v.Read(req.loc)
71                 case KeepPut:
72                         result.err = v.Write(req.loc, req.data)
73                 }
74                 req.reply <- &result
75         }
76 }
77
78 func MakeUnixVolume(root string, serialize bool) (v UnixVolume) {
79         if serialize {
80                 v = UnixVolume{root, make(chan *IORequest)}
81                 go v.IOHandler()
82         } else {
83                 v = UnixVolume{root, nil}
84         }
85         return
86 }
87
88 func (v *UnixVolume) Get(loc string) ([]byte, error) {
89         if v.queue == nil {
90                 return v.Read(loc)
91         }
92         reply := make(chan *IOResponse)
93         v.queue <- &IORequest{KeepGet, loc, nil, reply}
94         response := <-reply
95         return response.data, response.err
96 }
97
98 func (v *UnixVolume) Put(loc string, block []byte) error {
99         if v.queue == nil {
100                 return v.Write(loc, block)
101         }
102         reply := make(chan *IOResponse)
103         v.queue <- &IORequest{KeepPut, loc, block, reply}
104         response := <-reply
105         return response.err
106 }
107
108 // Read retrieves a block identified by the locator string "loc", and
109 // returns its contents as a byte slice.
110 //
111 // If the block could not be opened or read, Read returns a nil slice
112 // and the os.Error that was generated.
113 //
114 // If the block is present but its content hash does not match loc,
115 // Read returns the block and a CorruptError.  It is the caller's
116 // responsibility to decide what (if anything) to do with the
117 // corrupted data block.
118 //
119 func (v *UnixVolume) Read(loc string) ([]byte, error) {
120         var f *os.File
121         var err error
122         var nread int
123
124         blockFilename := fmt.Sprintf("%s/%s/%s", v.root, loc[0:3], loc)
125
126         f, err = os.Open(blockFilename)
127         if err != nil {
128                 return nil, err
129         }
130
131         var buf = make([]byte, BLOCKSIZE)
132         nread, err = f.Read(buf)
133         if err != nil {
134                 log.Printf("%s: reading %s: %s\n", v, blockFilename, err)
135                 return buf, err
136         }
137
138         // Success!
139         return buf[:nread], nil
140 }
141
142 // Write stores a block of data identified by the locator string
143 // "loc".  It returns nil on success.  If the volume is full, it
144 // returns a FullError.  If the write fails due to some other error,
145 // that error is returned.
146 //
147 func (v *UnixVolume) Write(loc string, block []byte) error {
148         if v.IsFull() {
149                 return FullError
150         }
151         blockDir := fmt.Sprintf("%s/%s", v.root, loc[0:3])
152         if err := os.MkdirAll(blockDir, 0755); err != nil {
153                 log.Printf("%s: could not create directory %s: %s",
154                         loc, blockDir, err)
155                 return err
156         }
157
158         tmpfile, tmperr := ioutil.TempFile(blockDir, "tmp"+loc)
159         if tmperr != nil {
160                 log.Printf("ioutil.TempFile(%s, tmp%s): %s", blockDir, loc, tmperr)
161                 return tmperr
162         }
163         blockFilename := fmt.Sprintf("%s/%s", blockDir, loc)
164
165         if _, err := tmpfile.Write(block); err != nil {
166                 log.Printf("%s: writing to %s: %s\n", v, blockFilename, err)
167                 return err
168         }
169         if err := tmpfile.Close(); err != nil {
170                 log.Printf("closing %s: %s\n", tmpfile.Name(), err)
171                 os.Remove(tmpfile.Name())
172                 return err
173         }
174         if err := os.Rename(tmpfile.Name(), blockFilename); err != nil {
175                 log.Printf("rename %s %s: %s\n", tmpfile.Name(), blockFilename, err)
176                 os.Remove(tmpfile.Name())
177                 return err
178         }
179         return nil
180 }
181
182 // Status returns a VolumeStatus struct describing the volume's
183 // current state.
184 //
185 func (v *UnixVolume) Status() *VolumeStatus {
186         var fs syscall.Statfs_t
187         var devnum uint64
188
189         if fi, err := os.Stat(v.root); err == nil {
190                 devnum = fi.Sys().(*syscall.Stat_t).Dev
191         } else {
192                 log.Printf("%s: os.Stat: %s\n", v, err)
193                 return nil
194         }
195
196         err := syscall.Statfs(v.root, &fs)
197         if err != nil {
198                 log.Printf("%s: statfs: %s\n", v, err)
199                 return nil
200         }
201         // These calculations match the way df calculates disk usage:
202         // "free" space is measured by fs.Bavail, but "used" space
203         // uses fs.Blocks - fs.Bfree.
204         free := fs.Bavail * uint64(fs.Bsize)
205         used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
206         return &VolumeStatus{v.root, devnum, free, used}
207 }
208
209 // Index returns a list of blocks found on this volume which begin with
210 // the specified prefix. If the prefix is an empty string, Index returns
211 // a complete list of blocks.
212 //
213 // The return value is a multiline string (separated by
214 // newlines). Each line is in the format
215 //
216 //     locator+size modification-time
217 //
218 // e.g.:
219 //
220 //     e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
221 //     e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
222 //     e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
223 //
224 func (v *UnixVolume) Index(prefix string) (output string) {
225         filepath.Walk(v.root,
226                 func(path string, info os.FileInfo, err error) error {
227                         // This WalkFunc inspects each path in the volume
228                         // and prints an index line for all files that begin
229                         // with prefix.
230                         if err != nil {
231                                 log.Printf("IndexHandler: %s: walking to %s: %s",
232                                         v, path, err)
233                                 return nil
234                         }
235                         locator := filepath.Base(path)
236                         // Skip directories that do not match prefix.
237                         // We know there is nothing interesting inside.
238                         if info.IsDir() &&
239                                 !strings.HasPrefix(locator, prefix) &&
240                                 !strings.HasPrefix(prefix, locator) {
241                                 return filepath.SkipDir
242                         }
243                         // Skip any file that is not apparently a locator, e.g. .meta files
244                         if !IsValidLocator(locator) {
245                                 return nil
246                         }
247                         // Print filenames beginning with prefix
248                         if !info.IsDir() && strings.HasPrefix(locator, prefix) {
249                                 output = output + fmt.Sprintf(
250                                         "%s+%d %d\n", locator, info.Size(), info.ModTime().Unix())
251                         }
252                         return nil
253                 })
254
255         return
256 }
257
258 // IsFull returns true if the free space on the volume is less than
259 // MIN_FREE_KILOBYTES.
260 //
261 func (v *UnixVolume) IsFull() (isFull bool) {
262         fullSymlink := v.root + "/full"
263
264         // Check if the volume has been marked as full in the last hour.
265         if link, err := os.Readlink(fullSymlink); err == nil {
266                 if ts, err := strconv.Atoi(link); err == nil {
267                         fulltime := time.Unix(int64(ts), 0)
268                         if time.Since(fulltime).Hours() < 1.0 {
269                                 return true
270                         }
271                 }
272         }
273
274         if avail, err := v.FreeDiskSpace(); err == nil {
275                 isFull = avail < MIN_FREE_KILOBYTES
276         } else {
277                 log.Printf("%s: FreeDiskSpace: %s\n", v, err)
278                 isFull = false
279         }
280
281         // If the volume is full, timestamp it.
282         if isFull {
283                 now := fmt.Sprintf("%d", time.Now().Unix())
284                 os.Symlink(now, fullSymlink)
285         }
286         return
287 }
288
289 // FreeDiskSpace returns the number of unused 1k blocks available on
290 // the volume.
291 //
292 func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
293         var fs syscall.Statfs_t
294         err = syscall.Statfs(v.root, &fs)
295         if err == nil {
296                 // Statfs output is not guaranteed to measure free
297                 // space in terms of 1K blocks.
298                 free = fs.Bavail * uint64(fs.Bsize) / 1024
299         }
300         return
301 }
302
303 func (v *UnixVolume) String() string {
304         return fmt.Sprintf("[UnixVolume %s]", v.root)
305 }