b5753dec04638927162a328d2a43f2fd4e567a50
[arvados.git] / services / keepstore / volume_unix.go
1 package main
2
3 import (
4         "bufio"
5         "flag"
6         "fmt"
7         "io"
8         "io/ioutil"
9         "log"
10         "os"
11         "path/filepath"
12         "regexp"
13         "strconv"
14         "strings"
15         "sync"
16         "syscall"
17         "time"
18 )
19
20 type unixVolumeAdder struct {
21         *Config
22 }
23
24 // String implements flag.Value
25 func (s *unixVolumeAdder) String() string {
26         return "-"
27 }
28
29 func (vs *unixVolumeAdder) Set(path string) error {
30         if dirs := strings.Split(path, ","); len(dirs) > 1 {
31                 log.Print("DEPRECATED: using comma-separated volume list.")
32                 for _, dir := range dirs {
33                         if err := vs.Set(dir); err != nil {
34                                 return err
35                         }
36                 }
37                 return nil
38         }
39         vs.Config.Volumes = append(vs.Config.Volumes, &UnixVolume{
40                 Root:      path,
41                 ReadOnly:  deprecated.flagReadonly,
42                 Serialize: deprecated.flagSerializeIO,
43         })
44         return nil
45 }
46
47 func init() {
48         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &UnixVolume{} })
49
50         flag.Var(&unixVolumeAdder{theConfig}, "volumes", "see Volumes configuration")
51         flag.Var(&unixVolumeAdder{theConfig}, "volume", "see Volumes configuration")
52 }
53
54 // Discover adds a UnixVolume for every directory named "keep" that is
55 // located at the top level of a device- or tmpfs-backed mount point
56 // other than "/". It returns the number of volumes added.
57 func (vs *unixVolumeAdder) Discover() int {
58         added := 0
59         f, err := os.Open(ProcMounts)
60         if err != nil {
61                 log.Fatalf("opening %s: %s", ProcMounts, err)
62         }
63         scanner := bufio.NewScanner(f)
64         for scanner.Scan() {
65                 args := strings.Fields(scanner.Text())
66                 if err := scanner.Err(); err != nil {
67                         log.Fatalf("reading %s: %s", ProcMounts, err)
68                 }
69                 dev, mount := args[0], args[1]
70                 if mount == "/" {
71                         continue
72                 }
73                 if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
74                         continue
75                 }
76                 keepdir := mount + "/keep"
77                 if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
78                         continue
79                 }
80                 // Set the -readonly flag (but only for this volume)
81                 // if the filesystem is mounted readonly.
82                 flagReadonlyWas := deprecated.flagReadonly
83                 for _, fsopt := range strings.Split(args[3], ",") {
84                         if fsopt == "ro" {
85                                 deprecated.flagReadonly = true
86                                 break
87                         }
88                         if fsopt == "rw" {
89                                 break
90                         }
91                 }
92                 if err := vs.Set(keepdir); err != nil {
93                         log.Printf("adding %q: %s", keepdir, err)
94                 } else {
95                         added++
96                 }
97                 deprecated.flagReadonly = flagReadonlyWas
98         }
99         return added
100 }
101
102 // A UnixVolume stores and retrieves blocks in a local directory.
103 type UnixVolume struct {
104         Root                 string // path to the volume's root directory
105         ReadOnly             bool
106         Serialize            bool
107         DirectoryReplication int
108
109         // something to lock during IO, typically a sync.Mutex (or nil
110         // to skip locking)
111         locker sync.Locker
112 }
113
114 // Examples implements VolumeWithExamples.
115 func (*UnixVolume) Examples() []Volume {
116         return []Volume{
117                 &UnixVolume{
118                         Root:                 "/mnt/local-disk",
119                         Serialize:            true,
120                         DirectoryReplication: 1,
121                 },
122                 &UnixVolume{
123                         Root:                 "/mnt/network-disk",
124                         Serialize:            false,
125                         DirectoryReplication: 2,
126                 },
127         }
128 }
129
130 // Type implements Volume
131 func (v *UnixVolume) Type() string {
132         return "Directory"
133 }
134
135 // Start implements Volume
136 func (v *UnixVolume) Start() error {
137         if v.Serialize {
138                 v.locker = &sync.Mutex{}
139         }
140         if !strings.HasPrefix(v.Root, "/") {
141                 return fmt.Errorf("volume root does not start with '/': %q", v.Root)
142         }
143         if v.DirectoryReplication == 0 {
144                 v.DirectoryReplication = 1
145         }
146         _, err := os.Stat(v.Root)
147         return err
148 }
149
150 // Touch sets the timestamp for the given locator to the current time
151 func (v *UnixVolume) Touch(loc string) error {
152         if v.ReadOnly {
153                 return MethodDisabledError
154         }
155         p := v.blockPath(loc)
156         f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
157         if err != nil {
158                 return err
159         }
160         defer f.Close()
161         if v.locker != nil {
162                 v.locker.Lock()
163                 defer v.locker.Unlock()
164         }
165         if e := lockfile(f); e != nil {
166                 return e
167         }
168         defer unlockfile(f)
169         ts := syscall.NsecToTimespec(time.Now().UnixNano())
170         return syscall.UtimesNano(p, []syscall.Timespec{ts, ts})
171 }
172
173 // Mtime returns the stored timestamp for the given locator.
174 func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
175         p := v.blockPath(loc)
176         fi, err := os.Stat(p)
177         if err != nil {
178                 return time.Time{}, err
179         }
180         return fi.ModTime(), nil
181 }
182
183 // Lock the locker (if one is in use), open the file for reading, and
184 // call the given function if and when the file is ready to read.
185 func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
186         if v.locker != nil {
187                 v.locker.Lock()
188                 defer v.locker.Unlock()
189         }
190         f, err := os.Open(path)
191         if err != nil {
192                 return err
193         }
194         defer f.Close()
195         return fn(f)
196 }
197
198 // stat is os.Stat() with some extra sanity checks.
199 func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
200         stat, err := os.Stat(path)
201         if err == nil {
202                 if stat.Size() < 0 {
203                         err = os.ErrInvalid
204                 } else if stat.Size() > BlockSize {
205                         err = TooLongError
206                 }
207         }
208         return stat, err
209 }
210
211 // Get retrieves a block, copies it to the given slice, and returns
212 // the number of bytes copied.
213 func (v *UnixVolume) Get(loc string, buf []byte) (int, error) {
214         path := v.blockPath(loc)
215         stat, err := v.stat(path)
216         if err != nil {
217                 return 0, v.translateError(err)
218         }
219         if stat.Size() > int64(len(buf)) {
220                 return 0, TooLongError
221         }
222         var read int
223         size := int(stat.Size())
224         err = v.getFunc(path, func(rdr io.Reader) error {
225                 read, err = io.ReadFull(rdr, buf[:size])
226                 return err
227         })
228         return read, err
229 }
230
231 // Compare returns nil if Get(loc) would return the same content as
232 // expect. It is functionally equivalent to Get() followed by
233 // bytes.Compare(), but uses less memory.
234 func (v *UnixVolume) Compare(loc string, expect []byte) error {
235         path := v.blockPath(loc)
236         if _, err := v.stat(path); err != nil {
237                 return v.translateError(err)
238         }
239         return v.getFunc(path, func(rdr io.Reader) error {
240                 return compareReaderWithBuf(rdr, expect, loc[:32])
241         })
242 }
243
244 // Put stores a block of data identified by the locator string
245 // "loc".  It returns nil on success.  If the volume is full, it
246 // returns a FullError.  If the write fails due to some other error,
247 // that error is returned.
248 func (v *UnixVolume) Put(loc string, block []byte) error {
249         if v.ReadOnly {
250                 return MethodDisabledError
251         }
252         if v.IsFull() {
253                 return FullError
254         }
255         bdir := v.blockDir(loc)
256         if err := os.MkdirAll(bdir, 0755); err != nil {
257                 log.Printf("%s: could not create directory %s: %s",
258                         loc, bdir, err)
259                 return err
260         }
261
262         tmpfile, tmperr := ioutil.TempFile(bdir, "tmp"+loc)
263         if tmperr != nil {
264                 log.Printf("ioutil.TempFile(%s, tmp%s): %s", bdir, loc, tmperr)
265                 return tmperr
266         }
267         bpath := v.blockPath(loc)
268
269         if v.locker != nil {
270                 v.locker.Lock()
271                 defer v.locker.Unlock()
272         }
273         if _, err := tmpfile.Write(block); err != nil {
274                 log.Printf("%s: writing to %s: %s\n", v, bpath, err)
275                 tmpfile.Close()
276                 os.Remove(tmpfile.Name())
277                 return err
278         }
279         if err := tmpfile.Close(); err != nil {
280                 log.Printf("closing %s: %s\n", tmpfile.Name(), err)
281                 os.Remove(tmpfile.Name())
282                 return err
283         }
284         if err := os.Rename(tmpfile.Name(), bpath); err != nil {
285                 log.Printf("rename %s %s: %s\n", tmpfile.Name(), bpath, err)
286                 os.Remove(tmpfile.Name())
287                 return err
288         }
289         return nil
290 }
291
292 // Status returns a VolumeStatus struct describing the volume's
293 // current state, or nil if an error occurs.
294 //
295 func (v *UnixVolume) Status() *VolumeStatus {
296         var fs syscall.Statfs_t
297         var devnum uint64
298
299         if fi, err := os.Stat(v.Root); err == nil {
300                 devnum = fi.Sys().(*syscall.Stat_t).Dev
301         } else {
302                 log.Printf("%s: os.Stat: %s\n", v, err)
303                 return nil
304         }
305
306         err := syscall.Statfs(v.Root, &fs)
307         if err != nil {
308                 log.Printf("%s: statfs: %s\n", v, err)
309                 return nil
310         }
311         // These calculations match the way df calculates disk usage:
312         // "free" space is measured by fs.Bavail, but "used" space
313         // uses fs.Blocks - fs.Bfree.
314         free := fs.Bavail * uint64(fs.Bsize)
315         used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
316         return &VolumeStatus{v.Root, devnum, free, used}
317 }
318
319 var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
320 var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
321
322 // IndexTo writes (to the given Writer) a list of blocks found on this
323 // volume which begin with the specified prefix. If the prefix is an
324 // empty string, IndexTo writes a complete list of blocks.
325 //
326 // Each block is given in the format
327 //
328 //     locator+size modification-time {newline}
329 //
330 // e.g.:
331 //
332 //     e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
333 //     e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
334 //     e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
335 //
336 func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
337         var lastErr error
338         rootdir, err := os.Open(v.Root)
339         if err != nil {
340                 return err
341         }
342         defer rootdir.Close()
343         for {
344                 names, err := rootdir.Readdirnames(1)
345                 if err == io.EOF {
346                         return lastErr
347                 } else if err != nil {
348                         return err
349                 }
350                 if !strings.HasPrefix(names[0], prefix) && !strings.HasPrefix(prefix, names[0]) {
351                         // prefix excludes all blocks stored in this dir
352                         continue
353                 }
354                 if !blockDirRe.MatchString(names[0]) {
355                         continue
356                 }
357                 blockdirpath := filepath.Join(v.Root, names[0])
358                 blockdir, err := os.Open(blockdirpath)
359                 if err != nil {
360                         log.Print("Error reading ", blockdirpath, ": ", err)
361                         lastErr = err
362                         continue
363                 }
364                 for {
365                         fileInfo, err := blockdir.Readdir(1)
366                         if err == io.EOF {
367                                 break
368                         } else if err != nil {
369                                 log.Print("Error reading ", blockdirpath, ": ", err)
370                                 lastErr = err
371                                 break
372                         }
373                         name := fileInfo[0].Name()
374                         if !strings.HasPrefix(name, prefix) {
375                                 continue
376                         }
377                         if !blockFileRe.MatchString(name) {
378                                 continue
379                         }
380                         _, err = fmt.Fprint(w,
381                                 name,
382                                 "+", fileInfo[0].Size(),
383                                 " ", fileInfo[0].ModTime().UnixNano(),
384                                 "\n")
385                 }
386                 blockdir.Close()
387         }
388 }
389
390 // Trash trashes the block data from the unix storage
391 // If TrashLifetime == 0, the block is deleted
392 // Else, the block is renamed as path/{loc}.trash.{deadline},
393 // where deadline = now + TrashLifetime
394 func (v *UnixVolume) Trash(loc string) error {
395         // Touch() must be called before calling Write() on a block.  Touch()
396         // also uses lockfile().  This avoids a race condition between Write()
397         // and Trash() because either (a) the file will be trashed and Touch()
398         // will signal to the caller that the file is not present (and needs to
399         // be re-written), or (b) Touch() will update the file's timestamp and
400         // Trash() will read the correct up-to-date timestamp and choose not to
401         // trash the file.
402
403         if v.ReadOnly {
404                 return MethodDisabledError
405         }
406         if v.locker != nil {
407                 v.locker.Lock()
408                 defer v.locker.Unlock()
409         }
410         p := v.blockPath(loc)
411         f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
412         if err != nil {
413                 return err
414         }
415         defer f.Close()
416         if e := lockfile(f); e != nil {
417                 return e
418         }
419         defer unlockfile(f)
420
421         // If the block has been PUT in the last blobSignatureTTL
422         // seconds, return success without removing the block. This
423         // protects data from garbage collection until it is no longer
424         // possible for clients to retrieve the unreferenced blocks
425         // anyway (because the permission signatures have expired).
426         if fi, err := os.Stat(p); err != nil {
427                 return err
428         } else if time.Since(fi.ModTime()) < time.Duration(theConfig.BlobSignatureTTL) {
429                 return nil
430         }
431
432         if theConfig.TrashLifetime == 0 {
433                 return os.Remove(p)
434         }
435         return os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(theConfig.TrashLifetime.Duration()).Unix()))
436 }
437
438 // Untrash moves block from trash back into store
439 // Look for path/{loc}.trash.{deadline} in storage,
440 // and rename the first such file as path/{loc}
441 func (v *UnixVolume) Untrash(loc string) (err error) {
442         if v.ReadOnly {
443                 return MethodDisabledError
444         }
445
446         files, err := ioutil.ReadDir(v.blockDir(loc))
447         if err != nil {
448                 return err
449         }
450
451         if len(files) == 0 {
452                 return os.ErrNotExist
453         }
454
455         foundTrash := false
456         prefix := fmt.Sprintf("%v.trash.", loc)
457         for _, f := range files {
458                 if strings.HasPrefix(f.Name(), prefix) {
459                         foundTrash = true
460                         err = os.Rename(v.blockPath(f.Name()), v.blockPath(loc))
461                         if err == nil {
462                                 break
463                         }
464                 }
465         }
466
467         if foundTrash == false {
468                 return os.ErrNotExist
469         }
470
471         return
472 }
473
474 // blockDir returns the fully qualified directory name for the directory
475 // where loc is (or would be) stored on this volume.
476 func (v *UnixVolume) blockDir(loc string) string {
477         return filepath.Join(v.Root, loc[0:3])
478 }
479
480 // blockPath returns the fully qualified pathname for the path to loc
481 // on this volume.
482 func (v *UnixVolume) blockPath(loc string) string {
483         return filepath.Join(v.blockDir(loc), loc)
484 }
485
486 // IsFull returns true if the free space on the volume is less than
487 // MinFreeKilobytes.
488 //
489 func (v *UnixVolume) IsFull() (isFull bool) {
490         fullSymlink := v.Root + "/full"
491
492         // Check if the volume has been marked as full in the last hour.
493         if link, err := os.Readlink(fullSymlink); err == nil {
494                 if ts, err := strconv.Atoi(link); err == nil {
495                         fulltime := time.Unix(int64(ts), 0)
496                         if time.Since(fulltime).Hours() < 1.0 {
497                                 return true
498                         }
499                 }
500         }
501
502         if avail, err := v.FreeDiskSpace(); err == nil {
503                 isFull = avail < MinFreeKilobytes
504         } else {
505                 log.Printf("%s: FreeDiskSpace: %s\n", v, err)
506                 isFull = false
507         }
508
509         // If the volume is full, timestamp it.
510         if isFull {
511                 now := fmt.Sprintf("%d", time.Now().Unix())
512                 os.Symlink(now, fullSymlink)
513         }
514         return
515 }
516
517 // FreeDiskSpace returns the number of unused 1k blocks available on
518 // the volume.
519 //
520 func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
521         var fs syscall.Statfs_t
522         err = syscall.Statfs(v.Root, &fs)
523         if err == nil {
524                 // Statfs output is not guaranteed to measure free
525                 // space in terms of 1K blocks.
526                 free = fs.Bavail * uint64(fs.Bsize) / 1024
527         }
528         return
529 }
530
531 func (v *UnixVolume) String() string {
532         return fmt.Sprintf("[UnixVolume %s]", v.Root)
533 }
534
535 // Writable returns false if all future Put, Mtime, and Delete calls
536 // are expected to fail.
537 func (v *UnixVolume) Writable() bool {
538         return !v.ReadOnly
539 }
540
541 // Replication returns the number of replicas promised by the
542 // underlying device (as specified in configuration).
543 func (v *UnixVolume) Replication() int {
544         return v.DirectoryReplication
545 }
546
547 // lockfile and unlockfile use flock(2) to manage kernel file locks.
548 func lockfile(f *os.File) error {
549         return syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
550 }
551
552 func unlockfile(f *os.File) error {
553         return syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
554 }
555
556 // Where appropriate, translate a more specific filesystem error to an
557 // error recognized by handlers, like os.ErrNotExist.
558 func (v *UnixVolume) translateError(err error) error {
559         switch err.(type) {
560         case *os.PathError:
561                 // stat() returns a PathError if the parent directory
562                 // (not just the file itself) is missing
563                 return os.ErrNotExist
564         default:
565                 return err
566         }
567 }
568
569 var unixTrashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`)
570
571 // EmptyTrash walks hierarchy looking for {hash}.trash.*
572 // and deletes those with deadline < now.
573 func (v *UnixVolume) EmptyTrash() {
574         var bytesDeleted, bytesInTrash int64
575         var blocksDeleted, blocksInTrash int
576
577         err := filepath.Walk(v.Root, func(path string, info os.FileInfo, err error) error {
578                 if err != nil {
579                         log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err)
580                         return nil
581                 }
582                 if info.Mode().IsDir() {
583                         return nil
584                 }
585                 matches := unixTrashLocRegexp.FindStringSubmatch(path)
586                 if len(matches) != 3 {
587                         return nil
588                 }
589                 deadline, err := strconv.ParseInt(matches[2], 10, 64)
590                 if err != nil {
591                         log.Printf("EmptyTrash: %v: ParseInt(%v): %v", path, matches[2], err)
592                         return nil
593                 }
594                 bytesInTrash += info.Size()
595                 blocksInTrash++
596                 if deadline > time.Now().Unix() {
597                         return nil
598                 }
599                 err = os.Remove(path)
600                 if err != nil {
601                         log.Printf("EmptyTrash: Remove %v: %v", path, err)
602                         return nil
603                 }
604                 bytesDeleted += info.Size()
605                 blocksDeleted++
606                 return nil
607         })
608
609         if err != nil {
610                 log.Printf("EmptyTrash error for %v: %v", v.String(), err)
611         }
612
613         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
614 }