9956: Remove obsolete TODO comment.
[arvados.git] / services / keepstore / volume_unix.go
1 package main
2
3 import (
4         "bufio"
5         "flag"
6         "fmt"
7         "io"
8         "io/ioutil"
9         "log"
10         "os"
11         "path/filepath"
12         "regexp"
13         "strconv"
14         "strings"
15         "sync"
16         "syscall"
17         "time"
18 )
19
20 type unixVolumeAdder struct {
21         *Config
22 }
23
24 // String implements flag.Value
25 func (s *unixVolumeAdder) String() string {
26         return "-"
27 }
28
29 func (vs *unixVolumeAdder) Set(path string) error {
30         if dirs := strings.Split(path, ","); len(dirs) > 1 {
31                 log.Print("DEPRECATED: using comma-separated volume list.")
32                 for _, dir := range dirs {
33                         if err := vs.Set(dir); err != nil {
34                                 return err
35                         }
36                 }
37                 return nil
38         }
39         vs.Config.Volumes = append(vs.Config.Volumes, &UnixVolume{
40                 Root:      path,
41                 ReadOnly:  deprecated.flagReadonly,
42                 Serialize: deprecated.flagSerializeIO,
43         })
44         return nil
45 }
46
47 func init() {
48         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &UnixVolume{} })
49
50         flag.Var(&unixVolumeAdder{theConfig}, "volumes", "see Volumes configuration")
51         flag.Var(&unixVolumeAdder{theConfig}, "volume", "see Volumes configuration")
52 }
53
54 // Discover adds a UnixVolume for every directory named "keep" that is
55 // located at the top level of a device- or tmpfs-backed mount point
56 // other than "/". It returns the number of volumes added.
57 func (vs *unixVolumeAdder) Discover() int {
58         added := 0
59         f, err := os.Open(ProcMounts)
60         if err != nil {
61                 log.Fatalf("opening %s: %s", ProcMounts, err)
62         }
63         scanner := bufio.NewScanner(f)
64         for scanner.Scan() {
65                 args := strings.Fields(scanner.Text())
66                 if err := scanner.Err(); err != nil {
67                         log.Fatalf("reading %s: %s", ProcMounts, err)
68                 }
69                 dev, mount := args[0], args[1]
70                 if mount == "/" {
71                         continue
72                 }
73                 if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
74                         continue
75                 }
76                 keepdir := mount + "/keep"
77                 if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
78                         continue
79                 }
80                 // Set the -readonly flag (but only for this volume)
81                 // if the filesystem is mounted readonly.
82                 flagReadonlyWas := deprecated.flagReadonly
83                 for _, fsopt := range strings.Split(args[3], ",") {
84                         if fsopt == "ro" {
85                                 deprecated.flagReadonly = true
86                                 break
87                         }
88                         if fsopt == "rw" {
89                                 break
90                         }
91                 }
92                 if err := vs.Set(keepdir); err != nil {
93                         log.Printf("adding %q: %s", keepdir, err)
94                 } else {
95                         added++
96                 }
97                 deprecated.flagReadonly = flagReadonlyWas
98         }
99         return added
100 }
101
102 // A UnixVolume stores and retrieves blocks in a local directory.
103 type UnixVolume struct {
104         Root      string // path to the volume's root directory
105         ReadOnly  bool
106         Serialize bool
107
108         // something to lock during IO, typically a sync.Mutex (or nil
109         // to skip locking)
110         locker sync.Locker
111 }
112
113 // Examples implements VolumeWithExamples.
114 func (*UnixVolume) Examples() []Volume {
115         return []Volume{
116                 &UnixVolume{
117                         Root:      "/mnt/local-disk",
118                         Serialize: true,
119                 },
120                 &UnixVolume{
121                         Root:      "/mnt/network-disk",
122                         Serialize: false,
123                 },
124         }
125 }
126
127 // Type implements Volume
128 func (v *UnixVolume) Type() string {
129         return "Directory"
130 }
131
132 // Start implements Volume
133 func (v *UnixVolume) Start() error {
134         if v.Serialize {
135                 v.locker = &sync.Mutex{}
136         }
137         if !strings.HasPrefix(v.Root, "/") {
138                 return fmt.Errorf("volume root does not start with '/': %q", v.Root)
139         }
140         _, err := os.Stat(v.Root)
141         return err
142 }
143
144 // Touch sets the timestamp for the given locator to the current time
145 func (v *UnixVolume) Touch(loc string) error {
146         if v.ReadOnly {
147                 return MethodDisabledError
148         }
149         p := v.blockPath(loc)
150         f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
151         if err != nil {
152                 return err
153         }
154         defer f.Close()
155         if v.locker != nil {
156                 v.locker.Lock()
157                 defer v.locker.Unlock()
158         }
159         if e := lockfile(f); e != nil {
160                 return e
161         }
162         defer unlockfile(f)
163         ts := syscall.NsecToTimespec(time.Now().UnixNano())
164         return syscall.UtimesNano(p, []syscall.Timespec{ts, ts})
165 }
166
167 // Mtime returns the stored timestamp for the given locator.
168 func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
169         p := v.blockPath(loc)
170         fi, err := os.Stat(p)
171         if err != nil {
172                 return time.Time{}, err
173         }
174         return fi.ModTime(), nil
175 }
176
177 // Lock the locker (if one is in use), open the file for reading, and
178 // call the given function if and when the file is ready to read.
179 func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
180         if v.locker != nil {
181                 v.locker.Lock()
182                 defer v.locker.Unlock()
183         }
184         f, err := os.Open(path)
185         if err != nil {
186                 return err
187         }
188         defer f.Close()
189         return fn(f)
190 }
191
192 // stat is os.Stat() with some extra sanity checks.
193 func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
194         stat, err := os.Stat(path)
195         if err == nil {
196                 if stat.Size() < 0 {
197                         err = os.ErrInvalid
198                 } else if stat.Size() > BlockSize {
199                         err = TooLongError
200                 }
201         }
202         return stat, err
203 }
204
205 // Get retrieves a block, copies it to the given slice, and returns
206 // the number of bytes copied.
207 func (v *UnixVolume) Get(loc string, buf []byte) (int, error) {
208         path := v.blockPath(loc)
209         stat, err := v.stat(path)
210         if err != nil {
211                 return 0, v.translateError(err)
212         }
213         if stat.Size() > int64(len(buf)) {
214                 return 0, TooLongError
215         }
216         var read int
217         size := int(stat.Size())
218         err = v.getFunc(path, func(rdr io.Reader) error {
219                 read, err = io.ReadFull(rdr, buf[:size])
220                 return err
221         })
222         return read, err
223 }
224
225 // Compare returns nil if Get(loc) would return the same content as
226 // expect. It is functionally equivalent to Get() followed by
227 // bytes.Compare(), but uses less memory.
228 func (v *UnixVolume) Compare(loc string, expect []byte) error {
229         path := v.blockPath(loc)
230         if _, err := v.stat(path); err != nil {
231                 return v.translateError(err)
232         }
233         return v.getFunc(path, func(rdr io.Reader) error {
234                 return compareReaderWithBuf(rdr, expect, loc[:32])
235         })
236 }
237
238 // Put stores a block of data identified by the locator string
239 // "loc".  It returns nil on success.  If the volume is full, it
240 // returns a FullError.  If the write fails due to some other error,
241 // that error is returned.
242 func (v *UnixVolume) Put(loc string, block []byte) error {
243         if v.ReadOnly {
244                 return MethodDisabledError
245         }
246         if v.IsFull() {
247                 return FullError
248         }
249         bdir := v.blockDir(loc)
250         if err := os.MkdirAll(bdir, 0755); err != nil {
251                 log.Printf("%s: could not create directory %s: %s",
252                         loc, bdir, err)
253                 return err
254         }
255
256         tmpfile, tmperr := ioutil.TempFile(bdir, "tmp"+loc)
257         if tmperr != nil {
258                 log.Printf("ioutil.TempFile(%s, tmp%s): %s", bdir, loc, tmperr)
259                 return tmperr
260         }
261         bpath := v.blockPath(loc)
262
263         if v.locker != nil {
264                 v.locker.Lock()
265                 defer v.locker.Unlock()
266         }
267         if _, err := tmpfile.Write(block); err != nil {
268                 log.Printf("%s: writing to %s: %s\n", v, bpath, err)
269                 tmpfile.Close()
270                 os.Remove(tmpfile.Name())
271                 return err
272         }
273         if err := tmpfile.Close(); err != nil {
274                 log.Printf("closing %s: %s\n", tmpfile.Name(), err)
275                 os.Remove(tmpfile.Name())
276                 return err
277         }
278         if err := os.Rename(tmpfile.Name(), bpath); err != nil {
279                 log.Printf("rename %s %s: %s\n", tmpfile.Name(), bpath, err)
280                 os.Remove(tmpfile.Name())
281                 return err
282         }
283         return nil
284 }
285
286 // Status returns a VolumeStatus struct describing the volume's
287 // current state, or nil if an error occurs.
288 //
289 func (v *UnixVolume) Status() *VolumeStatus {
290         var fs syscall.Statfs_t
291         var devnum uint64
292
293         if fi, err := os.Stat(v.Root); err == nil {
294                 devnum = fi.Sys().(*syscall.Stat_t).Dev
295         } else {
296                 log.Printf("%s: os.Stat: %s\n", v, err)
297                 return nil
298         }
299
300         err := syscall.Statfs(v.Root, &fs)
301         if err != nil {
302                 log.Printf("%s: statfs: %s\n", v, err)
303                 return nil
304         }
305         // These calculations match the way df calculates disk usage:
306         // "free" space is measured by fs.Bavail, but "used" space
307         // uses fs.Blocks - fs.Bfree.
308         free := fs.Bavail * uint64(fs.Bsize)
309         used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
310         return &VolumeStatus{v.Root, devnum, free, used}
311 }
312
313 var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
314 var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
315
316 // IndexTo writes (to the given Writer) a list of blocks found on this
317 // volume which begin with the specified prefix. If the prefix is an
318 // empty string, IndexTo writes a complete list of blocks.
319 //
320 // Each block is given in the format
321 //
322 //     locator+size modification-time {newline}
323 //
324 // e.g.:
325 //
326 //     e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
327 //     e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
328 //     e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
329 //
330 func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
331         var lastErr error
332         rootdir, err := os.Open(v.Root)
333         if err != nil {
334                 return err
335         }
336         defer rootdir.Close()
337         for {
338                 names, err := rootdir.Readdirnames(1)
339                 if err == io.EOF {
340                         return lastErr
341                 } else if err != nil {
342                         return err
343                 }
344                 if !strings.HasPrefix(names[0], prefix) && !strings.HasPrefix(prefix, names[0]) {
345                         // prefix excludes all blocks stored in this dir
346                         continue
347                 }
348                 if !blockDirRe.MatchString(names[0]) {
349                         continue
350                 }
351                 blockdirpath := filepath.Join(v.Root, names[0])
352                 blockdir, err := os.Open(blockdirpath)
353                 if err != nil {
354                         log.Print("Error reading ", blockdirpath, ": ", err)
355                         lastErr = err
356                         continue
357                 }
358                 for {
359                         fileInfo, err := blockdir.Readdir(1)
360                         if err == io.EOF {
361                                 break
362                         } else if err != nil {
363                                 log.Print("Error reading ", blockdirpath, ": ", err)
364                                 lastErr = err
365                                 break
366                         }
367                         name := fileInfo[0].Name()
368                         if !strings.HasPrefix(name, prefix) {
369                                 continue
370                         }
371                         if !blockFileRe.MatchString(name) {
372                                 continue
373                         }
374                         _, err = fmt.Fprint(w,
375                                 name,
376                                 "+", fileInfo[0].Size(),
377                                 " ", fileInfo[0].ModTime().UnixNano(),
378                                 "\n")
379                 }
380                 blockdir.Close()
381         }
382 }
383
384 // Trash trashes the block data from the unix storage
385 // If TrashLifetime == 0, the block is deleted
386 // Else, the block is renamed as path/{loc}.trash.{deadline},
387 // where deadline = now + TrashLifetime
388 func (v *UnixVolume) Trash(loc string) error {
389         // Touch() must be called before calling Write() on a block.  Touch()
390         // also uses lockfile().  This avoids a race condition between Write()
391         // and Trash() because either (a) the file will be trashed and Touch()
392         // will signal to the caller that the file is not present (and needs to
393         // be re-written), or (b) Touch() will update the file's timestamp and
394         // Trash() will read the correct up-to-date timestamp and choose not to
395         // trash the file.
396
397         if v.ReadOnly {
398                 return MethodDisabledError
399         }
400         if v.locker != nil {
401                 v.locker.Lock()
402                 defer v.locker.Unlock()
403         }
404         p := v.blockPath(loc)
405         f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
406         if err != nil {
407                 return err
408         }
409         defer f.Close()
410         if e := lockfile(f); e != nil {
411                 return e
412         }
413         defer unlockfile(f)
414
415         // If the block has been PUT in the last blobSignatureTTL
416         // seconds, return success without removing the block. This
417         // protects data from garbage collection until it is no longer
418         // possible for clients to retrieve the unreferenced blocks
419         // anyway (because the permission signatures have expired).
420         if fi, err := os.Stat(p); err != nil {
421                 return err
422         } else if time.Since(fi.ModTime()) < time.Duration(theConfig.BlobSignatureTTL) {
423                 return nil
424         }
425
426         if theConfig.TrashLifetime == 0 {
427                 return os.Remove(p)
428         }
429         return os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(theConfig.TrashLifetime.Duration()).Unix()))
430 }
431
432 // Untrash moves block from trash back into store
433 // Look for path/{loc}.trash.{deadline} in storage,
434 // and rename the first such file as path/{loc}
435 func (v *UnixVolume) Untrash(loc string) (err error) {
436         if v.ReadOnly {
437                 return MethodDisabledError
438         }
439
440         files, err := ioutil.ReadDir(v.blockDir(loc))
441         if err != nil {
442                 return err
443         }
444
445         if len(files) == 0 {
446                 return os.ErrNotExist
447         }
448
449         foundTrash := false
450         prefix := fmt.Sprintf("%v.trash.", loc)
451         for _, f := range files {
452                 if strings.HasPrefix(f.Name(), prefix) {
453                         foundTrash = true
454                         err = os.Rename(v.blockPath(f.Name()), v.blockPath(loc))
455                         if err == nil {
456                                 break
457                         }
458                 }
459         }
460
461         if foundTrash == false {
462                 return os.ErrNotExist
463         }
464
465         return
466 }
467
468 // blockDir returns the fully qualified directory name for the directory
469 // where loc is (or would be) stored on this volume.
470 func (v *UnixVolume) blockDir(loc string) string {
471         return filepath.Join(v.Root, loc[0:3])
472 }
473
474 // blockPath returns the fully qualified pathname for the path to loc
475 // on this volume.
476 func (v *UnixVolume) blockPath(loc string) string {
477         return filepath.Join(v.blockDir(loc), loc)
478 }
479
480 // IsFull returns true if the free space on the volume is less than
481 // MinFreeKilobytes.
482 //
483 func (v *UnixVolume) IsFull() (isFull bool) {
484         fullSymlink := v.Root + "/full"
485
486         // Check if the volume has been marked as full in the last hour.
487         if link, err := os.Readlink(fullSymlink); err == nil {
488                 if ts, err := strconv.Atoi(link); err == nil {
489                         fulltime := time.Unix(int64(ts), 0)
490                         if time.Since(fulltime).Hours() < 1.0 {
491                                 return true
492                         }
493                 }
494         }
495
496         if avail, err := v.FreeDiskSpace(); err == nil {
497                 isFull = avail < MinFreeKilobytes
498         } else {
499                 log.Printf("%s: FreeDiskSpace: %s\n", v, err)
500                 isFull = false
501         }
502
503         // If the volume is full, timestamp it.
504         if isFull {
505                 now := fmt.Sprintf("%d", time.Now().Unix())
506                 os.Symlink(now, fullSymlink)
507         }
508         return
509 }
510
511 // FreeDiskSpace returns the number of unused 1k blocks available on
512 // the volume.
513 //
514 func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
515         var fs syscall.Statfs_t
516         err = syscall.Statfs(v.Root, &fs)
517         if err == nil {
518                 // Statfs output is not guaranteed to measure free
519                 // space in terms of 1K blocks.
520                 free = fs.Bavail * uint64(fs.Bsize) / 1024
521         }
522         return
523 }
524
525 func (v *UnixVolume) String() string {
526         return fmt.Sprintf("[UnixVolume %s]", v.Root)
527 }
528
529 // Writable returns false if all future Put, Mtime, and Delete calls
530 // are expected to fail.
531 func (v *UnixVolume) Writable() bool {
532         return !v.ReadOnly
533 }
534
535 // Replication returns the number of replicas promised by the
536 // underlying device (currently assumed to be 1).
537 func (v *UnixVolume) Replication() int {
538         return 1
539 }
540
541 // lockfile and unlockfile use flock(2) to manage kernel file locks.
542 func lockfile(f *os.File) error {
543         return syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
544 }
545
546 func unlockfile(f *os.File) error {
547         return syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
548 }
549
550 // Where appropriate, translate a more specific filesystem error to an
551 // error recognized by handlers, like os.ErrNotExist.
552 func (v *UnixVolume) translateError(err error) error {
553         switch err.(type) {
554         case *os.PathError:
555                 // stat() returns a PathError if the parent directory
556                 // (not just the file itself) is missing
557                 return os.ErrNotExist
558         default:
559                 return err
560         }
561 }
562
563 var unixTrashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`)
564
565 // EmptyTrash walks hierarchy looking for {hash}.trash.*
566 // and deletes those with deadline < now.
567 func (v *UnixVolume) EmptyTrash() {
568         var bytesDeleted, bytesInTrash int64
569         var blocksDeleted, blocksInTrash int
570
571         err := filepath.Walk(v.Root, func(path string, info os.FileInfo, err error) error {
572                 if err != nil {
573                         log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err)
574                         return nil
575                 }
576                 if info.Mode().IsDir() {
577                         return nil
578                 }
579                 matches := unixTrashLocRegexp.FindStringSubmatch(path)
580                 if len(matches) != 3 {
581                         return nil
582                 }
583                 deadline, err := strconv.ParseInt(matches[2], 10, 64)
584                 if err != nil {
585                         log.Printf("EmptyTrash: %v: ParseInt(%v): %v", path, matches[2], err)
586                         return nil
587                 }
588                 bytesInTrash += info.Size()
589                 blocksInTrash++
590                 if deadline > time.Now().Unix() {
591                         return nil
592                 }
593                 err = os.Remove(path)
594                 if err != nil {
595                         log.Printf("EmptyTrash: Remove %v: %v", path, err)
596                         return nil
597                 }
598                 bytesDeleted += info.Size()
599                 blocksDeleted++
600                 return nil
601         })
602
603         if err != nil {
604                 log.Printf("EmptyTrash error for %v: %v", v.String(), err)
605         }
606
607         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
608 }