Adds checking for request body options that are both valid JSON and readable files
[arvados.git] / services / keepstore / volume_unix.go
1 package main
2
3 import (
4         "bufio"
5         "errors"
6         "flag"
7         "fmt"
8         "io"
9         "io/ioutil"
10         "log"
11         "os"
12         "path/filepath"
13         "regexp"
14         "strconv"
15         "strings"
16         "sync"
17         "syscall"
18         "time"
19 )
20
21 type unixVolumeAdder struct {
22         *volumeSet
23 }
24
25 func (vs *unixVolumeAdder) Set(value string) error {
26         if trashLifetime != 0 {
27                 return ErrNotImplemented
28         }
29         if dirs := strings.Split(value, ","); len(dirs) > 1 {
30                 log.Print("DEPRECATED: using comma-separated volume list.")
31                 for _, dir := range dirs {
32                         if err := vs.Set(dir); err != nil {
33                                 return err
34                         }
35                 }
36                 return nil
37         }
38         if len(value) == 0 || value[0] != '/' {
39                 return errors.New("Invalid volume: must begin with '/'.")
40         }
41         if _, err := os.Stat(value); err != nil {
42                 return err
43         }
44         var locker sync.Locker
45         if flagSerializeIO {
46                 locker = &sync.Mutex{}
47         }
48         *vs.volumeSet = append(*vs.volumeSet, &UnixVolume{
49                 root:     value,
50                 locker:   locker,
51                 readonly: flagReadonly,
52         })
53         return nil
54 }
55
56 func init() {
57         flag.Var(
58                 &unixVolumeAdder{&volumes},
59                 "volumes",
60                 "Deprecated synonym for -volume.")
61         flag.Var(
62                 &unixVolumeAdder{&volumes},
63                 "volume",
64                 "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
65 }
66
67 // Discover adds a UnixVolume for every directory named "keep" that is
68 // located at the top level of a device- or tmpfs-backed mount point
69 // other than "/". It returns the number of volumes added.
70 func (vs *unixVolumeAdder) Discover() int {
71         added := 0
72         f, err := os.Open(ProcMounts)
73         if err != nil {
74                 log.Fatalf("opening %s: %s", ProcMounts, err)
75         }
76         scanner := bufio.NewScanner(f)
77         for scanner.Scan() {
78                 args := strings.Fields(scanner.Text())
79                 if err := scanner.Err(); err != nil {
80                         log.Fatalf("reading %s: %s", ProcMounts, err)
81                 }
82                 dev, mount := args[0], args[1]
83                 if mount == "/" {
84                         continue
85                 }
86                 if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
87                         continue
88                 }
89                 keepdir := mount + "/keep"
90                 if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
91                         continue
92                 }
93                 // Set the -readonly flag (but only for this volume)
94                 // if the filesystem is mounted readonly.
95                 flagReadonlyWas := flagReadonly
96                 for _, fsopt := range strings.Split(args[3], ",") {
97                         if fsopt == "ro" {
98                                 flagReadonly = true
99                                 break
100                         }
101                         if fsopt == "rw" {
102                                 break
103                         }
104                 }
105                 if err := vs.Set(keepdir); err != nil {
106                         log.Printf("adding %q: %s", keepdir, err)
107                 } else {
108                         added++
109                 }
110                 flagReadonly = flagReadonlyWas
111         }
112         return added
113 }
114
115 // A UnixVolume stores and retrieves blocks in a local directory.
116 type UnixVolume struct {
117         // path to the volume's root directory
118         root string
119         // something to lock during IO, typically a sync.Mutex (or nil
120         // to skip locking)
121         locker   sync.Locker
122         readonly bool
123 }
124
125 // Touch sets the timestamp for the given locator to the current time
126 func (v *UnixVolume) Touch(loc string) error {
127         if v.readonly {
128                 return MethodDisabledError
129         }
130         p := v.blockPath(loc)
131         f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
132         if err != nil {
133                 return err
134         }
135         defer f.Close()
136         if v.locker != nil {
137                 v.locker.Lock()
138                 defer v.locker.Unlock()
139         }
140         if e := lockfile(f); e != nil {
141                 return e
142         }
143         defer unlockfile(f)
144         now := time.Now().Unix()
145         utime := syscall.Utimbuf{now, now}
146         return syscall.Utime(p, &utime)
147 }
148
149 // Mtime returns the stored timestamp for the given locator.
150 func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
151         p := v.blockPath(loc)
152         fi, err := os.Stat(p)
153         if err != nil {
154                 return time.Time{}, err
155         }
156         return fi.ModTime(), nil
157 }
158
159 // Lock the locker (if one is in use), open the file for reading, and
160 // call the given function if and when the file is ready to read.
161 func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
162         if v.locker != nil {
163                 v.locker.Lock()
164                 defer v.locker.Unlock()
165         }
166         f, err := os.Open(path)
167         if err != nil {
168                 return err
169         }
170         defer f.Close()
171         return fn(f)
172 }
173
174 // stat is os.Stat() with some extra sanity checks.
175 func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
176         stat, err := os.Stat(path)
177         if err == nil {
178                 if stat.Size() < 0 {
179                         err = os.ErrInvalid
180                 } else if stat.Size() > BlockSize {
181                         err = TooLongError
182                 }
183         }
184         return stat, err
185 }
186
187 // Get retrieves a block identified by the locator string "loc", and
188 // returns its contents as a byte slice.
189 //
190 // Get returns a nil buffer IFF it returns a non-nil error.
191 func (v *UnixVolume) Get(loc string) ([]byte, error) {
192         path := v.blockPath(loc)
193         stat, err := v.stat(path)
194         if err != nil {
195                 return nil, v.translateError(err)
196         }
197         buf := bufs.Get(int(stat.Size()))
198         err = v.getFunc(path, func(rdr io.Reader) error {
199                 _, err = io.ReadFull(rdr, buf)
200                 return err
201         })
202         if err != nil {
203                 bufs.Put(buf)
204                 return nil, err
205         }
206         return buf, nil
207 }
208
209 // Compare returns nil if Get(loc) would return the same content as
210 // expect. It is functionally equivalent to Get() followed by
211 // bytes.Compare(), but uses less memory.
212 func (v *UnixVolume) Compare(loc string, expect []byte) error {
213         path := v.blockPath(loc)
214         if _, err := v.stat(path); err != nil {
215                 return v.translateError(err)
216         }
217         return v.getFunc(path, func(rdr io.Reader) error {
218                 return compareReaderWithBuf(rdr, expect, loc[:32])
219         })
220 }
221
222 // Put stores a block of data identified by the locator string
223 // "loc".  It returns nil on success.  If the volume is full, it
224 // returns a FullError.  If the write fails due to some other error,
225 // that error is returned.
226 func (v *UnixVolume) Put(loc string, block []byte) error {
227         if v.readonly {
228                 return MethodDisabledError
229         }
230         if v.IsFull() {
231                 return FullError
232         }
233         bdir := v.blockDir(loc)
234         if err := os.MkdirAll(bdir, 0755); err != nil {
235                 log.Printf("%s: could not create directory %s: %s",
236                         loc, bdir, err)
237                 return err
238         }
239
240         tmpfile, tmperr := ioutil.TempFile(bdir, "tmp"+loc)
241         if tmperr != nil {
242                 log.Printf("ioutil.TempFile(%s, tmp%s): %s", bdir, loc, tmperr)
243                 return tmperr
244         }
245         bpath := v.blockPath(loc)
246
247         if v.locker != nil {
248                 v.locker.Lock()
249                 defer v.locker.Unlock()
250         }
251         if _, err := tmpfile.Write(block); err != nil {
252                 log.Printf("%s: writing to %s: %s\n", v, bpath, err)
253                 tmpfile.Close()
254                 os.Remove(tmpfile.Name())
255                 return err
256         }
257         if err := tmpfile.Close(); err != nil {
258                 log.Printf("closing %s: %s\n", tmpfile.Name(), err)
259                 os.Remove(tmpfile.Name())
260                 return err
261         }
262         if err := os.Rename(tmpfile.Name(), bpath); err != nil {
263                 log.Printf("rename %s %s: %s\n", tmpfile.Name(), bpath, err)
264                 os.Remove(tmpfile.Name())
265                 return err
266         }
267         return nil
268 }
269
270 // Status returns a VolumeStatus struct describing the volume's
271 // current state, or nil if an error occurs.
272 //
273 func (v *UnixVolume) Status() *VolumeStatus {
274         var fs syscall.Statfs_t
275         var devnum uint64
276
277         if fi, err := os.Stat(v.root); err == nil {
278                 devnum = fi.Sys().(*syscall.Stat_t).Dev
279         } else {
280                 log.Printf("%s: os.Stat: %s\n", v, err)
281                 return nil
282         }
283
284         err := syscall.Statfs(v.root, &fs)
285         if err != nil {
286                 log.Printf("%s: statfs: %s\n", v, err)
287                 return nil
288         }
289         // These calculations match the way df calculates disk usage:
290         // "free" space is measured by fs.Bavail, but "used" space
291         // uses fs.Blocks - fs.Bfree.
292         free := fs.Bavail * uint64(fs.Bsize)
293         used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
294         return &VolumeStatus{v.root, devnum, free, used}
295 }
296
297 var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
298 var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
299
300 // IndexTo writes (to the given Writer) a list of blocks found on this
301 // volume which begin with the specified prefix. If the prefix is an
302 // empty string, IndexTo writes a complete list of blocks.
303 //
304 // Each block is given in the format
305 //
306 //     locator+size modification-time {newline}
307 //
308 // e.g.:
309 //
310 //     e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
311 //     e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
312 //     e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
313 //
314 func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
315         var lastErr error = nil
316         rootdir, err := os.Open(v.root)
317         if err != nil {
318                 return err
319         }
320         defer rootdir.Close()
321         for {
322                 names, err := rootdir.Readdirnames(1)
323                 if err == io.EOF {
324                         return lastErr
325                 } else if err != nil {
326                         return err
327                 }
328                 if !strings.HasPrefix(names[0], prefix) && !strings.HasPrefix(prefix, names[0]) {
329                         // prefix excludes all blocks stored in this dir
330                         continue
331                 }
332                 if !blockDirRe.MatchString(names[0]) {
333                         continue
334                 }
335                 blockdirpath := filepath.Join(v.root, names[0])
336                 blockdir, err := os.Open(blockdirpath)
337                 if err != nil {
338                         log.Print("Error reading ", blockdirpath, ": ", err)
339                         lastErr = err
340                         continue
341                 }
342                 for {
343                         fileInfo, err := blockdir.Readdir(1)
344                         if err == io.EOF {
345                                 break
346                         } else if err != nil {
347                                 log.Print("Error reading ", blockdirpath, ": ", err)
348                                 lastErr = err
349                                 break
350                         }
351                         name := fileInfo[0].Name()
352                         if !strings.HasPrefix(name, prefix) {
353                                 continue
354                         }
355                         if !blockFileRe.MatchString(name) {
356                                 continue
357                         }
358                         _, err = fmt.Fprint(w,
359                                 name,
360                                 "+", fileInfo[0].Size(),
361                                 " ", fileInfo[0].ModTime().Unix(),
362                                 "\n")
363                 }
364                 blockdir.Close()
365         }
366 }
367
368 // Delete deletes the block data from the unix storage
369 func (v *UnixVolume) Trash(loc string) error {
370         // Touch() must be called before calling Write() on a block.  Touch()
371         // also uses lockfile().  This avoids a race condition between Write()
372         // and Delete() because either (a) the file will be deleted and Touch()
373         // will signal to the caller that the file is not present (and needs to
374         // be re-written), or (b) Touch() will update the file's timestamp and
375         // Delete() will read the correct up-to-date timestamp and choose not to
376         // delete the file.
377
378         if v.readonly {
379                 return MethodDisabledError
380         }
381         if trashLifetime != 0 {
382                 return ErrNotImplemented
383         }
384         if v.locker != nil {
385                 v.locker.Lock()
386                 defer v.locker.Unlock()
387         }
388         p := v.blockPath(loc)
389         f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
390         if err != nil {
391                 return err
392         }
393         defer f.Close()
394         if e := lockfile(f); e != nil {
395                 return e
396         }
397         defer unlockfile(f)
398
399         // If the block has been PUT in the last blobSignatureTTL
400         // seconds, return success without removing the block. This
401         // protects data from garbage collection until it is no longer
402         // possible for clients to retrieve the unreferenced blocks
403         // anyway (because the permission signatures have expired).
404         if fi, err := os.Stat(p); err != nil {
405                 return err
406         } else {
407                 if time.Since(fi.ModTime()) < blobSignatureTTL {
408                         return nil
409                 }
410         }
411         return os.Remove(p)
412 }
413
414 // Untrash moves block from trash back into store
415 // TBD
416 func (v *UnixVolume) Untrash(loc string) error {
417         return ErrNotImplemented
418 }
419
420 // blockDir returns the fully qualified directory name for the directory
421 // where loc is (or would be) stored on this volume.
422 func (v *UnixVolume) blockDir(loc string) string {
423         return filepath.Join(v.root, loc[0:3])
424 }
425
426 // blockPath returns the fully qualified pathname for the path to loc
427 // on this volume.
428 func (v *UnixVolume) blockPath(loc string) string {
429         return filepath.Join(v.blockDir(loc), loc)
430 }
431
432 // IsFull returns true if the free space on the volume is less than
433 // MinFreeKilobytes.
434 //
435 func (v *UnixVolume) IsFull() (isFull bool) {
436         fullSymlink := v.root + "/full"
437
438         // Check if the volume has been marked as full in the last hour.
439         if link, err := os.Readlink(fullSymlink); err == nil {
440                 if ts, err := strconv.Atoi(link); err == nil {
441                         fulltime := time.Unix(int64(ts), 0)
442                         if time.Since(fulltime).Hours() < 1.0 {
443                                 return true
444                         }
445                 }
446         }
447
448         if avail, err := v.FreeDiskSpace(); err == nil {
449                 isFull = avail < MinFreeKilobytes
450         } else {
451                 log.Printf("%s: FreeDiskSpace: %s\n", v, err)
452                 isFull = false
453         }
454
455         // If the volume is full, timestamp it.
456         if isFull {
457                 now := fmt.Sprintf("%d", time.Now().Unix())
458                 os.Symlink(now, fullSymlink)
459         }
460         return
461 }
462
463 // FreeDiskSpace returns the number of unused 1k blocks available on
464 // the volume.
465 //
466 func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
467         var fs syscall.Statfs_t
468         err = syscall.Statfs(v.root, &fs)
469         if err == nil {
470                 // Statfs output is not guaranteed to measure free
471                 // space in terms of 1K blocks.
472                 free = fs.Bavail * uint64(fs.Bsize) / 1024
473         }
474         return
475 }
476
477 func (v *UnixVolume) String() string {
478         return fmt.Sprintf("[UnixVolume %s]", v.root)
479 }
480
481 // Writable returns false if all future Put, Mtime, and Delete calls are expected to fail.
482 func (v *UnixVolume) Writable() bool {
483         return !v.readonly
484 }
485
486 func (v *UnixVolume) Replication() int {
487         return 1
488 }
489
490 // lockfile and unlockfile use flock(2) to manage kernel file locks.
491 func lockfile(f *os.File) error {
492         return syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
493 }
494
495 func unlockfile(f *os.File) error {
496         return syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
497 }
498
499 // Where appropriate, translate a more specific filesystem error to an
500 // error recognized by handlers, like os.ErrNotExist.
501 func (v *UnixVolume) translateError(err error) error {
502         switch err.(type) {
503         case *os.PathError:
504                 // stat() returns a PathError if the parent directory
505                 // (not just the file itself) is missing
506                 return os.ErrNotExist
507         default:
508                 return err
509         }
510 }