3f1df64c4c672940392ad0a6470d8fd5c3e8a131
[arvados.git] / services / keepstore / volume_unix.go
1 package main
2
3 import (
4         "bufio"
5         "errors"
6         "flag"
7         "fmt"
8         "io"
9         "io/ioutil"
10         "log"
11         "os"
12         "path/filepath"
13         "regexp"
14         "strconv"
15         "strings"
16         "sync"
17         "syscall"
18         "time"
19 )
20
21 type unixVolumeAdder struct {
22         *volumeSet
23 }
24
25 func (vs *unixVolumeAdder) Set(value string) error {
26         if dirs := strings.Split(value, ","); len(dirs) > 1 {
27                 log.Print("DEPRECATED: using comma-separated volume list.")
28                 for _, dir := range dirs {
29                         if err := vs.Set(dir); err != nil {
30                                 return err
31                         }
32                 }
33                 return nil
34         }
35         if len(value) == 0 || value[0] != '/' {
36                 return errors.New("Invalid volume: must begin with '/'.")
37         }
38         if _, err := os.Stat(value); err != nil {
39                 return err
40         }
41         var locker sync.Locker
42         if flagSerializeIO {
43                 locker = &sync.Mutex{}
44         }
45         *vs.volumeSet = append(*vs.volumeSet, &UnixVolume{
46                 root:     value,
47                 locker:   locker,
48                 readonly: flagReadonly,
49         })
50         return nil
51 }
52
53 func init() {
54         flag.Var(
55                 &unixVolumeAdder{&volumes},
56                 "volumes",
57                 "Deprecated synonym for -volume.")
58         flag.Var(
59                 &unixVolumeAdder{&volumes},
60                 "volume",
61                 "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
62 }
63
64 // Discover adds a UnixVolume for every directory named "keep" that is
65 // located at the top level of a device- or tmpfs-backed mount point
66 // other than "/". It returns the number of volumes added.
67 func (vs *unixVolumeAdder) Discover() int {
68         added := 0
69         f, err := os.Open(ProcMounts)
70         if err != nil {
71                 log.Fatalf("opening %s: %s", ProcMounts, err)
72         }
73         scanner := bufio.NewScanner(f)
74         for scanner.Scan() {
75                 args := strings.Fields(scanner.Text())
76                 if err := scanner.Err(); err != nil {
77                         log.Fatalf("reading %s: %s", ProcMounts, err)
78                 }
79                 dev, mount := args[0], args[1]
80                 if mount == "/" {
81                         continue
82                 }
83                 if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
84                         continue
85                 }
86                 keepdir := mount + "/keep"
87                 if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
88                         continue
89                 }
90                 // Set the -readonly flag (but only for this volume)
91                 // if the filesystem is mounted readonly.
92                 flagReadonlyWas := flagReadonly
93                 for _, fsopt := range strings.Split(args[3], ",") {
94                         if fsopt == "ro" {
95                                 flagReadonly = true
96                                 break
97                         }
98                         if fsopt == "rw" {
99                                 break
100                         }
101                 }
102                 if err := vs.Set(keepdir); err != nil {
103                         log.Printf("adding %q: %s", keepdir, err)
104                 } else {
105                         added++
106                 }
107                 flagReadonly = flagReadonlyWas
108         }
109         return added
110 }
111
112 // A UnixVolume stores and retrieves blocks in a local directory.
113 type UnixVolume struct {
114         // path to the volume's root directory
115         root string
116         // something to lock during IO, typically a sync.Mutex (or nil
117         // to skip locking)
118         locker   sync.Locker
119         readonly bool
120 }
121
122 // Touch sets the timestamp for the given locator to the current time
123 func (v *UnixVolume) Touch(loc string) error {
124         if v.readonly {
125                 return MethodDisabledError
126         }
127         p := v.blockPath(loc)
128         f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
129         if err != nil {
130                 return err
131         }
132         defer f.Close()
133         if v.locker != nil {
134                 v.locker.Lock()
135                 defer v.locker.Unlock()
136         }
137         if e := lockfile(f); e != nil {
138                 return e
139         }
140         defer unlockfile(f)
141         now := time.Now().Unix()
142         utime := syscall.Utimbuf{now, now}
143         return syscall.Utime(p, &utime)
144 }
145
146 // Mtime returns the stored timestamp for the given locator.
147 func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
148         p := v.blockPath(loc)
149         fi, err := os.Stat(p)
150         if err != nil {
151                 return time.Time{}, err
152         }
153         return fi.ModTime(), nil
154 }
155
156 // Lock the locker (if one is in use), open the file for reading, and
157 // call the given function if and when the file is ready to read.
158 func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
159         if v.locker != nil {
160                 v.locker.Lock()
161                 defer v.locker.Unlock()
162         }
163         f, err := os.Open(path)
164         if err != nil {
165                 return err
166         }
167         defer f.Close()
168         return fn(f)
169 }
170
171 // stat is os.Stat() with some extra sanity checks.
172 func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
173         stat, err := os.Stat(path)
174         if err == nil {
175                 if stat.Size() < 0 {
176                         err = os.ErrInvalid
177                 } else if stat.Size() > BlockSize {
178                         err = TooLongError
179                 }
180         }
181         return stat, err
182 }
183
184 // Get retrieves a block identified by the locator string "loc", and
185 // returns its contents as a byte slice.
186 //
187 // Get returns a nil buffer IFF it returns a non-nil error.
188 func (v *UnixVolume) Get(loc string) ([]byte, error) {
189         path := v.blockPath(loc)
190         stat, err := v.stat(path)
191         if err != nil {
192                 return nil, v.translateError(err)
193         }
194         buf := bufs.Get(int(stat.Size()))
195         err = v.getFunc(path, func(rdr io.Reader) error {
196                 _, err = io.ReadFull(rdr, buf)
197                 return err
198         })
199         if err != nil {
200                 bufs.Put(buf)
201                 return nil, err
202         }
203         return buf, nil
204 }
205
206 // Compare returns nil if Get(loc) would return the same content as
207 // expect. It is functionally equivalent to Get() followed by
208 // bytes.Compare(), but uses less memory.
209 func (v *UnixVolume) Compare(loc string, expect []byte) error {
210         path := v.blockPath(loc)
211         if _, err := v.stat(path); err != nil {
212                 return v.translateError(err)
213         }
214         return v.getFunc(path, func(rdr io.Reader) error {
215                 return compareReaderWithBuf(rdr, expect, loc[:32])
216         })
217 }
218
219 // Put stores a block of data identified by the locator string
220 // "loc".  It returns nil on success.  If the volume is full, it
221 // returns a FullError.  If the write fails due to some other error,
222 // that error is returned.
223 func (v *UnixVolume) Put(loc string, block []byte) error {
224         if v.readonly {
225                 return MethodDisabledError
226         }
227         if v.IsFull() {
228                 return FullError
229         }
230         bdir := v.blockDir(loc)
231         if err := os.MkdirAll(bdir, 0755); err != nil {
232                 log.Printf("%s: could not create directory %s: %s",
233                         loc, bdir, err)
234                 return err
235         }
236
237         tmpfile, tmperr := ioutil.TempFile(bdir, "tmp"+loc)
238         if tmperr != nil {
239                 log.Printf("ioutil.TempFile(%s, tmp%s): %s", bdir, loc, tmperr)
240                 return tmperr
241         }
242         bpath := v.blockPath(loc)
243
244         if v.locker != nil {
245                 v.locker.Lock()
246                 defer v.locker.Unlock()
247         }
248         if _, err := tmpfile.Write(block); err != nil {
249                 log.Printf("%s: writing to %s: %s\n", v, bpath, err)
250                 tmpfile.Close()
251                 os.Remove(tmpfile.Name())
252                 return err
253         }
254         if err := tmpfile.Close(); err != nil {
255                 log.Printf("closing %s: %s\n", tmpfile.Name(), err)
256                 os.Remove(tmpfile.Name())
257                 return err
258         }
259         if err := os.Rename(tmpfile.Name(), bpath); err != nil {
260                 log.Printf("rename %s %s: %s\n", tmpfile.Name(), bpath, err)
261                 os.Remove(tmpfile.Name())
262                 return err
263         }
264         return nil
265 }
266
267 // Status returns a VolumeStatus struct describing the volume's
268 // current state, or nil if an error occurs.
269 //
270 func (v *UnixVolume) Status() *VolumeStatus {
271         var fs syscall.Statfs_t
272         var devnum uint64
273
274         if fi, err := os.Stat(v.root); err == nil {
275                 devnum = fi.Sys().(*syscall.Stat_t).Dev
276         } else {
277                 log.Printf("%s: os.Stat: %s\n", v, err)
278                 return nil
279         }
280
281         err := syscall.Statfs(v.root, &fs)
282         if err != nil {
283                 log.Printf("%s: statfs: %s\n", v, err)
284                 return nil
285         }
286         // These calculations match the way df calculates disk usage:
287         // "free" space is measured by fs.Bavail, but "used" space
288         // uses fs.Blocks - fs.Bfree.
289         free := fs.Bavail * uint64(fs.Bsize)
290         used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
291         return &VolumeStatus{v.root, devnum, free, used}
292 }
293
294 var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
295 var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
296
297 // IndexTo writes (to the given Writer) a list of blocks found on this
298 // volume which begin with the specified prefix. If the prefix is an
299 // empty string, IndexTo writes a complete list of blocks.
300 //
301 // Each block is given in the format
302 //
303 //     locator+size modification-time {newline}
304 //
305 // e.g.:
306 //
307 //     e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
308 //     e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
309 //     e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
310 //
311 func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
312         var lastErr error = nil
313         rootdir, err := os.Open(v.root)
314         if err != nil {
315                 return err
316         }
317         defer rootdir.Close()
318         for {
319                 names, err := rootdir.Readdirnames(1)
320                 if err == io.EOF {
321                         return lastErr
322                 } else if err != nil {
323                         return err
324                 }
325                 if !strings.HasPrefix(names[0], prefix) && !strings.HasPrefix(prefix, names[0]) {
326                         // prefix excludes all blocks stored in this dir
327                         continue
328                 }
329                 if !blockDirRe.MatchString(names[0]) {
330                         continue
331                 }
332                 blockdirpath := filepath.Join(v.root, names[0])
333                 blockdir, err := os.Open(blockdirpath)
334                 if err != nil {
335                         log.Print("Error reading ", blockdirpath, ": ", err)
336                         lastErr = err
337                         continue
338                 }
339                 for {
340                         fileInfo, err := blockdir.Readdir(1)
341                         if err == io.EOF {
342                                 break
343                         } else if err != nil {
344                                 log.Print("Error reading ", blockdirpath, ": ", err)
345                                 lastErr = err
346                                 break
347                         }
348                         name := fileInfo[0].Name()
349                         if !strings.HasPrefix(name, prefix) {
350                                 continue
351                         }
352                         if !blockFileRe.MatchString(name) {
353                                 continue
354                         }
355                         _, err = fmt.Fprint(w,
356                                 name,
357                                 "+", fileInfo[0].Size(),
358                                 " ", fileInfo[0].ModTime().Unix(),
359                                 "\n")
360                 }
361                 blockdir.Close()
362         }
363 }
364
365 // Trash trashes the block data from the unix storage
366 // If trashLifetime == 0, the block is deleted
367 // Else, the block is renamed as path/{loc}.trash.{deadline},
368 // where deadline = now + trashLifetime
369 func (v *UnixVolume) Trash(loc string) error {
370         // Touch() must be called before calling Write() on a block.  Touch()
371         // also uses lockfile().  This avoids a race condition between Write()
372         // and Trash() because either (a) the file will be trashed and Touch()
373         // will signal to the caller that the file is not present (and needs to
374         // be re-written), or (b) Touch() will update the file's timestamp and
375         // Trash() will read the correct up-to-date timestamp and choose not to
376         // trash the file.
377
378         if v.readonly {
379                 return MethodDisabledError
380         }
381         if v.locker != nil {
382                 v.locker.Lock()
383                 defer v.locker.Unlock()
384         }
385         p := v.blockPath(loc)
386         f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
387         if err != nil {
388                 return err
389         }
390         defer f.Close()
391         if e := lockfile(f); e != nil {
392                 return e
393         }
394         defer unlockfile(f)
395
396         // If the block has been PUT in the last blobSignatureTTL
397         // seconds, return success without removing the block. This
398         // protects data from garbage collection until it is no longer
399         // possible for clients to retrieve the unreferenced blocks
400         // anyway (because the permission signatures have expired).
401         if fi, err := os.Stat(p); err != nil {
402                 return err
403         } else {
404                 if time.Since(fi.ModTime()) < blobSignatureTTL {
405                         return nil
406                 }
407         }
408
409         if trashLifetime == 0 {
410                 return os.Remove(p)
411         }
412         return os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(trashLifetime).Unix()))
413 }
414
415 // Untrash moves block from trash back into store
416 // Look for path/{loc}.trash.{deadline} in storage,
417 // and rename the first such file as path/{loc}
418 func (v *UnixVolume) Untrash(loc string) (err error) {
419         if v.readonly {
420                 return MethodDisabledError
421         }
422
423         prefix := fmt.Sprintf("%v.trash.", loc)
424         files, err := ioutil.ReadDir(v.blockDir(loc))
425         if err != nil {
426                 return err
427         }
428         if len(files) == 0 {
429                 return os.ErrNotExist
430         }
431         for _, f := range files {
432                 if strings.HasPrefix(f.Name(), prefix) {
433                         err = os.Rename(v.blockPath(f.Name()), v.blockPath(loc))
434                         if err == nil {
435                                 break
436                         }
437                 }
438         }
439
440         return
441 }
442
443 // blockDir returns the fully qualified directory name for the directory
444 // where loc is (or would be) stored on this volume.
445 func (v *UnixVolume) blockDir(loc string) string {
446         return filepath.Join(v.root, loc[0:3])
447 }
448
449 // blockPath returns the fully qualified pathname for the path to loc
450 // on this volume.
451 func (v *UnixVolume) blockPath(loc string) string {
452         return filepath.Join(v.blockDir(loc), loc)
453 }
454
455 // IsFull returns true if the free space on the volume is less than
456 // MinFreeKilobytes.
457 //
458 func (v *UnixVolume) IsFull() (isFull bool) {
459         fullSymlink := v.root + "/full"
460
461         // Check if the volume has been marked as full in the last hour.
462         if link, err := os.Readlink(fullSymlink); err == nil {
463                 if ts, err := strconv.Atoi(link); err == nil {
464                         fulltime := time.Unix(int64(ts), 0)
465                         if time.Since(fulltime).Hours() < 1.0 {
466                                 return true
467                         }
468                 }
469         }
470
471         if avail, err := v.FreeDiskSpace(); err == nil {
472                 isFull = avail < MinFreeKilobytes
473         } else {
474                 log.Printf("%s: FreeDiskSpace: %s\n", v, err)
475                 isFull = false
476         }
477
478         // If the volume is full, timestamp it.
479         if isFull {
480                 now := fmt.Sprintf("%d", time.Now().Unix())
481                 os.Symlink(now, fullSymlink)
482         }
483         return
484 }
485
486 // FreeDiskSpace returns the number of unused 1k blocks available on
487 // the volume.
488 //
489 func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
490         var fs syscall.Statfs_t
491         err = syscall.Statfs(v.root, &fs)
492         if err == nil {
493                 // Statfs output is not guaranteed to measure free
494                 // space in terms of 1K blocks.
495                 free = fs.Bavail * uint64(fs.Bsize) / 1024
496         }
497         return
498 }
499
500 func (v *UnixVolume) String() string {
501         return fmt.Sprintf("[UnixVolume %s]", v.root)
502 }
503
504 // Writable returns false if all future Put, Mtime, and Delete calls are expected to fail.
505 func (v *UnixVolume) Writable() bool {
506         return !v.readonly
507 }
508
509 func (v *UnixVolume) Replication() int {
510         return 1
511 }
512
513 // lockfile and unlockfile use flock(2) to manage kernel file locks.
514 func lockfile(f *os.File) error {
515         return syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
516 }
517
518 func unlockfile(f *os.File) error {
519         return syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
520 }
521
522 // Where appropriate, translate a more specific filesystem error to an
523 // error recognized by handlers, like os.ErrNotExist.
524 func (v *UnixVolume) translateError(err error) error {
525         switch err.(type) {
526         case *os.PathError:
527                 // stat() returns a PathError if the parent directory
528                 // (not just the file itself) is missing
529                 return os.ErrNotExist
530         default:
531                 return err
532         }
533 }
534
535 // EmptyTrash walks hierarchy looking for {hash}.trash.*
536 // and deletes those with deadline < now.
537 func (v *UnixVolume) EmptyTrash() {
538         var bytesDeleted, bytesInTrash int64
539         var blocksDeleted, blocksInTrash int
540
541         err := filepath.Walk(v.root, func(path string, info os.FileInfo, err error) error {
542                 if err != nil {
543                         return err
544                 }
545
546                 if !info.Mode().IsDir() {
547                         matches := trashLocRegexp.FindStringSubmatch(path)
548                         if len(matches) == 3 {
549                                 deadline, err := strconv.ParseInt(matches[2], 10, 64)
550                                 if err != nil {
551                                         log.Printf("EmptyTrash error for %v: %v", matches[1], err)
552                                 } else {
553                                         if int64(deadline) <= time.Now().Unix() {
554                                                 err = os.Remove(path)
555                                                 if err != nil {
556                                                         log.Printf("Error deleting %v: %v", matches[1], err)
557                                                         bytesInTrash += info.Size()
558                                                         blocksInTrash++
559                                                 } else {
560                                                         bytesDeleted += info.Size()
561                                                         blocksDeleted++
562                                                 }
563                                         } else {
564                                                 bytesInTrash += info.Size()
565                                                 blocksInTrash++
566                                         }
567                                 }
568                         }
569                 }
570                 return nil
571         })
572
573         if err != nil {
574                 log.Printf("EmptyTrash error for %v: %v", v.String(), err)
575         }
576
577         log.Printf("EmptyTrash stats for %v: Bytes deleted %v; Blocks deleted %v; Bytes remaining in trash: %v; Blocks remaining in trash: %v", v.String(), bytesDeleted, blocksDeleted, bytesInTrash, blocksInTrash)
578 }