9068: Move buffer allocation from volumes to GetBlockHandler.
[arvados.git] / services / keepstore / volume_unix.go
1 package main
2
3 import (
4         "bufio"
5         "errors"
6         "flag"
7         "fmt"
8         "io"
9         "io/ioutil"
10         "log"
11         "os"
12         "path/filepath"
13         "regexp"
14         "strconv"
15         "strings"
16         "sync"
17         "syscall"
18         "time"
19 )
20
21 type unixVolumeAdder struct {
22         *volumeSet
23 }
24
25 func (vs *unixVolumeAdder) Set(value string) error {
26         if dirs := strings.Split(value, ","); len(dirs) > 1 {
27                 log.Print("DEPRECATED: using comma-separated volume list.")
28                 for _, dir := range dirs {
29                         if err := vs.Set(dir); err != nil {
30                                 return err
31                         }
32                 }
33                 return nil
34         }
35         if len(value) == 0 || value[0] != '/' {
36                 return errors.New("Invalid volume: must begin with '/'.")
37         }
38         if _, err := os.Stat(value); err != nil {
39                 return err
40         }
41         var locker sync.Locker
42         if flagSerializeIO {
43                 locker = &sync.Mutex{}
44         }
45         *vs.volumeSet = append(*vs.volumeSet, &UnixVolume{
46                 root:     value,
47                 locker:   locker,
48                 readonly: flagReadonly,
49         })
50         return nil
51 }
52
53 func init() {
54         flag.Var(
55                 &unixVolumeAdder{&volumes},
56                 "volumes",
57                 "Deprecated synonym for -volume.")
58         flag.Var(
59                 &unixVolumeAdder{&volumes},
60                 "volume",
61                 "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
62 }
63
64 // Discover adds a UnixVolume for every directory named "keep" that is
65 // located at the top level of a device- or tmpfs-backed mount point
66 // other than "/". It returns the number of volumes added.
67 func (vs *unixVolumeAdder) Discover() int {
68         added := 0
69         f, err := os.Open(ProcMounts)
70         if err != nil {
71                 log.Fatalf("opening %s: %s", ProcMounts, err)
72         }
73         scanner := bufio.NewScanner(f)
74         for scanner.Scan() {
75                 args := strings.Fields(scanner.Text())
76                 if err := scanner.Err(); err != nil {
77                         log.Fatalf("reading %s: %s", ProcMounts, err)
78                 }
79                 dev, mount := args[0], args[1]
80                 if mount == "/" {
81                         continue
82                 }
83                 if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
84                         continue
85                 }
86                 keepdir := mount + "/keep"
87                 if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
88                         continue
89                 }
90                 // Set the -readonly flag (but only for this volume)
91                 // if the filesystem is mounted readonly.
92                 flagReadonlyWas := flagReadonly
93                 for _, fsopt := range strings.Split(args[3], ",") {
94                         if fsopt == "ro" {
95                                 flagReadonly = true
96                                 break
97                         }
98                         if fsopt == "rw" {
99                                 break
100                         }
101                 }
102                 if err := vs.Set(keepdir); err != nil {
103                         log.Printf("adding %q: %s", keepdir, err)
104                 } else {
105                         added++
106                 }
107                 flagReadonly = flagReadonlyWas
108         }
109         return added
110 }
111
112 // A UnixVolume stores and retrieves blocks in a local directory.
113 type UnixVolume struct {
114         // path to the volume's root directory
115         root string
116         // something to lock during IO, typically a sync.Mutex (or nil
117         // to skip locking)
118         locker   sync.Locker
119         readonly bool
120 }
121
122 // Touch sets the timestamp for the given locator to the current time
123 func (v *UnixVolume) Touch(loc string) error {
124         if v.readonly {
125                 return MethodDisabledError
126         }
127         p := v.blockPath(loc)
128         f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
129         if err != nil {
130                 return err
131         }
132         defer f.Close()
133         if v.locker != nil {
134                 v.locker.Lock()
135                 defer v.locker.Unlock()
136         }
137         if e := lockfile(f); e != nil {
138                 return e
139         }
140         defer unlockfile(f)
141         now := time.Now().Unix()
142         utime := syscall.Utimbuf{now, now}
143         return syscall.Utime(p, &utime)
144 }
145
146 // Mtime returns the stored timestamp for the given locator.
147 func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
148         p := v.blockPath(loc)
149         fi, err := os.Stat(p)
150         if err != nil {
151                 return time.Time{}, err
152         }
153         return fi.ModTime(), nil
154 }
155
156 // Lock the locker (if one is in use), open the file for reading, and
157 // call the given function if and when the file is ready to read.
158 func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
159         if v.locker != nil {
160                 v.locker.Lock()
161                 defer v.locker.Unlock()
162         }
163         f, err := os.Open(path)
164         if err != nil {
165                 return err
166         }
167         defer f.Close()
168         return fn(f)
169 }
170
171 // stat is os.Stat() with some extra sanity checks.
172 func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
173         stat, err := os.Stat(path)
174         if err == nil {
175                 if stat.Size() < 0 {
176                         err = os.ErrInvalid
177                 } else if stat.Size() > BlockSize {
178                         err = TooLongError
179                 }
180         }
181         return stat, err
182 }
183
184 // Get retrieves a block, copies it to the given slice, and returns
185 // the number of bytes copied.
186 func (v *UnixVolume) Get(loc string, buf []byte) (int, error) {
187         path := v.blockPath(loc)
188         stat, err := v.stat(path)
189         if err != nil {
190                 return 0, v.translateError(err)
191         }
192         if stat.Size() > int64(len(buf)) {
193                 return 0, TooLongError
194         }
195         var read int
196         size := int(stat.Size())
197         err = v.getFunc(path, func(rdr io.Reader) error {
198                 read, err = io.ReadFull(rdr, buf[:size])
199                 return err
200         })
201         return read, err
202 }
203
204 // Compare returns nil if Get(loc) would return the same content as
205 // expect. It is functionally equivalent to Get() followed by
206 // bytes.Compare(), but uses less memory.
207 func (v *UnixVolume) Compare(loc string, expect []byte) error {
208         path := v.blockPath(loc)
209         if _, err := v.stat(path); err != nil {
210                 return v.translateError(err)
211         }
212         return v.getFunc(path, func(rdr io.Reader) error {
213                 return compareReaderWithBuf(rdr, expect, loc[:32])
214         })
215 }
216
217 // Put stores a block of data identified by the locator string
218 // "loc".  It returns nil on success.  If the volume is full, it
219 // returns a FullError.  If the write fails due to some other error,
220 // that error is returned.
221 func (v *UnixVolume) Put(loc string, block []byte) error {
222         if v.readonly {
223                 return MethodDisabledError
224         }
225         if v.IsFull() {
226                 return FullError
227         }
228         bdir := v.blockDir(loc)
229         if err := os.MkdirAll(bdir, 0755); err != nil {
230                 log.Printf("%s: could not create directory %s: %s",
231                         loc, bdir, err)
232                 return err
233         }
234
235         tmpfile, tmperr := ioutil.TempFile(bdir, "tmp"+loc)
236         if tmperr != nil {
237                 log.Printf("ioutil.TempFile(%s, tmp%s): %s", bdir, loc, tmperr)
238                 return tmperr
239         }
240         bpath := v.blockPath(loc)
241
242         if v.locker != nil {
243                 v.locker.Lock()
244                 defer v.locker.Unlock()
245         }
246         if _, err := tmpfile.Write(block); err != nil {
247                 log.Printf("%s: writing to %s: %s\n", v, bpath, err)
248                 tmpfile.Close()
249                 os.Remove(tmpfile.Name())
250                 return err
251         }
252         if err := tmpfile.Close(); err != nil {
253                 log.Printf("closing %s: %s\n", tmpfile.Name(), err)
254                 os.Remove(tmpfile.Name())
255                 return err
256         }
257         if err := os.Rename(tmpfile.Name(), bpath); err != nil {
258                 log.Printf("rename %s %s: %s\n", tmpfile.Name(), bpath, err)
259                 os.Remove(tmpfile.Name())
260                 return err
261         }
262         return nil
263 }
264
265 // Status returns a VolumeStatus struct describing the volume's
266 // current state, or nil if an error occurs.
267 //
268 func (v *UnixVolume) Status() *VolumeStatus {
269         var fs syscall.Statfs_t
270         var devnum uint64
271
272         if fi, err := os.Stat(v.root); err == nil {
273                 devnum = fi.Sys().(*syscall.Stat_t).Dev
274         } else {
275                 log.Printf("%s: os.Stat: %s\n", v, err)
276                 return nil
277         }
278
279         err := syscall.Statfs(v.root, &fs)
280         if err != nil {
281                 log.Printf("%s: statfs: %s\n", v, err)
282                 return nil
283         }
284         // These calculations match the way df calculates disk usage:
285         // "free" space is measured by fs.Bavail, but "used" space
286         // uses fs.Blocks - fs.Bfree.
287         free := fs.Bavail * uint64(fs.Bsize)
288         used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
289         return &VolumeStatus{v.root, devnum, free, used}
290 }
291
292 var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
293 var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
294
295 // IndexTo writes (to the given Writer) a list of blocks found on this
296 // volume which begin with the specified prefix. If the prefix is an
297 // empty string, IndexTo writes a complete list of blocks.
298 //
299 // Each block is given in the format
300 //
301 //     locator+size modification-time {newline}
302 //
303 // e.g.:
304 //
305 //     e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
306 //     e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
307 //     e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
308 //
309 func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
310         var lastErr error = nil
311         rootdir, err := os.Open(v.root)
312         if err != nil {
313                 return err
314         }
315         defer rootdir.Close()
316         for {
317                 names, err := rootdir.Readdirnames(1)
318                 if err == io.EOF {
319                         return lastErr
320                 } else if err != nil {
321                         return err
322                 }
323                 if !strings.HasPrefix(names[0], prefix) && !strings.HasPrefix(prefix, names[0]) {
324                         // prefix excludes all blocks stored in this dir
325                         continue
326                 }
327                 if !blockDirRe.MatchString(names[0]) {
328                         continue
329                 }
330                 blockdirpath := filepath.Join(v.root, names[0])
331                 blockdir, err := os.Open(blockdirpath)
332                 if err != nil {
333                         log.Print("Error reading ", blockdirpath, ": ", err)
334                         lastErr = err
335                         continue
336                 }
337                 for {
338                         fileInfo, err := blockdir.Readdir(1)
339                         if err == io.EOF {
340                                 break
341                         } else if err != nil {
342                                 log.Print("Error reading ", blockdirpath, ": ", err)
343                                 lastErr = err
344                                 break
345                         }
346                         name := fileInfo[0].Name()
347                         if !strings.HasPrefix(name, prefix) {
348                                 continue
349                         }
350                         if !blockFileRe.MatchString(name) {
351                                 continue
352                         }
353                         _, err = fmt.Fprint(w,
354                                 name,
355                                 "+", fileInfo[0].Size(),
356                                 " ", fileInfo[0].ModTime().Unix(),
357                                 "\n")
358                 }
359                 blockdir.Close()
360         }
361 }
362
363 // Trash trashes the block data from the unix storage
364 // If trashLifetime == 0, the block is deleted
365 // Else, the block is renamed as path/{loc}.trash.{deadline},
366 // where deadline = now + trashLifetime
367 func (v *UnixVolume) Trash(loc string) error {
368         // Touch() must be called before calling Write() on a block.  Touch()
369         // also uses lockfile().  This avoids a race condition between Write()
370         // and Trash() because either (a) the file will be trashed and Touch()
371         // will signal to the caller that the file is not present (and needs to
372         // be re-written), or (b) Touch() will update the file's timestamp and
373         // Trash() will read the correct up-to-date timestamp and choose not to
374         // trash the file.
375
376         if v.readonly {
377                 return MethodDisabledError
378         }
379         if v.locker != nil {
380                 v.locker.Lock()
381                 defer v.locker.Unlock()
382         }
383         p := v.blockPath(loc)
384         f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
385         if err != nil {
386                 return err
387         }
388         defer f.Close()
389         if e := lockfile(f); e != nil {
390                 return e
391         }
392         defer unlockfile(f)
393
394         // If the block has been PUT in the last blobSignatureTTL
395         // seconds, return success without removing the block. This
396         // protects data from garbage collection until it is no longer
397         // possible for clients to retrieve the unreferenced blocks
398         // anyway (because the permission signatures have expired).
399         if fi, err := os.Stat(p); err != nil {
400                 return err
401         } else {
402                 if time.Since(fi.ModTime()) < blobSignatureTTL {
403                         return nil
404                 }
405         }
406
407         if trashLifetime == 0 {
408                 return os.Remove(p)
409         }
410         return os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(trashLifetime).Unix()))
411 }
412
413 // Untrash moves block from trash back into store
414 // Look for path/{loc}.trash.{deadline} in storage,
415 // and rename the first such file as path/{loc}
416 func (v *UnixVolume) Untrash(loc string) (err error) {
417         if v.readonly {
418                 return MethodDisabledError
419         }
420
421         files, err := ioutil.ReadDir(v.blockDir(loc))
422         if err != nil {
423                 return err
424         }
425
426         if len(files) == 0 {
427                 return os.ErrNotExist
428         }
429
430         foundTrash := false
431         prefix := fmt.Sprintf("%v.trash.", loc)
432         for _, f := range files {
433                 if strings.HasPrefix(f.Name(), prefix) {
434                         foundTrash = true
435                         err = os.Rename(v.blockPath(f.Name()), v.blockPath(loc))
436                         if err == nil {
437                                 break
438                         }
439                 }
440         }
441
442         if foundTrash == false {
443                 return os.ErrNotExist
444         }
445
446         return
447 }
448
449 // blockDir returns the fully qualified directory name for the directory
450 // where loc is (or would be) stored on this volume.
451 func (v *UnixVolume) blockDir(loc string) string {
452         return filepath.Join(v.root, loc[0:3])
453 }
454
455 // blockPath returns the fully qualified pathname for the path to loc
456 // on this volume.
457 func (v *UnixVolume) blockPath(loc string) string {
458         return filepath.Join(v.blockDir(loc), loc)
459 }
460
461 // IsFull returns true if the free space on the volume is less than
462 // MinFreeKilobytes.
463 //
464 func (v *UnixVolume) IsFull() (isFull bool) {
465         fullSymlink := v.root + "/full"
466
467         // Check if the volume has been marked as full in the last hour.
468         if link, err := os.Readlink(fullSymlink); err == nil {
469                 if ts, err := strconv.Atoi(link); err == nil {
470                         fulltime := time.Unix(int64(ts), 0)
471                         if time.Since(fulltime).Hours() < 1.0 {
472                                 return true
473                         }
474                 }
475         }
476
477         if avail, err := v.FreeDiskSpace(); err == nil {
478                 isFull = avail < MinFreeKilobytes
479         } else {
480                 log.Printf("%s: FreeDiskSpace: %s\n", v, err)
481                 isFull = false
482         }
483
484         // If the volume is full, timestamp it.
485         if isFull {
486                 now := fmt.Sprintf("%d", time.Now().Unix())
487                 os.Symlink(now, fullSymlink)
488         }
489         return
490 }
491
492 // FreeDiskSpace returns the number of unused 1k blocks available on
493 // the volume.
494 //
495 func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
496         var fs syscall.Statfs_t
497         err = syscall.Statfs(v.root, &fs)
498         if err == nil {
499                 // Statfs output is not guaranteed to measure free
500                 // space in terms of 1K blocks.
501                 free = fs.Bavail * uint64(fs.Bsize) / 1024
502         }
503         return
504 }
505
506 func (v *UnixVolume) String() string {
507         return fmt.Sprintf("[UnixVolume %s]", v.root)
508 }
509
510 // Writable returns false if all future Put, Mtime, and Delete calls are expected to fail.
511 func (v *UnixVolume) Writable() bool {
512         return !v.readonly
513 }
514
515 func (v *UnixVolume) Replication() int {
516         return 1
517 }
518
519 // lockfile and unlockfile use flock(2) to manage kernel file locks.
520 func lockfile(f *os.File) error {
521         return syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
522 }
523
524 func unlockfile(f *os.File) error {
525         return syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
526 }
527
528 // Where appropriate, translate a more specific filesystem error to an
529 // error recognized by handlers, like os.ErrNotExist.
530 func (v *UnixVolume) translateError(err error) error {
531         switch err.(type) {
532         case *os.PathError:
533                 // stat() returns a PathError if the parent directory
534                 // (not just the file itself) is missing
535                 return os.ErrNotExist
536         default:
537                 return err
538         }
539 }
540
541 var trashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`)
542
543 // EmptyTrash walks hierarchy looking for {hash}.trash.*
544 // and deletes those with deadline < now.
545 func (v *UnixVolume) EmptyTrash() {
546         var bytesDeleted, bytesInTrash int64
547         var blocksDeleted, blocksInTrash int
548
549         err := filepath.Walk(v.root, func(path string, info os.FileInfo, err error) error {
550                 if err != nil {
551                         log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err)
552                         return nil
553                 }
554                 if info.Mode().IsDir() {
555                         return nil
556                 }
557                 matches := trashLocRegexp.FindStringSubmatch(path)
558                 if len(matches) != 3 {
559                         return nil
560                 }
561                 deadline, err := strconv.ParseInt(matches[2], 10, 64)
562                 if err != nil {
563                         log.Printf("EmptyTrash: %v: ParseInt(%v): %v", path, matches[2], err)
564                         return nil
565                 }
566                 bytesInTrash += info.Size()
567                 blocksInTrash++
568                 if deadline > time.Now().Unix() {
569                         return nil
570                 }
571                 err = os.Remove(path)
572                 if err != nil {
573                         log.Printf("EmptyTrash: Remove %v: %v", path, err)
574                         return nil
575                 }
576                 bytesDeleted += info.Size()
577                 blocksDeleted++
578                 return nil
579         })
580
581         if err != nil {
582                 log.Printf("EmptyTrash error for %v: %v", v.String(), err)
583         }
584
585         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
586 }