8555: Implement trash with race-recovery for S3 volumes.
[arvados.git] / services / keepstore / volume_unix.go
1 package main
2
3 import (
4         "bufio"
5         "errors"
6         "flag"
7         "fmt"
8         "io"
9         "io/ioutil"
10         "log"
11         "os"
12         "path/filepath"
13         "regexp"
14         "strconv"
15         "strings"
16         "sync"
17         "syscall"
18         "time"
19 )
20
21 type unixVolumeAdder struct {
22         *volumeSet
23 }
24
25 func (vs *unixVolumeAdder) Set(value string) error {
26         if dirs := strings.Split(value, ","); len(dirs) > 1 {
27                 log.Print("DEPRECATED: using comma-separated volume list.")
28                 for _, dir := range dirs {
29                         if err := vs.Set(dir); err != nil {
30                                 return err
31                         }
32                 }
33                 return nil
34         }
35         if len(value) == 0 || value[0] != '/' {
36                 return errors.New("Invalid volume: must begin with '/'.")
37         }
38         if _, err := os.Stat(value); err != nil {
39                 return err
40         }
41         var locker sync.Locker
42         if flagSerializeIO {
43                 locker = &sync.Mutex{}
44         }
45         *vs.volumeSet = append(*vs.volumeSet, &UnixVolume{
46                 root:     value,
47                 locker:   locker,
48                 readonly: flagReadonly,
49         })
50         return nil
51 }
52
53 func init() {
54         flag.Var(
55                 &unixVolumeAdder{&volumes},
56                 "volumes",
57                 "Deprecated synonym for -volume.")
58         flag.Var(
59                 &unixVolumeAdder{&volumes},
60                 "volume",
61                 "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
62 }
63
64 // Discover adds a UnixVolume for every directory named "keep" that is
65 // located at the top level of a device- or tmpfs-backed mount point
66 // other than "/". It returns the number of volumes added.
67 func (vs *unixVolumeAdder) Discover() int {
68         added := 0
69         f, err := os.Open(ProcMounts)
70         if err != nil {
71                 log.Fatalf("opening %s: %s", ProcMounts, err)
72         }
73         scanner := bufio.NewScanner(f)
74         for scanner.Scan() {
75                 args := strings.Fields(scanner.Text())
76                 if err := scanner.Err(); err != nil {
77                         log.Fatalf("reading %s: %s", ProcMounts, err)
78                 }
79                 dev, mount := args[0], args[1]
80                 if mount == "/" {
81                         continue
82                 }
83                 if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
84                         continue
85                 }
86                 keepdir := mount + "/keep"
87                 if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
88                         continue
89                 }
90                 // Set the -readonly flag (but only for this volume)
91                 // if the filesystem is mounted readonly.
92                 flagReadonlyWas := flagReadonly
93                 for _, fsopt := range strings.Split(args[3], ",") {
94                         if fsopt == "ro" {
95                                 flagReadonly = true
96                                 break
97                         }
98                         if fsopt == "rw" {
99                                 break
100                         }
101                 }
102                 if err := vs.Set(keepdir); err != nil {
103                         log.Printf("adding %q: %s", keepdir, err)
104                 } else {
105                         added++
106                 }
107                 flagReadonly = flagReadonlyWas
108         }
109         return added
110 }
111
112 // A UnixVolume stores and retrieves blocks in a local directory.
113 type UnixVolume struct {
114         // path to the volume's root directory
115         root string
116         // something to lock during IO, typically a sync.Mutex (or nil
117         // to skip locking)
118         locker   sync.Locker
119         readonly bool
120 }
121
122 // Touch sets the timestamp for the given locator to the current time
123 func (v *UnixVolume) Touch(loc string) error {
124         if v.readonly {
125                 return MethodDisabledError
126         }
127         p := v.blockPath(loc)
128         f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
129         if err != nil {
130                 return err
131         }
132         defer f.Close()
133         if v.locker != nil {
134                 v.locker.Lock()
135                 defer v.locker.Unlock()
136         }
137         if e := lockfile(f); e != nil {
138                 return e
139         }
140         defer unlockfile(f)
141         ts := syscall.NsecToTimespec(time.Now().UnixNano())
142         return syscall.UtimesNano(p, []syscall.Timespec{ts, ts})
143 }
144
145 // Mtime returns the stored timestamp for the given locator.
146 func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
147         p := v.blockPath(loc)
148         fi, err := os.Stat(p)
149         if err != nil {
150                 return time.Time{}, err
151         }
152         return fi.ModTime(), nil
153 }
154
155 // Lock the locker (if one is in use), open the file for reading, and
156 // call the given function if and when the file is ready to read.
157 func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
158         if v.locker != nil {
159                 v.locker.Lock()
160                 defer v.locker.Unlock()
161         }
162         f, err := os.Open(path)
163         if err != nil {
164                 return err
165         }
166         defer f.Close()
167         return fn(f)
168 }
169
170 // stat is os.Stat() with some extra sanity checks.
171 func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
172         stat, err := os.Stat(path)
173         if err == nil {
174                 if stat.Size() < 0 {
175                         err = os.ErrInvalid
176                 } else if stat.Size() > BlockSize {
177                         err = TooLongError
178                 }
179         }
180         return stat, err
181 }
182
183 // Get retrieves a block, copies it to the given slice, and returns
184 // the number of bytes copied.
185 func (v *UnixVolume) Get(loc string, buf []byte) (int, error) {
186         path := v.blockPath(loc)
187         stat, err := v.stat(path)
188         if err != nil {
189                 return 0, v.translateError(err)
190         }
191         if stat.Size() > int64(len(buf)) {
192                 return 0, TooLongError
193         }
194         var read int
195         size := int(stat.Size())
196         err = v.getFunc(path, func(rdr io.Reader) error {
197                 read, err = io.ReadFull(rdr, buf[:size])
198                 return err
199         })
200         return read, err
201 }
202
203 // Compare returns nil if Get(loc) would return the same content as
204 // expect. It is functionally equivalent to Get() followed by
205 // bytes.Compare(), but uses less memory.
206 func (v *UnixVolume) Compare(loc string, expect []byte) error {
207         path := v.blockPath(loc)
208         if _, err := v.stat(path); err != nil {
209                 return v.translateError(err)
210         }
211         return v.getFunc(path, func(rdr io.Reader) error {
212                 return compareReaderWithBuf(rdr, expect, loc[:32])
213         })
214 }
215
216 // Put stores a block of data identified by the locator string
217 // "loc".  It returns nil on success.  If the volume is full, it
218 // returns a FullError.  If the write fails due to some other error,
219 // that error is returned.
220 func (v *UnixVolume) Put(loc string, block []byte) error {
221         if v.readonly {
222                 return MethodDisabledError
223         }
224         if v.IsFull() {
225                 return FullError
226         }
227         bdir := v.blockDir(loc)
228         if err := os.MkdirAll(bdir, 0755); err != nil {
229                 log.Printf("%s: could not create directory %s: %s",
230                         loc, bdir, err)
231                 return err
232         }
233
234         tmpfile, tmperr := ioutil.TempFile(bdir, "tmp"+loc)
235         if tmperr != nil {
236                 log.Printf("ioutil.TempFile(%s, tmp%s): %s", bdir, loc, tmperr)
237                 return tmperr
238         }
239         bpath := v.blockPath(loc)
240
241         if v.locker != nil {
242                 v.locker.Lock()
243                 defer v.locker.Unlock()
244         }
245         if _, err := tmpfile.Write(block); err != nil {
246                 log.Printf("%s: writing to %s: %s\n", v, bpath, err)
247                 tmpfile.Close()
248                 os.Remove(tmpfile.Name())
249                 return err
250         }
251         if err := tmpfile.Close(); err != nil {
252                 log.Printf("closing %s: %s\n", tmpfile.Name(), err)
253                 os.Remove(tmpfile.Name())
254                 return err
255         }
256         if err := os.Rename(tmpfile.Name(), bpath); err != nil {
257                 log.Printf("rename %s %s: %s\n", tmpfile.Name(), bpath, err)
258                 os.Remove(tmpfile.Name())
259                 return err
260         }
261         return nil
262 }
263
264 // Status returns a VolumeStatus struct describing the volume's
265 // current state, or nil if an error occurs.
266 //
267 func (v *UnixVolume) Status() *VolumeStatus {
268         var fs syscall.Statfs_t
269         var devnum uint64
270
271         if fi, err := os.Stat(v.root); err == nil {
272                 devnum = fi.Sys().(*syscall.Stat_t).Dev
273         } else {
274                 log.Printf("%s: os.Stat: %s\n", v, err)
275                 return nil
276         }
277
278         err := syscall.Statfs(v.root, &fs)
279         if err != nil {
280                 log.Printf("%s: statfs: %s\n", v, err)
281                 return nil
282         }
283         // These calculations match the way df calculates disk usage:
284         // "free" space is measured by fs.Bavail, but "used" space
285         // uses fs.Blocks - fs.Bfree.
286         free := fs.Bavail * uint64(fs.Bsize)
287         used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
288         return &VolumeStatus{v.root, devnum, free, used}
289 }
290
291 var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
292 var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
293
294 // IndexTo writes (to the given Writer) a list of blocks found on this
295 // volume which begin with the specified prefix. If the prefix is an
296 // empty string, IndexTo writes a complete list of blocks.
297 //
298 // Each block is given in the format
299 //
300 //     locator+size modification-time {newline}
301 //
302 // e.g.:
303 //
304 //     e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
305 //     e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
306 //     e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
307 //
308 func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
309         var lastErr error = nil
310         rootdir, err := os.Open(v.root)
311         if err != nil {
312                 return err
313         }
314         defer rootdir.Close()
315         for {
316                 names, err := rootdir.Readdirnames(1)
317                 if err == io.EOF {
318                         return lastErr
319                 } else if err != nil {
320                         return err
321                 }
322                 if !strings.HasPrefix(names[0], prefix) && !strings.HasPrefix(prefix, names[0]) {
323                         // prefix excludes all blocks stored in this dir
324                         continue
325                 }
326                 if !blockDirRe.MatchString(names[0]) {
327                         continue
328                 }
329                 blockdirpath := filepath.Join(v.root, names[0])
330                 blockdir, err := os.Open(blockdirpath)
331                 if err != nil {
332                         log.Print("Error reading ", blockdirpath, ": ", err)
333                         lastErr = err
334                         continue
335                 }
336                 for {
337                         fileInfo, err := blockdir.Readdir(1)
338                         if err == io.EOF {
339                                 break
340                         } else if err != nil {
341                                 log.Print("Error reading ", blockdirpath, ": ", err)
342                                 lastErr = err
343                                 break
344                         }
345                         name := fileInfo[0].Name()
346                         if !strings.HasPrefix(name, prefix) {
347                                 continue
348                         }
349                         if !blockFileRe.MatchString(name) {
350                                 continue
351                         }
352                         _, err = fmt.Fprint(w,
353                                 name,
354                                 "+", fileInfo[0].Size(),
355                                 " ", fileInfo[0].ModTime().UnixNano(),
356                                 "\n")
357                 }
358                 blockdir.Close()
359         }
360 }
361
362 // Trash trashes the block data from the unix storage
363 // If trashLifetime == 0, the block is deleted
364 // Else, the block is renamed as path/{loc}.trash.{deadline},
365 // where deadline = now + trashLifetime
366 func (v *UnixVolume) Trash(loc string) error {
367         // Touch() must be called before calling Write() on a block.  Touch()
368         // also uses lockfile().  This avoids a race condition between Write()
369         // and Trash() because either (a) the file will be trashed and Touch()
370         // will signal to the caller that the file is not present (and needs to
371         // be re-written), or (b) Touch() will update the file's timestamp and
372         // Trash() will read the correct up-to-date timestamp and choose not to
373         // trash the file.
374
375         if v.readonly {
376                 return MethodDisabledError
377         }
378         if v.locker != nil {
379                 v.locker.Lock()
380                 defer v.locker.Unlock()
381         }
382         p := v.blockPath(loc)
383         f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
384         if err != nil {
385                 return err
386         }
387         defer f.Close()
388         if e := lockfile(f); e != nil {
389                 return e
390         }
391         defer unlockfile(f)
392
393         // If the block has been PUT in the last blobSignatureTTL
394         // seconds, return success without removing the block. This
395         // protects data from garbage collection until it is no longer
396         // possible for clients to retrieve the unreferenced blocks
397         // anyway (because the permission signatures have expired).
398         if fi, err := os.Stat(p); err != nil {
399                 return err
400         } else {
401                 if time.Since(fi.ModTime()) < blobSignatureTTL {
402                         return nil
403                 }
404         }
405
406         if trashLifetime == 0 {
407                 return os.Remove(p)
408         }
409         return os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(trashLifetime).Unix()))
410 }
411
412 // Untrash moves block from trash back into store
413 // Look for path/{loc}.trash.{deadline} in storage,
414 // and rename the first such file as path/{loc}
415 func (v *UnixVolume) Untrash(loc string) (err error) {
416         if v.readonly {
417                 return MethodDisabledError
418         }
419
420         files, err := ioutil.ReadDir(v.blockDir(loc))
421         if err != nil {
422                 return err
423         }
424
425         if len(files) == 0 {
426                 return os.ErrNotExist
427         }
428
429         foundTrash := false
430         prefix := fmt.Sprintf("%v.trash.", loc)
431         for _, f := range files {
432                 if strings.HasPrefix(f.Name(), prefix) {
433                         foundTrash = true
434                         err = os.Rename(v.blockPath(f.Name()), v.blockPath(loc))
435                         if err == nil {
436                                 break
437                         }
438                 }
439         }
440
441         if foundTrash == false {
442                 return os.ErrNotExist
443         }
444
445         return
446 }
447
448 // blockDir returns the fully qualified directory name for the directory
449 // where loc is (or would be) stored on this volume.
450 func (v *UnixVolume) blockDir(loc string) string {
451         return filepath.Join(v.root, loc[0:3])
452 }
453
454 // blockPath returns the fully qualified pathname for the path to loc
455 // on this volume.
456 func (v *UnixVolume) blockPath(loc string) string {
457         return filepath.Join(v.blockDir(loc), loc)
458 }
459
460 // IsFull returns true if the free space on the volume is less than
461 // MinFreeKilobytes.
462 //
463 func (v *UnixVolume) IsFull() (isFull bool) {
464         fullSymlink := v.root + "/full"
465
466         // Check if the volume has been marked as full in the last hour.
467         if link, err := os.Readlink(fullSymlink); err == nil {
468                 if ts, err := strconv.Atoi(link); err == nil {
469                         fulltime := time.Unix(int64(ts), 0)
470                         if time.Since(fulltime).Hours() < 1.0 {
471                                 return true
472                         }
473                 }
474         }
475
476         if avail, err := v.FreeDiskSpace(); err == nil {
477                 isFull = avail < MinFreeKilobytes
478         } else {
479                 log.Printf("%s: FreeDiskSpace: %s\n", v, err)
480                 isFull = false
481         }
482
483         // If the volume is full, timestamp it.
484         if isFull {
485                 now := fmt.Sprintf("%d", time.Now().Unix())
486                 os.Symlink(now, fullSymlink)
487         }
488         return
489 }
490
491 // FreeDiskSpace returns the number of unused 1k blocks available on
492 // the volume.
493 //
494 func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
495         var fs syscall.Statfs_t
496         err = syscall.Statfs(v.root, &fs)
497         if err == nil {
498                 // Statfs output is not guaranteed to measure free
499                 // space in terms of 1K blocks.
500                 free = fs.Bavail * uint64(fs.Bsize) / 1024
501         }
502         return
503 }
504
505 func (v *UnixVolume) String() string {
506         return fmt.Sprintf("[UnixVolume %s]", v.root)
507 }
508
509 // Writable returns false if all future Put, Mtime, and Delete calls are expected to fail.
510 func (v *UnixVolume) Writable() bool {
511         return !v.readonly
512 }
513
514 func (v *UnixVolume) Replication() int {
515         return 1
516 }
517
518 // lockfile and unlockfile use flock(2) to manage kernel file locks.
519 func lockfile(f *os.File) error {
520         return syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
521 }
522
523 func unlockfile(f *os.File) error {
524         return syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
525 }
526
527 // Where appropriate, translate a more specific filesystem error to an
528 // error recognized by handlers, like os.ErrNotExist.
529 func (v *UnixVolume) translateError(err error) error {
530         switch err.(type) {
531         case *os.PathError:
532                 // stat() returns a PathError if the parent directory
533                 // (not just the file itself) is missing
534                 return os.ErrNotExist
535         default:
536                 return err
537         }
538 }
539
540 var unixTrashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`)
541
542 // EmptyTrash walks hierarchy looking for {hash}.trash.*
543 // and deletes those with deadline < now.
544 func (v *UnixVolume) EmptyTrash() {
545         var bytesDeleted, bytesInTrash int64
546         var blocksDeleted, blocksInTrash int
547
548         err := filepath.Walk(v.root, func(path string, info os.FileInfo, err error) error {
549                 if err != nil {
550                         log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err)
551                         return nil
552                 }
553                 if info.Mode().IsDir() {
554                         return nil
555                 }
556                 matches := unixTrashLocRegexp.FindStringSubmatch(path)
557                 if len(matches) != 3 {
558                         return nil
559                 }
560                 deadline, err := strconv.ParseInt(matches[2], 10, 64)
561                 if err != nil {
562                         log.Printf("EmptyTrash: %v: ParseInt(%v): %v", path, matches[2], err)
563                         return nil
564                 }
565                 bytesInTrash += info.Size()
566                 blocksInTrash++
567                 if deadline > time.Now().Unix() {
568                         return nil
569                 }
570                 err = os.Remove(path)
571                 if err != nil {
572                         log.Printf("EmptyTrash: Remove %v: %v", path, err)
573                         return nil
574                 }
575                 bytesDeleted += info.Size()
576                 blocksDeleted++
577                 return nil
578         })
579
580         if err != nil {
581                 log.Printf("EmptyTrash error for %v: %v", v.String(), err)
582         }
583
584         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
585 }