7241: Accept command line flags for Azure blob volumes.
[arvados.git] / services / keepstore / volume_unix.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "errors"
7         "flag"
8         "fmt"
9         "io"
10         "io/ioutil"
11         "log"
12         "os"
13         "path/filepath"
14         "regexp"
15         "strconv"
16         "strings"
17         "sync"
18         "syscall"
19         "time"
20 )
21
22 type unixVolumeAdder struct {
23         *volumeSet
24 }
25
26 func (vs *unixVolumeAdder) Set(value string) error {
27         if dirs := strings.Split(value, ","); len(dirs) > 1 {
28                 log.Print("DEPRECATED: using comma-separated volume list.")
29                 for _, dir := range dirs {
30                         if err := vs.Set(dir); err != nil {
31                                 return err
32                         }
33                 }
34                 return nil
35         }
36         if len(value) == 0 || value[0] != '/' {
37                 return errors.New("Invalid volume: must begin with '/'.")
38         }
39         if _, err := os.Stat(value); err != nil {
40                 return err
41         }
42         var locker sync.Locker
43         if flagSerializeIO {
44                 locker = &sync.Mutex{}
45         }
46         *vs.volumeSet = append(*vs.volumeSet, &UnixVolume{
47                 root:     value,
48                 locker:   locker,
49                 readonly: flagReadonly,
50         })
51         return nil
52 }
53
54 func init() {
55         flag.Var(
56                 &unixVolumeAdder{&volumes},
57                 "volumes",
58                 "Deprecated synonym for -volume.")
59         flag.Var(
60                 &unixVolumeAdder{&volumes},
61                 "volume",
62                 "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
63 }
64
65 // Discover adds a UnixVolume for every directory named "keep" that is
66 // located at the top level of a device- or tmpfs-backed mount point
67 // other than "/". It returns the number of volumes added.
68 func (vs *unixVolumeAdder) Discover() int {
69         added := 0
70         f, err := os.Open(ProcMounts)
71         if err != nil {
72                 log.Fatalf("opening %s: %s", ProcMounts, err)
73         }
74         scanner := bufio.NewScanner(f)
75         for scanner.Scan() {
76                 args := strings.Fields(scanner.Text())
77                 if err := scanner.Err(); err != nil {
78                         log.Fatalf("reading %s: %s", ProcMounts, err)
79                 }
80                 dev, mount := args[0], args[1]
81                 if mount == "/" {
82                         continue
83                 }
84                 if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
85                         continue
86                 }
87                 keepdir := mount + "/keep"
88                 if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
89                         continue
90                 }
91                 // Set the -readonly flag (but only for this volume)
92                 // if the filesystem is mounted readonly.
93                 flagReadonlyWas := flagReadonly
94                 for _, fsopt := range strings.Split(args[3], ",") {
95                         if fsopt == "ro" {
96                                 flagReadonly = true
97                                 break
98                         }
99                         if fsopt == "rw" {
100                                 break
101                         }
102                 }
103                 if err := vs.Set(keepdir); err != nil {
104                         log.Printf("adding %q: %s", keepdir, err)
105                 } else {
106                         added++
107                 }
108                 flagReadonly = flagReadonlyWas
109         }
110         return added
111 }
112
113 // A UnixVolume stores and retrieves blocks in a local directory.
114 type UnixVolume struct {
115         // path to the volume's root directory
116         root string
117         // something to lock during IO, typically a sync.Mutex (or nil
118         // to skip locking)
119         locker   sync.Locker
120         readonly bool
121 }
122
123 // Touch sets the timestamp for the given locator to the current time
124 func (v *UnixVolume) Touch(loc string) error {
125         if v.readonly {
126                 return MethodDisabledError
127         }
128         p := v.blockPath(loc)
129         f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
130         if err != nil {
131                 return err
132         }
133         defer f.Close()
134         if v.locker != nil {
135                 v.locker.Lock()
136                 defer v.locker.Unlock()
137         }
138         if e := lockfile(f); e != nil {
139                 return e
140         }
141         defer unlockfile(f)
142         now := time.Now().Unix()
143         utime := syscall.Utimbuf{now, now}
144         return syscall.Utime(p, &utime)
145 }
146
147 // Mtime returns the stored timestamp for the given locator.
148 func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
149         p := v.blockPath(loc)
150         fi, err := os.Stat(p)
151         if err != nil {
152                 return time.Time{}, err
153         }
154         return fi.ModTime(), nil
155 }
156
157 // Lock the locker (if one is in use), open the file for reading, and
158 // call the given function if and when the file is ready to read.
159 func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
160         if v.locker != nil {
161                 v.locker.Lock()
162                 defer v.locker.Unlock()
163         }
164         f, err := os.Open(path)
165         if err != nil {
166                 return err
167         }
168         defer f.Close()
169         return fn(f)
170 }
171
172 // stat is os.Stat() with some extra sanity checks.
173 func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
174         stat, err := os.Stat(path)
175         if err == nil {
176                 if stat.Size() < 0 {
177                         err = os.ErrInvalid
178                 } else if stat.Size() > BlockSize {
179                         err = TooLongError
180                 }
181         }
182         return stat, err
183 }
184
185 // Get retrieves a block identified by the locator string "loc", and
186 // returns its contents as a byte slice.
187 //
188 // Get returns a nil buffer IFF it returns a non-nil error.
189 func (v *UnixVolume) Get(loc string) ([]byte, error) {
190         path := v.blockPath(loc)
191         stat, err := v.stat(path)
192         if err != nil {
193                 return nil, err
194         }
195         buf := bufs.Get(int(stat.Size()))
196         err = v.getFunc(path, func(rdr io.Reader) error {
197                 _, err = io.ReadFull(rdr, buf)
198                 return err
199         })
200         if err != nil {
201                 bufs.Put(buf)
202                 return nil, err
203         }
204         return buf, nil
205 }
206
207 // Compare returns nil if Get(loc) would return the same content as
208 // expect. It is functionally equivalent to Get() followed by
209 // bytes.Compare(), but uses less memory.
210 func (v *UnixVolume) Compare(loc string, expect []byte) error {
211         path := v.blockPath(loc)
212         stat, err := v.stat(path)
213         if err != nil {
214                 return err
215         }
216         bufLen := 1 << 20
217         if int64(bufLen) > stat.Size() {
218                 bufLen = int(stat.Size())
219                 if bufLen < 1 {
220                         // len(buf)==0 would prevent us from handling
221                         // empty files the same way as non-empty
222                         // files, because reading 0 bytes at a time
223                         // never reaches EOF.
224                         bufLen = 1
225                 }
226         }
227         cmp := expect
228         buf := make([]byte, bufLen)
229         return v.getFunc(path, func(rdr io.Reader) error {
230                 // Loop invariants: all data read so far matched what
231                 // we expected, and the first N bytes of cmp are
232                 // expected to equal the next N bytes read from
233                 // reader.
234                 for {
235                         n, err := rdr.Read(buf)
236                         if n > len(cmp) || bytes.Compare(cmp[:n], buf[:n]) != 0 {
237                                 return collisionOrCorrupt(loc[:32], expect[:len(expect)-len(cmp)], buf[:n], rdr)
238                         }
239                         cmp = cmp[n:]
240                         if err == io.EOF {
241                                 if len(cmp) != 0 {
242                                         return collisionOrCorrupt(loc[:32], expect[:len(expect)-len(cmp)], nil, nil)
243                                 }
244                                 return nil
245                         } else if err != nil {
246                                 return err
247                         }
248                 }
249         })
250 }
251
252 // Put stores a block of data identified by the locator string
253 // "loc".  It returns nil on success.  If the volume is full, it
254 // returns a FullError.  If the write fails due to some other error,
255 // that error is returned.
256 func (v *UnixVolume) Put(loc string, block []byte) error {
257         if v.readonly {
258                 return MethodDisabledError
259         }
260         if v.IsFull() {
261                 return FullError
262         }
263         bdir := v.blockDir(loc)
264         if err := os.MkdirAll(bdir, 0755); err != nil {
265                 log.Printf("%s: could not create directory %s: %s",
266                         loc, bdir, err)
267                 return err
268         }
269
270         tmpfile, tmperr := ioutil.TempFile(bdir, "tmp"+loc)
271         if tmperr != nil {
272                 log.Printf("ioutil.TempFile(%s, tmp%s): %s", bdir, loc, tmperr)
273                 return tmperr
274         }
275         bpath := v.blockPath(loc)
276
277         if v.locker != nil {
278                 v.locker.Lock()
279                 defer v.locker.Unlock()
280         }
281         if _, err := tmpfile.Write(block); err != nil {
282                 log.Printf("%s: writing to %s: %s\n", v, bpath, err)
283                 tmpfile.Close()
284                 os.Remove(tmpfile.Name())
285                 return err
286         }
287         if err := tmpfile.Close(); err != nil {
288                 log.Printf("closing %s: %s\n", tmpfile.Name(), err)
289                 os.Remove(tmpfile.Name())
290                 return err
291         }
292         if err := os.Rename(tmpfile.Name(), bpath); err != nil {
293                 log.Printf("rename %s %s: %s\n", tmpfile.Name(), bpath, err)
294                 os.Remove(tmpfile.Name())
295                 return err
296         }
297         return nil
298 }
299
300 // Status returns a VolumeStatus struct describing the volume's
301 // current state, or nil if an error occurs.
302 //
303 func (v *UnixVolume) Status() *VolumeStatus {
304         var fs syscall.Statfs_t
305         var devnum uint64
306
307         if fi, err := os.Stat(v.root); err == nil {
308                 devnum = fi.Sys().(*syscall.Stat_t).Dev
309         } else {
310                 log.Printf("%s: os.Stat: %s\n", v, err)
311                 return nil
312         }
313
314         err := syscall.Statfs(v.root, &fs)
315         if err != nil {
316                 log.Printf("%s: statfs: %s\n", v, err)
317                 return nil
318         }
319         // These calculations match the way df calculates disk usage:
320         // "free" space is measured by fs.Bavail, but "used" space
321         // uses fs.Blocks - fs.Bfree.
322         free := fs.Bavail * uint64(fs.Bsize)
323         used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
324         return &VolumeStatus{v.root, devnum, free, used}
325 }
326
327 var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
328
329 // IndexTo writes (to the given Writer) a list of blocks found on this
330 // volume which begin with the specified prefix. If the prefix is an
331 // empty string, IndexTo writes a complete list of blocks.
332 //
333 // Each block is given in the format
334 //
335 //     locator+size modification-time {newline}
336 //
337 // e.g.:
338 //
339 //     e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
340 //     e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
341 //     e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
342 //
343 func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
344         var lastErr error = nil
345         rootdir, err := os.Open(v.root)
346         if err != nil {
347                 return err
348         }
349         defer rootdir.Close()
350         for {
351                 names, err := rootdir.Readdirnames(1)
352                 if err == io.EOF {
353                         return lastErr
354                 } else if err != nil {
355                         return err
356                 }
357                 if !strings.HasPrefix(names[0], prefix) && !strings.HasPrefix(prefix, names[0]) {
358                         // prefix excludes all blocks stored in this dir
359                         continue
360                 }
361                 if !blockDirRe.MatchString(names[0]) {
362                         continue
363                 }
364                 blockdirpath := filepath.Join(v.root, names[0])
365                 blockdir, err := os.Open(blockdirpath)
366                 if err != nil {
367                         log.Print("Error reading ", blockdirpath, ": ", err)
368                         lastErr = err
369                         continue
370                 }
371                 for {
372                         fileInfo, err := blockdir.Readdir(1)
373                         if err == io.EOF {
374                                 break
375                         } else if err != nil {
376                                 log.Print("Error reading ", blockdirpath, ": ", err)
377                                 lastErr = err
378                                 break
379                         }
380                         name := fileInfo[0].Name()
381                         if !strings.HasPrefix(name, prefix) {
382                                 continue
383                         }
384                         _, err = fmt.Fprint(w,
385                                 name,
386                                 "+", fileInfo[0].Size(),
387                                 " ", fileInfo[0].ModTime().Unix(),
388                                 "\n")
389                 }
390                 blockdir.Close()
391         }
392 }
393
394 // Delete deletes the block data from the unix storage
395 func (v *UnixVolume) Delete(loc string) error {
396         // Touch() must be called before calling Write() on a block.  Touch()
397         // also uses lockfile().  This avoids a race condition between Write()
398         // and Delete() because either (a) the file will be deleted and Touch()
399         // will signal to the caller that the file is not present (and needs to
400         // be re-written), or (b) Touch() will update the file's timestamp and
401         // Delete() will read the correct up-to-date timestamp and choose not to
402         // delete the file.
403
404         if v.readonly {
405                 return MethodDisabledError
406         }
407         if v.locker != nil {
408                 v.locker.Lock()
409                 defer v.locker.Unlock()
410         }
411         p := v.blockPath(loc)
412         f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
413         if err != nil {
414                 return err
415         }
416         defer f.Close()
417         if e := lockfile(f); e != nil {
418                 return e
419         }
420         defer unlockfile(f)
421
422         // If the block has been PUT in the last blobSignatureTTL
423         // seconds, return success without removing the block. This
424         // protects data from garbage collection until it is no longer
425         // possible for clients to retrieve the unreferenced blocks
426         // anyway (because the permission signatures have expired).
427         if fi, err := os.Stat(p); err != nil {
428                 return err
429         } else {
430                 if time.Since(fi.ModTime()) < blobSignatureTTL {
431                         return nil
432                 }
433         }
434         return os.Remove(p)
435 }
436
437 // blockDir returns the fully qualified directory name for the directory
438 // where loc is (or would be) stored on this volume.
439 func (v *UnixVolume) blockDir(loc string) string {
440         return filepath.Join(v.root, loc[0:3])
441 }
442
443 // blockPath returns the fully qualified pathname for the path to loc
444 // on this volume.
445 func (v *UnixVolume) blockPath(loc string) string {
446         return filepath.Join(v.blockDir(loc), loc)
447 }
448
449 // IsFull returns true if the free space on the volume is less than
450 // MinFreeKilobytes.
451 //
452 func (v *UnixVolume) IsFull() (isFull bool) {
453         fullSymlink := v.root + "/full"
454
455         // Check if the volume has been marked as full in the last hour.
456         if link, err := os.Readlink(fullSymlink); err == nil {
457                 if ts, err := strconv.Atoi(link); err == nil {
458                         fulltime := time.Unix(int64(ts), 0)
459                         if time.Since(fulltime).Hours() < 1.0 {
460                                 return true
461                         }
462                 }
463         }
464
465         if avail, err := v.FreeDiskSpace(); err == nil {
466                 isFull = avail < MinFreeKilobytes
467         } else {
468                 log.Printf("%s: FreeDiskSpace: %s\n", v, err)
469                 isFull = false
470         }
471
472         // If the volume is full, timestamp it.
473         if isFull {
474                 now := fmt.Sprintf("%d", time.Now().Unix())
475                 os.Symlink(now, fullSymlink)
476         }
477         return
478 }
479
480 // FreeDiskSpace returns the number of unused 1k blocks available on
481 // the volume.
482 //
483 func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
484         var fs syscall.Statfs_t
485         err = syscall.Statfs(v.root, &fs)
486         if err == nil {
487                 // Statfs output is not guaranteed to measure free
488                 // space in terms of 1K blocks.
489                 free = fs.Bavail * uint64(fs.Bsize) / 1024
490         }
491         return
492 }
493
494 func (v *UnixVolume) String() string {
495         return fmt.Sprintf("[UnixVolume %s]", v.root)
496 }
497
498 // Writable returns false if all future Put, Mtime, and Delete calls are expected to fail.
499 func (v *UnixVolume) Writable() bool {
500         return !v.readonly
501 }
502
503 // lockfile and unlockfile use flock(2) to manage kernel file locks.
504 func lockfile(f *os.File) error {
505         return syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
506 }
507
508 func unlockfile(f *os.File) error {
509         return syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
510 }