Merge branch '8087-arv-cli-request-body-from-file' of https://github.com/wtsi-hgi...
[arvados.git] / services / keepstore / volume_unix.go
1 package main
2
3 import (
4         "bufio"
5         "errors"
6         "flag"
7         "fmt"
8         "io"
9         "io/ioutil"
10         "log"
11         "os"
12         "path/filepath"
13         "regexp"
14         "strconv"
15         "strings"
16         "sync"
17         "syscall"
18         "time"
19 )
20
21 type unixVolumeAdder struct {
22         *volumeSet
23 }
24
25 func (vs *unixVolumeAdder) Set(value string) error {
26         if dirs := strings.Split(value, ","); len(dirs) > 1 {
27                 log.Print("DEPRECATED: using comma-separated volume list.")
28                 for _, dir := range dirs {
29                         if err := vs.Set(dir); err != nil {
30                                 return err
31                         }
32                 }
33                 return nil
34         }
35         if len(value) == 0 || value[0] != '/' {
36                 return errors.New("Invalid volume: must begin with '/'.")
37         }
38         if _, err := os.Stat(value); err != nil {
39                 return err
40         }
41         var locker sync.Locker
42         if flagSerializeIO {
43                 locker = &sync.Mutex{}
44         }
45         *vs.volumeSet = append(*vs.volumeSet, &UnixVolume{
46                 root:     value,
47                 locker:   locker,
48                 readonly: flagReadonly,
49         })
50         return nil
51 }
52
53 func init() {
54         flag.Var(
55                 &unixVolumeAdder{&volumes},
56                 "volumes",
57                 "Deprecated synonym for -volume.")
58         flag.Var(
59                 &unixVolumeAdder{&volumes},
60                 "volume",
61                 "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
62 }
63
64 // Discover adds a UnixVolume for every directory named "keep" that is
65 // located at the top level of a device- or tmpfs-backed mount point
66 // other than "/". It returns the number of volumes added.
67 func (vs *unixVolumeAdder) Discover() int {
68         added := 0
69         f, err := os.Open(ProcMounts)
70         if err != nil {
71                 log.Fatalf("opening %s: %s", ProcMounts, err)
72         }
73         scanner := bufio.NewScanner(f)
74         for scanner.Scan() {
75                 args := strings.Fields(scanner.Text())
76                 if err := scanner.Err(); err != nil {
77                         log.Fatalf("reading %s: %s", ProcMounts, err)
78                 }
79                 dev, mount := args[0], args[1]
80                 if mount == "/" {
81                         continue
82                 }
83                 if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
84                         continue
85                 }
86                 keepdir := mount + "/keep"
87                 if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
88                         continue
89                 }
90                 // Set the -readonly flag (but only for this volume)
91                 // if the filesystem is mounted readonly.
92                 flagReadonlyWas := flagReadonly
93                 for _, fsopt := range strings.Split(args[3], ",") {
94                         if fsopt == "ro" {
95                                 flagReadonly = true
96                                 break
97                         }
98                         if fsopt == "rw" {
99                                 break
100                         }
101                 }
102                 if err := vs.Set(keepdir); err != nil {
103                         log.Printf("adding %q: %s", keepdir, err)
104                 } else {
105                         added++
106                 }
107                 flagReadonly = flagReadonlyWas
108         }
109         return added
110 }
111
112 // A UnixVolume stores and retrieves blocks in a local directory.
113 type UnixVolume struct {
114         // path to the volume's root directory
115         root string
116         // something to lock during IO, typically a sync.Mutex (or nil
117         // to skip locking)
118         locker   sync.Locker
119         readonly bool
120 }
121
122 // Touch sets the timestamp for the given locator to the current time
123 func (v *UnixVolume) Touch(loc string) error {
124         if v.readonly {
125                 return MethodDisabledError
126         }
127         p := v.blockPath(loc)
128         f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
129         if err != nil {
130                 return err
131         }
132         defer f.Close()
133         if v.locker != nil {
134                 v.locker.Lock()
135                 defer v.locker.Unlock()
136         }
137         if e := lockfile(f); e != nil {
138                 return e
139         }
140         defer unlockfile(f)
141         now := time.Now().Unix()
142         utime := syscall.Utimbuf{now, now}
143         return syscall.Utime(p, &utime)
144 }
145
146 // Mtime returns the stored timestamp for the given locator.
147 func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
148         p := v.blockPath(loc)
149         fi, err := os.Stat(p)
150         if err != nil {
151                 return time.Time{}, err
152         }
153         return fi.ModTime(), nil
154 }
155
156 // Lock the locker (if one is in use), open the file for reading, and
157 // call the given function if and when the file is ready to read.
158 func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
159         if v.locker != nil {
160                 v.locker.Lock()
161                 defer v.locker.Unlock()
162         }
163         f, err := os.Open(path)
164         if err != nil {
165                 return err
166         }
167         defer f.Close()
168         return fn(f)
169 }
170
171 // stat is os.Stat() with some extra sanity checks.
172 func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
173         stat, err := os.Stat(path)
174         if err == nil {
175                 if stat.Size() < 0 {
176                         err = os.ErrInvalid
177                 } else if stat.Size() > BlockSize {
178                         err = TooLongError
179                 }
180         }
181         return stat, err
182 }
183
184 // Get retrieves a block identified by the locator string "loc", and
185 // returns its contents as a byte slice.
186 //
187 // Get returns a nil buffer IFF it returns a non-nil error.
188 func (v *UnixVolume) Get(loc string) ([]byte, error) {
189         path := v.blockPath(loc)
190         stat, err := v.stat(path)
191         if err != nil {
192                 return nil, v.translateError(err)
193         }
194         buf := bufs.Get(int(stat.Size()))
195         err = v.getFunc(path, func(rdr io.Reader) error {
196                 _, err = io.ReadFull(rdr, buf)
197                 return err
198         })
199         if err != nil {
200                 bufs.Put(buf)
201                 return nil, err
202         }
203         return buf, nil
204 }
205
206 // Compare returns nil if Get(loc) would return the same content as
207 // expect. It is functionally equivalent to Get() followed by
208 // bytes.Compare(), but uses less memory.
209 func (v *UnixVolume) Compare(loc string, expect []byte) error {
210         path := v.blockPath(loc)
211         if _, err := v.stat(path); err != nil {
212                 return v.translateError(err)
213         }
214         return v.getFunc(path, func(rdr io.Reader) error {
215                 return compareReaderWithBuf(rdr, expect, loc[:32])
216         })
217 }
218
219 // Put stores a block of data identified by the locator string
220 // "loc".  It returns nil on success.  If the volume is full, it
221 // returns a FullError.  If the write fails due to some other error,
222 // that error is returned.
223 func (v *UnixVolume) Put(loc string, block []byte) error {
224         if v.readonly {
225                 return MethodDisabledError
226         }
227         if v.IsFull() {
228                 return FullError
229         }
230         bdir := v.blockDir(loc)
231         if err := os.MkdirAll(bdir, 0755); err != nil {
232                 log.Printf("%s: could not create directory %s: %s",
233                         loc, bdir, err)
234                 return err
235         }
236
237         tmpfile, tmperr := ioutil.TempFile(bdir, "tmp"+loc)
238         if tmperr != nil {
239                 log.Printf("ioutil.TempFile(%s, tmp%s): %s", bdir, loc, tmperr)
240                 return tmperr
241         }
242         bpath := v.blockPath(loc)
243
244         if v.locker != nil {
245                 v.locker.Lock()
246                 defer v.locker.Unlock()
247         }
248         if _, err := tmpfile.Write(block); err != nil {
249                 log.Printf("%s: writing to %s: %s\n", v, bpath, err)
250                 tmpfile.Close()
251                 os.Remove(tmpfile.Name())
252                 return err
253         }
254         if err := tmpfile.Close(); err != nil {
255                 log.Printf("closing %s: %s\n", tmpfile.Name(), err)
256                 os.Remove(tmpfile.Name())
257                 return err
258         }
259         if err := os.Rename(tmpfile.Name(), bpath); err != nil {
260                 log.Printf("rename %s %s: %s\n", tmpfile.Name(), bpath, err)
261                 os.Remove(tmpfile.Name())
262                 return err
263         }
264         return nil
265 }
266
267 // Status returns a VolumeStatus struct describing the volume's
268 // current state, or nil if an error occurs.
269 //
270 func (v *UnixVolume) Status() *VolumeStatus {
271         var fs syscall.Statfs_t
272         var devnum uint64
273
274         if fi, err := os.Stat(v.root); err == nil {
275                 devnum = fi.Sys().(*syscall.Stat_t).Dev
276         } else {
277                 log.Printf("%s: os.Stat: %s\n", v, err)
278                 return nil
279         }
280
281         err := syscall.Statfs(v.root, &fs)
282         if err != nil {
283                 log.Printf("%s: statfs: %s\n", v, err)
284                 return nil
285         }
286         // These calculations match the way df calculates disk usage:
287         // "free" space is measured by fs.Bavail, but "used" space
288         // uses fs.Blocks - fs.Bfree.
289         free := fs.Bavail * uint64(fs.Bsize)
290         used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
291         return &VolumeStatus{v.root, devnum, free, used}
292 }
293
294 var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
295 var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
296
297 // IndexTo writes (to the given Writer) a list of blocks found on this
298 // volume which begin with the specified prefix. If the prefix is an
299 // empty string, IndexTo writes a complete list of blocks.
300 //
301 // Each block is given in the format
302 //
303 //     locator+size modification-time {newline}
304 //
305 // e.g.:
306 //
307 //     e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
308 //     e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
309 //     e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
310 //
311 func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
312         var lastErr error = nil
313         rootdir, err := os.Open(v.root)
314         if err != nil {
315                 return err
316         }
317         defer rootdir.Close()
318         for {
319                 names, err := rootdir.Readdirnames(1)
320                 if err == io.EOF {
321                         return lastErr
322                 } else if err != nil {
323                         return err
324                 }
325                 if !strings.HasPrefix(names[0], prefix) && !strings.HasPrefix(prefix, names[0]) {
326                         // prefix excludes all blocks stored in this dir
327                         continue
328                 }
329                 if !blockDirRe.MatchString(names[0]) {
330                         continue
331                 }
332                 blockdirpath := filepath.Join(v.root, names[0])
333                 blockdir, err := os.Open(blockdirpath)
334                 if err != nil {
335                         log.Print("Error reading ", blockdirpath, ": ", err)
336                         lastErr = err
337                         continue
338                 }
339                 for {
340                         fileInfo, err := blockdir.Readdir(1)
341                         if err == io.EOF {
342                                 break
343                         } else if err != nil {
344                                 log.Print("Error reading ", blockdirpath, ": ", err)
345                                 lastErr = err
346                                 break
347                         }
348                         name := fileInfo[0].Name()
349                         if !strings.HasPrefix(name, prefix) {
350                                 continue
351                         }
352                         if !blockFileRe.MatchString(name) {
353                                 continue
354                         }
355                         _, err = fmt.Fprint(w,
356                                 name,
357                                 "+", fileInfo[0].Size(),
358                                 " ", fileInfo[0].ModTime().Unix(),
359                                 "\n")
360                 }
361                 blockdir.Close()
362         }
363 }
364
365 // Trash trashes the block data from the unix storage
366 // If trashLifetime == 0, the block is deleted
367 // Else, the block is renamed as path/{loc}.trash.{deadline},
368 // where deadline = now + trashLifetime
369 func (v *UnixVolume) Trash(loc string) error {
370         // Touch() must be called before calling Write() on a block.  Touch()
371         // also uses lockfile().  This avoids a race condition between Write()
372         // and Trash() because either (a) the file will be trashed and Touch()
373         // will signal to the caller that the file is not present (and needs to
374         // be re-written), or (b) Touch() will update the file's timestamp and
375         // Trash() will read the correct up-to-date timestamp and choose not to
376         // trash the file.
377
378         if v.readonly {
379                 return MethodDisabledError
380         }
381         if v.locker != nil {
382                 v.locker.Lock()
383                 defer v.locker.Unlock()
384         }
385         p := v.blockPath(loc)
386         f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
387         if err != nil {
388                 return err
389         }
390         defer f.Close()
391         if e := lockfile(f); e != nil {
392                 return e
393         }
394         defer unlockfile(f)
395
396         // If the block has been PUT in the last blobSignatureTTL
397         // seconds, return success without removing the block. This
398         // protects data from garbage collection until it is no longer
399         // possible for clients to retrieve the unreferenced blocks
400         // anyway (because the permission signatures have expired).
401         if fi, err := os.Stat(p); err != nil {
402                 return err
403         } else {
404                 if time.Since(fi.ModTime()) < blobSignatureTTL {
405                         return nil
406                 }
407         }
408
409         if trashLifetime == 0 {
410                 return os.Remove(p)
411         }
412         return os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(trashLifetime).Unix()))
413 }
414
415 // Untrash moves block from trash back into store
416 // Look for path/{loc}.trash.{deadline} in storage,
417 // and rename the first such file as path/{loc}
418 func (v *UnixVolume) Untrash(loc string) (err error) {
419         if v.readonly {
420                 return MethodDisabledError
421         }
422
423         files, err := ioutil.ReadDir(v.blockDir(loc))
424         if err != nil {
425                 return err
426         }
427
428         if len(files) == 0 {
429                 return os.ErrNotExist
430         }
431
432         foundTrash := false
433         prefix := fmt.Sprintf("%v.trash.", loc)
434         for _, f := range files {
435                 if strings.HasPrefix(f.Name(), prefix) {
436                         foundTrash = true
437                         err = os.Rename(v.blockPath(f.Name()), v.blockPath(loc))
438                         if err == nil {
439                                 break
440                         }
441                 }
442         }
443
444         if foundTrash == false {
445                 return os.ErrNotExist
446         }
447
448         return
449 }
450
451 // blockDir returns the fully qualified directory name for the directory
452 // where loc is (or would be) stored on this volume.
453 func (v *UnixVolume) blockDir(loc string) string {
454         return filepath.Join(v.root, loc[0:3])
455 }
456
457 // blockPath returns the fully qualified pathname for the path to loc
458 // on this volume.
459 func (v *UnixVolume) blockPath(loc string) string {
460         return filepath.Join(v.blockDir(loc), loc)
461 }
462
463 // IsFull returns true if the free space on the volume is less than
464 // MinFreeKilobytes.
465 //
466 func (v *UnixVolume) IsFull() (isFull bool) {
467         fullSymlink := v.root + "/full"
468
469         // Check if the volume has been marked as full in the last hour.
470         if link, err := os.Readlink(fullSymlink); err == nil {
471                 if ts, err := strconv.Atoi(link); err == nil {
472                         fulltime := time.Unix(int64(ts), 0)
473                         if time.Since(fulltime).Hours() < 1.0 {
474                                 return true
475                         }
476                 }
477         }
478
479         if avail, err := v.FreeDiskSpace(); err == nil {
480                 isFull = avail < MinFreeKilobytes
481         } else {
482                 log.Printf("%s: FreeDiskSpace: %s\n", v, err)
483                 isFull = false
484         }
485
486         // If the volume is full, timestamp it.
487         if isFull {
488                 now := fmt.Sprintf("%d", time.Now().Unix())
489                 os.Symlink(now, fullSymlink)
490         }
491         return
492 }
493
494 // FreeDiskSpace returns the number of unused 1k blocks available on
495 // the volume.
496 //
497 func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
498         var fs syscall.Statfs_t
499         err = syscall.Statfs(v.root, &fs)
500         if err == nil {
501                 // Statfs output is not guaranteed to measure free
502                 // space in terms of 1K blocks.
503                 free = fs.Bavail * uint64(fs.Bsize) / 1024
504         }
505         return
506 }
507
508 func (v *UnixVolume) String() string {
509         return fmt.Sprintf("[UnixVolume %s]", v.root)
510 }
511
512 // Writable returns false if all future Put, Mtime, and Delete calls are expected to fail.
513 func (v *UnixVolume) Writable() bool {
514         return !v.readonly
515 }
516
517 func (v *UnixVolume) Replication() int {
518         return 1
519 }
520
521 // lockfile and unlockfile use flock(2) to manage kernel file locks.
522 func lockfile(f *os.File) error {
523         return syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
524 }
525
526 func unlockfile(f *os.File) error {
527         return syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
528 }
529
530 // Where appropriate, translate a more specific filesystem error to an
531 // error recognized by handlers, like os.ErrNotExist.
532 func (v *UnixVolume) translateError(err error) error {
533         switch err.(type) {
534         case *os.PathError:
535                 // stat() returns a PathError if the parent directory
536                 // (not just the file itself) is missing
537                 return os.ErrNotExist
538         default:
539                 return err
540         }
541 }
542
543 var trashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`)
544
545 // EmptyTrash walks hierarchy looking for {hash}.trash.*
546 // and deletes those with deadline < now.
547 func (v *UnixVolume) EmptyTrash() {
548         var bytesDeleted, bytesInTrash int64
549         var blocksDeleted, blocksInTrash int
550
551         err := filepath.Walk(v.root, func(path string, info os.FileInfo, err error) error {
552                 if err != nil {
553                         log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err)
554                         return nil
555                 }
556                 if info.Mode().IsDir() {
557                         return nil
558                 }
559                 matches := trashLocRegexp.FindStringSubmatch(path)
560                 if len(matches) != 3 {
561                         return nil
562                 }
563                 deadline, err := strconv.ParseInt(matches[2], 10, 64)
564                 if err != nil {
565                         log.Printf("EmptyTrash: %v: ParseInt(%v): %v", path, matches[2], err)
566                         return nil
567                 }
568                 bytesInTrash += info.Size()
569                 blocksInTrash++
570                 if deadline > time.Now().Unix() {
571                         return nil
572                 }
573                 err = os.Remove(path)
574                 if err != nil {
575                         log.Printf("EmptyTrash: Remove %v: %v", path, err)
576                         return nil
577                 }
578                 bytesDeleted += info.Size()
579                 blocksDeleted++
580                 return nil
581         })
582
583         if err != nil {
584                 log.Printf("EmptyTrash error for %v: %v", v.String(), err)
585         }
586
587         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
588 }