9701: Use a collection.OrderedDict instead of a simple dict to hold bufferblocks...
[arvados.git] / services / keepstore / volume_unix.go
1 package main
2
3 import (
4         "bufio"
5         "errors"
6         "flag"
7         "fmt"
8         "io"
9         "io/ioutil"
10         "log"
11         "os"
12         "path/filepath"
13         "regexp"
14         "strconv"
15         "strings"
16         "sync"
17         "syscall"
18         "time"
19 )
20
21 type unixVolumeAdder struct {
22         *volumeSet
23 }
24
25 func (vs *unixVolumeAdder) Set(value string) error {
26         if dirs := strings.Split(value, ","); len(dirs) > 1 {
27                 log.Print("DEPRECATED: using comma-separated volume list.")
28                 for _, dir := range dirs {
29                         if err := vs.Set(dir); err != nil {
30                                 return err
31                         }
32                 }
33                 return nil
34         }
35         if len(value) == 0 || value[0] != '/' {
36                 return errors.New("Invalid volume: must begin with '/'.")
37         }
38         if _, err := os.Stat(value); err != nil {
39                 return err
40         }
41         var locker sync.Locker
42         if flagSerializeIO {
43                 locker = &sync.Mutex{}
44         }
45         *vs.volumeSet = append(*vs.volumeSet, &UnixVolume{
46                 root:     value,
47                 locker:   locker,
48                 readonly: flagReadonly,
49         })
50         return nil
51 }
52
53 func init() {
54         flag.Var(
55                 &unixVolumeAdder{&volumes},
56                 "volumes",
57                 "Deprecated synonym for -volume.")
58         flag.Var(
59                 &unixVolumeAdder{&volumes},
60                 "volume",
61                 "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
62 }
63
64 // Discover adds a UnixVolume for every directory named "keep" that is
65 // located at the top level of a device- or tmpfs-backed mount point
66 // other than "/". It returns the number of volumes added.
67 func (vs *unixVolumeAdder) Discover() int {
68         added := 0
69         f, err := os.Open(ProcMounts)
70         if err != nil {
71                 log.Fatalf("opening %s: %s", ProcMounts, err)
72         }
73         scanner := bufio.NewScanner(f)
74         for scanner.Scan() {
75                 args := strings.Fields(scanner.Text())
76                 if err := scanner.Err(); err != nil {
77                         log.Fatalf("reading %s: %s", ProcMounts, err)
78                 }
79                 dev, mount := args[0], args[1]
80                 if mount == "/" {
81                         continue
82                 }
83                 if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
84                         continue
85                 }
86                 keepdir := mount + "/keep"
87                 if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
88                         continue
89                 }
90                 // Set the -readonly flag (but only for this volume)
91                 // if the filesystem is mounted readonly.
92                 flagReadonlyWas := flagReadonly
93                 for _, fsopt := range strings.Split(args[3], ",") {
94                         if fsopt == "ro" {
95                                 flagReadonly = true
96                                 break
97                         }
98                         if fsopt == "rw" {
99                                 break
100                         }
101                 }
102                 if err := vs.Set(keepdir); err != nil {
103                         log.Printf("adding %q: %s", keepdir, err)
104                 } else {
105                         added++
106                 }
107                 flagReadonly = flagReadonlyWas
108         }
109         return added
110 }
111
112 // A UnixVolume stores and retrieves blocks in a local directory.
113 type UnixVolume struct {
114         // path to the volume's root directory
115         root string
116         // something to lock during IO, typically a sync.Mutex (or nil
117         // to skip locking)
118         locker   sync.Locker
119         readonly bool
120 }
121
122 // Touch sets the timestamp for the given locator to the current time
123 func (v *UnixVolume) Touch(loc string) error {
124         if v.readonly {
125                 return MethodDisabledError
126         }
127         p := v.blockPath(loc)
128         f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
129         if err != nil {
130                 return err
131         }
132         defer f.Close()
133         if v.locker != nil {
134                 v.locker.Lock()
135                 defer v.locker.Unlock()
136         }
137         if e := lockfile(f); e != nil {
138                 return e
139         }
140         defer unlockfile(f)
141         ts := syscall.NsecToTimespec(time.Now().UnixNano())
142         return syscall.UtimesNano(p, []syscall.Timespec{ts, ts})
143 }
144
145 // Mtime returns the stored timestamp for the given locator.
146 func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
147         p := v.blockPath(loc)
148         fi, err := os.Stat(p)
149         if err != nil {
150                 return time.Time{}, err
151         }
152         return fi.ModTime(), nil
153 }
154
155 // Lock the locker (if one is in use), open the file for reading, and
156 // call the given function if and when the file is ready to read.
157 func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
158         if v.locker != nil {
159                 v.locker.Lock()
160                 defer v.locker.Unlock()
161         }
162         f, err := os.Open(path)
163         if err != nil {
164                 return err
165         }
166         defer f.Close()
167         return fn(f)
168 }
169
170 // stat is os.Stat() with some extra sanity checks.
171 func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
172         stat, err := os.Stat(path)
173         if err == nil {
174                 if stat.Size() < 0 {
175                         err = os.ErrInvalid
176                 } else if stat.Size() > BlockSize {
177                         err = TooLongError
178                 }
179         }
180         return stat, err
181 }
182
183 // Get retrieves a block, copies it to the given slice, and returns
184 // the number of bytes copied.
185 func (v *UnixVolume) Get(loc string, buf []byte) (int, error) {
186         path := v.blockPath(loc)
187         stat, err := v.stat(path)
188         if err != nil {
189                 return 0, v.translateError(err)
190         }
191         if stat.Size() > int64(len(buf)) {
192                 return 0, TooLongError
193         }
194         var read int
195         size := int(stat.Size())
196         err = v.getFunc(path, func(rdr io.Reader) error {
197                 read, err = io.ReadFull(rdr, buf[:size])
198                 return err
199         })
200         return read, err
201 }
202
203 // Compare returns nil if Get(loc) would return the same content as
204 // expect. It is functionally equivalent to Get() followed by
205 // bytes.Compare(), but uses less memory.
206 func (v *UnixVolume) Compare(loc string, expect []byte) error {
207         path := v.blockPath(loc)
208         if _, err := v.stat(path); err != nil {
209                 return v.translateError(err)
210         }
211         return v.getFunc(path, func(rdr io.Reader) error {
212                 return compareReaderWithBuf(rdr, expect, loc[:32])
213         })
214 }
215
216 // Put stores a block of data identified by the locator string
217 // "loc".  It returns nil on success.  If the volume is full, it
218 // returns a FullError.  If the write fails due to some other error,
219 // that error is returned.
220 func (v *UnixVolume) Put(loc string, block []byte) error {
221         if v.readonly {
222                 return MethodDisabledError
223         }
224         if v.IsFull() {
225                 return FullError
226         }
227         bdir := v.blockDir(loc)
228         if err := os.MkdirAll(bdir, 0755); err != nil {
229                 log.Printf("%s: could not create directory %s: %s",
230                         loc, bdir, err)
231                 return err
232         }
233
234         tmpfile, tmperr := ioutil.TempFile(bdir, "tmp"+loc)
235         if tmperr != nil {
236                 log.Printf("ioutil.TempFile(%s, tmp%s): %s", bdir, loc, tmperr)
237                 return tmperr
238         }
239         bpath := v.blockPath(loc)
240
241         if v.locker != nil {
242                 v.locker.Lock()
243                 defer v.locker.Unlock()
244         }
245         if _, err := tmpfile.Write(block); err != nil {
246                 log.Printf("%s: writing to %s: %s\n", v, bpath, err)
247                 tmpfile.Close()
248                 os.Remove(tmpfile.Name())
249                 return err
250         }
251         if err := tmpfile.Close(); err != nil {
252                 log.Printf("closing %s: %s\n", tmpfile.Name(), err)
253                 os.Remove(tmpfile.Name())
254                 return err
255         }
256         if err := os.Rename(tmpfile.Name(), bpath); err != nil {
257                 log.Printf("rename %s %s: %s\n", tmpfile.Name(), bpath, err)
258                 os.Remove(tmpfile.Name())
259                 return err
260         }
261         return nil
262 }
263
264 // Status returns a VolumeStatus struct describing the volume's
265 // current state, or nil if an error occurs.
266 //
267 func (v *UnixVolume) Status() *VolumeStatus {
268         var fs syscall.Statfs_t
269         var devnum uint64
270
271         if fi, err := os.Stat(v.root); err == nil {
272                 devnum = fi.Sys().(*syscall.Stat_t).Dev
273         } else {
274                 log.Printf("%s: os.Stat: %s\n", v, err)
275                 return nil
276         }
277
278         err := syscall.Statfs(v.root, &fs)
279         if err != nil {
280                 log.Printf("%s: statfs: %s\n", v, err)
281                 return nil
282         }
283         // These calculations match the way df calculates disk usage:
284         // "free" space is measured by fs.Bavail, but "used" space
285         // uses fs.Blocks - fs.Bfree.
286         free := fs.Bavail * uint64(fs.Bsize)
287         used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
288         return &VolumeStatus{v.root, devnum, free, used}
289 }
290
291 var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
292 var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
293
294 // IndexTo writes (to the given Writer) a list of blocks found on this
295 // volume which begin with the specified prefix. If the prefix is an
296 // empty string, IndexTo writes a complete list of blocks.
297 //
298 // Each block is given in the format
299 //
300 //     locator+size modification-time {newline}
301 //
302 // e.g.:
303 //
304 //     e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
305 //     e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
306 //     e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
307 //
308 func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
309         var lastErr error
310         rootdir, err := os.Open(v.root)
311         if err != nil {
312                 return err
313         }
314         defer rootdir.Close()
315         for {
316                 names, err := rootdir.Readdirnames(1)
317                 if err == io.EOF {
318                         return lastErr
319                 } else if err != nil {
320                         return err
321                 }
322                 if !strings.HasPrefix(names[0], prefix) && !strings.HasPrefix(prefix, names[0]) {
323                         // prefix excludes all blocks stored in this dir
324                         continue
325                 }
326                 if !blockDirRe.MatchString(names[0]) {
327                         continue
328                 }
329                 blockdirpath := filepath.Join(v.root, names[0])
330                 blockdir, err := os.Open(blockdirpath)
331                 if err != nil {
332                         log.Print("Error reading ", blockdirpath, ": ", err)
333                         lastErr = err
334                         continue
335                 }
336                 for {
337                         fileInfo, err := blockdir.Readdir(1)
338                         if err == io.EOF {
339                                 break
340                         } else if err != nil {
341                                 log.Print("Error reading ", blockdirpath, ": ", err)
342                                 lastErr = err
343                                 break
344                         }
345                         name := fileInfo[0].Name()
346                         if !strings.HasPrefix(name, prefix) {
347                                 continue
348                         }
349                         if !blockFileRe.MatchString(name) {
350                                 continue
351                         }
352                         _, err = fmt.Fprint(w,
353                                 name,
354                                 "+", fileInfo[0].Size(),
355                                 " ", fileInfo[0].ModTime().UnixNano(),
356                                 "\n")
357                 }
358                 blockdir.Close()
359         }
360 }
361
362 // Trash trashes the block data from the unix storage
363 // If trashLifetime == 0, the block is deleted
364 // Else, the block is renamed as path/{loc}.trash.{deadline},
365 // where deadline = now + trashLifetime
366 func (v *UnixVolume) Trash(loc string) error {
367         // Touch() must be called before calling Write() on a block.  Touch()
368         // also uses lockfile().  This avoids a race condition between Write()
369         // and Trash() because either (a) the file will be trashed and Touch()
370         // will signal to the caller that the file is not present (and needs to
371         // be re-written), or (b) Touch() will update the file's timestamp and
372         // Trash() will read the correct up-to-date timestamp and choose not to
373         // trash the file.
374
375         if v.readonly {
376                 return MethodDisabledError
377         }
378         if v.locker != nil {
379                 v.locker.Lock()
380                 defer v.locker.Unlock()
381         }
382         p := v.blockPath(loc)
383         f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
384         if err != nil {
385                 return err
386         }
387         defer f.Close()
388         if e := lockfile(f); e != nil {
389                 return e
390         }
391         defer unlockfile(f)
392
393         // If the block has been PUT in the last blobSignatureTTL
394         // seconds, return success without removing the block. This
395         // protects data from garbage collection until it is no longer
396         // possible for clients to retrieve the unreferenced blocks
397         // anyway (because the permission signatures have expired).
398         if fi, err := os.Stat(p); err != nil {
399                 return err
400         } else if time.Since(fi.ModTime()) < blobSignatureTTL {
401                 return nil
402         }
403
404         if trashLifetime == 0 {
405                 return os.Remove(p)
406         }
407         return os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(trashLifetime).Unix()))
408 }
409
410 // Untrash moves block from trash back into store
411 // Look for path/{loc}.trash.{deadline} in storage,
412 // and rename the first such file as path/{loc}
413 func (v *UnixVolume) Untrash(loc string) (err error) {
414         if v.readonly {
415                 return MethodDisabledError
416         }
417
418         files, err := ioutil.ReadDir(v.blockDir(loc))
419         if err != nil {
420                 return err
421         }
422
423         if len(files) == 0 {
424                 return os.ErrNotExist
425         }
426
427         foundTrash := false
428         prefix := fmt.Sprintf("%v.trash.", loc)
429         for _, f := range files {
430                 if strings.HasPrefix(f.Name(), prefix) {
431                         foundTrash = true
432                         err = os.Rename(v.blockPath(f.Name()), v.blockPath(loc))
433                         if err == nil {
434                                 break
435                         }
436                 }
437         }
438
439         if foundTrash == false {
440                 return os.ErrNotExist
441         }
442
443         return
444 }
445
446 // blockDir returns the fully qualified directory name for the directory
447 // where loc is (or would be) stored on this volume.
448 func (v *UnixVolume) blockDir(loc string) string {
449         return filepath.Join(v.root, loc[0:3])
450 }
451
452 // blockPath returns the fully qualified pathname for the path to loc
453 // on this volume.
454 func (v *UnixVolume) blockPath(loc string) string {
455         return filepath.Join(v.blockDir(loc), loc)
456 }
457
458 // IsFull returns true if the free space on the volume is less than
459 // MinFreeKilobytes.
460 //
461 func (v *UnixVolume) IsFull() (isFull bool) {
462         fullSymlink := v.root + "/full"
463
464         // Check if the volume has been marked as full in the last hour.
465         if link, err := os.Readlink(fullSymlink); err == nil {
466                 if ts, err := strconv.Atoi(link); err == nil {
467                         fulltime := time.Unix(int64(ts), 0)
468                         if time.Since(fulltime).Hours() < 1.0 {
469                                 return true
470                         }
471                 }
472         }
473
474         if avail, err := v.FreeDiskSpace(); err == nil {
475                 isFull = avail < MinFreeKilobytes
476         } else {
477                 log.Printf("%s: FreeDiskSpace: %s\n", v, err)
478                 isFull = false
479         }
480
481         // If the volume is full, timestamp it.
482         if isFull {
483                 now := fmt.Sprintf("%d", time.Now().Unix())
484                 os.Symlink(now, fullSymlink)
485         }
486         return
487 }
488
489 // FreeDiskSpace returns the number of unused 1k blocks available on
490 // the volume.
491 //
492 func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
493         var fs syscall.Statfs_t
494         err = syscall.Statfs(v.root, &fs)
495         if err == nil {
496                 // Statfs output is not guaranteed to measure free
497                 // space in terms of 1K blocks.
498                 free = fs.Bavail * uint64(fs.Bsize) / 1024
499         }
500         return
501 }
502
503 func (v *UnixVolume) String() string {
504         return fmt.Sprintf("[UnixVolume %s]", v.root)
505 }
506
507 // Writable returns false if all future Put, Mtime, and Delete calls
508 // are expected to fail.
509 func (v *UnixVolume) Writable() bool {
510         return !v.readonly
511 }
512
513 // Replication returns the number of replicas promised by the
514 // underlying device (currently assumed to be 1).
515 func (v *UnixVolume) Replication() int {
516         return 1
517 }
518
519 // lockfile and unlockfile use flock(2) to manage kernel file locks.
520 func lockfile(f *os.File) error {
521         return syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
522 }
523
524 func unlockfile(f *os.File) error {
525         return syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
526 }
527
528 // Where appropriate, translate a more specific filesystem error to an
529 // error recognized by handlers, like os.ErrNotExist.
530 func (v *UnixVolume) translateError(err error) error {
531         switch err.(type) {
532         case *os.PathError:
533                 // stat() returns a PathError if the parent directory
534                 // (not just the file itself) is missing
535                 return os.ErrNotExist
536         default:
537                 return err
538         }
539 }
540
541 var unixTrashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`)
542
543 // EmptyTrash walks hierarchy looking for {hash}.trash.*
544 // and deletes those with deadline < now.
545 func (v *UnixVolume) EmptyTrash() {
546         var bytesDeleted, bytesInTrash int64
547         var blocksDeleted, blocksInTrash int
548
549         err := filepath.Walk(v.root, func(path string, info os.FileInfo, err error) error {
550                 if err != nil {
551                         log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err)
552                         return nil
553                 }
554                 if info.Mode().IsDir() {
555                         return nil
556                 }
557                 matches := unixTrashLocRegexp.FindStringSubmatch(path)
558                 if len(matches) != 3 {
559                         return nil
560                 }
561                 deadline, err := strconv.ParseInt(matches[2], 10, 64)
562                 if err != nil {
563                         log.Printf("EmptyTrash: %v: ParseInt(%v): %v", path, matches[2], err)
564                         return nil
565                 }
566                 bytesInTrash += info.Size()
567                 blocksInTrash++
568                 if deadline > time.Now().Unix() {
569                         return nil
570                 }
571                 err = os.Remove(path)
572                 if err != nil {
573                         log.Printf("EmptyTrash: Remove %v: %v", path, err)
574                         return nil
575                 }
576                 bytesDeleted += info.Size()
577                 blocksDeleted++
578                 return nil
579         })
580
581         if err != nil {
582                 log.Printf("EmptyTrash error for %v: %v", v.String(), err)
583         }
584
585         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
586 }