13937: Moves statsTicker metrics init to its own func.
[arvados.git] / services / keepstore / volume_unix.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bufio"
9         "context"
10         "flag"
11         "fmt"
12         "io"
13         "io/ioutil"
14         "os"
15         "os/exec"
16         "path/filepath"
17         "regexp"
18         "strconv"
19         "strings"
20         "sync"
21         "sync/atomic"
22         "syscall"
23         "time"
24 )
25
26 type unixVolumeAdder struct {
27         *Config
28 }
29
30 // String implements flag.Value
31 func (vs *unixVolumeAdder) String() string {
32         return "-"
33 }
34
35 func (vs *unixVolumeAdder) Set(path string) error {
36         if dirs := strings.Split(path, ","); len(dirs) > 1 {
37                 log.Print("DEPRECATED: using comma-separated volume list.")
38                 for _, dir := range dirs {
39                         if err := vs.Set(dir); err != nil {
40                                 return err
41                         }
42                 }
43                 return nil
44         }
45         vs.Config.Volumes = append(vs.Config.Volumes, &UnixVolume{
46                 Root:      path,
47                 ReadOnly:  deprecated.flagReadonly,
48                 Serialize: deprecated.flagSerializeIO,
49         })
50         return nil
51 }
52
53 func init() {
54         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &UnixVolume{} })
55
56         flag.Var(&unixVolumeAdder{theConfig}, "volumes", "see Volumes configuration")
57         flag.Var(&unixVolumeAdder{theConfig}, "volume", "see Volumes configuration")
58 }
59
60 // Discover adds a UnixVolume for every directory named "keep" that is
61 // located at the top level of a device- or tmpfs-backed mount point
62 // other than "/". It returns the number of volumes added.
63 func (vs *unixVolumeAdder) Discover() int {
64         added := 0
65         f, err := os.Open(ProcMounts)
66         if err != nil {
67                 log.Fatalf("opening %s: %s", ProcMounts, err)
68         }
69         scanner := bufio.NewScanner(f)
70         for scanner.Scan() {
71                 args := strings.Fields(scanner.Text())
72                 if err := scanner.Err(); err != nil {
73                         log.Fatalf("reading %s: %s", ProcMounts, err)
74                 }
75                 dev, mount := args[0], args[1]
76                 if mount == "/" {
77                         continue
78                 }
79                 if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
80                         continue
81                 }
82                 keepdir := mount + "/keep"
83                 if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
84                         continue
85                 }
86                 // Set the -readonly flag (but only for this volume)
87                 // if the filesystem is mounted readonly.
88                 flagReadonlyWas := deprecated.flagReadonly
89                 for _, fsopt := range strings.Split(args[3], ",") {
90                         if fsopt == "ro" {
91                                 deprecated.flagReadonly = true
92                                 break
93                         }
94                         if fsopt == "rw" {
95                                 break
96                         }
97                 }
98                 if err := vs.Set(keepdir); err != nil {
99                         log.Printf("adding %q: %s", keepdir, err)
100                 } else {
101                         added++
102                 }
103                 deprecated.flagReadonly = flagReadonlyWas
104         }
105         return added
106 }
107
108 // A UnixVolume stores and retrieves blocks in a local directory.
109 type UnixVolume struct {
110         Root                 string // path to the volume's root directory
111         ReadOnly             bool
112         Serialize            bool
113         DirectoryReplication int
114         StorageClasses       []string
115
116         // something to lock during IO, typically a sync.Mutex (or nil
117         // to skip locking)
118         locker sync.Locker
119
120         os osWithStats
121
122         metrics *volumeMetrics
123 }
124
125 // DeviceID returns a globally unique ID for the volume's root
126 // directory, consisting of the filesystem's UUID and the path from
127 // filesystem root to storage directory, joined by "/". For example,
128 // the DeviceID for a local directory "/mnt/xvda1/keep" might be
129 // "fa0b6166-3b55-4994-bd3f-92f4e00a1bb0/keep".
130 func (v *UnixVolume) DeviceID() string {
131         giveup := func(f string, args ...interface{}) string {
132                 log.Printf(f+"; using blank DeviceID for volume %s", append(args, v)...)
133                 return ""
134         }
135         buf, err := exec.Command("findmnt", "--noheadings", "--target", v.Root).CombinedOutput()
136         if err != nil {
137                 return giveup("findmnt: %s (%q)", err, buf)
138         }
139         findmnt := strings.Fields(string(buf))
140         if len(findmnt) < 2 {
141                 return giveup("could not parse findmnt output: %q", buf)
142         }
143         fsRoot, dev := findmnt[0], findmnt[1]
144
145         absRoot, err := filepath.Abs(v.Root)
146         if err != nil {
147                 return giveup("resolving relative path %q: %s", v.Root, err)
148         }
149         realRoot, err := filepath.EvalSymlinks(absRoot)
150         if err != nil {
151                 return giveup("resolving symlinks in %q: %s", absRoot, err)
152         }
153
154         // Find path from filesystem root to realRoot
155         var fsPath string
156         if strings.HasPrefix(realRoot, fsRoot+"/") {
157                 fsPath = realRoot[len(fsRoot):]
158         } else if fsRoot == "/" {
159                 fsPath = realRoot
160         } else if fsRoot == realRoot {
161                 fsPath = ""
162         } else {
163                 return giveup("findmnt reports mount point %q which is not a prefix of volume root %q", fsRoot, realRoot)
164         }
165
166         if !strings.HasPrefix(dev, "/") {
167                 return giveup("mount %q device %q is not a path", fsRoot, dev)
168         }
169
170         fi, err := os.Stat(dev)
171         if err != nil {
172                 return giveup("stat %q: %s\n", dev, err)
173         }
174         ino := fi.Sys().(*syscall.Stat_t).Ino
175
176         // Find a symlink in /dev/disk/by-uuid/ whose target is (i.e.,
177         // has the same inode as) the mounted device
178         udir := "/dev/disk/by-uuid"
179         d, err := os.Open(udir)
180         if err != nil {
181                 return giveup("opening %q: %s", udir, err)
182         }
183         uuids, err := d.Readdirnames(0)
184         if err != nil {
185                 return giveup("reading %q: %s", udir, err)
186         }
187         for _, uuid := range uuids {
188                 link := filepath.Join(udir, uuid)
189                 fi, err = os.Stat(link)
190                 if err != nil {
191                         log.Printf("error: stat %q: %s", link, err)
192                         continue
193                 }
194                 if fi.Sys().(*syscall.Stat_t).Ino == ino {
195                         return uuid + fsPath
196                 }
197         }
198         return giveup("could not find entry in %q matching %q", udir, dev)
199 }
200
201 // Examples implements VolumeWithExamples.
202 func (*UnixVolume) Examples() []Volume {
203         return []Volume{
204                 &UnixVolume{
205                         Root:                 "/mnt/local-disk",
206                         Serialize:            true,
207                         DirectoryReplication: 1,
208                 },
209                 &UnixVolume{
210                         Root:                 "/mnt/network-disk",
211                         Serialize:            false,
212                         DirectoryReplication: 2,
213                 },
214         }
215 }
216
217 // Type implements Volume
218 func (v *UnixVolume) Type() string {
219         return "Directory"
220 }
221
222 // Start implements Volume
223 func (v *UnixVolume) Start(m *volumeMetrics) error {
224         if v.Serialize {
225                 v.locker = &sync.Mutex{}
226         }
227         if !strings.HasPrefix(v.Root, "/") {
228                 return fmt.Errorf("volume root does not start with '/': %q", v.Root)
229         }
230         if v.DirectoryReplication == 0 {
231                 v.DirectoryReplication = 1
232         }
233         _, err := v.os.Stat(v.Root)
234         if err == nil {
235                 // Set up prometheus metrics
236                 v.metrics = m
237                 v.os.stats.statsTicker.setup(m)
238                 // Periodically update free/used volume space
239                 go func() {
240                         for {
241                                 v.metrics.BytesFree.Set(float64(v.Status().BytesFree))
242                                 v.metrics.BytesUsed.Set(float64(v.Status().BytesUsed))
243                                 time.Sleep(10 * time.Second)
244                         }
245                 }()
246         }
247         return err
248 }
249
250 // Touch sets the timestamp for the given locator to the current time
251 func (v *UnixVolume) Touch(loc string) error {
252         v.metrics.Ops.Inc()
253         v.metrics.TouchOps.Inc()
254         if v.ReadOnly {
255                 return MethodDisabledError
256         }
257         p := v.blockPath(loc)
258         f, err := v.os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
259         if err != nil {
260                 return err
261         }
262         defer f.Close()
263         if err := v.lock(context.TODO()); err != nil {
264                 return err
265         }
266         defer v.unlock()
267         if e := v.lockfile(f); e != nil {
268                 return e
269         }
270         defer v.unlockfile(f)
271         ts := syscall.NsecToTimespec(time.Now().UnixNano())
272         v.os.stats.Tick(&v.os.stats.UtimesOps)
273         err = syscall.UtimesNano(p, []syscall.Timespec{ts, ts})
274         v.os.stats.TickErr(err)
275         return err
276 }
277
278 // Mtime returns the stored timestamp for the given locator.
279 func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
280         p := v.blockPath(loc)
281         fi, err := v.os.Stat(p)
282         if err != nil {
283                 return time.Time{}, err
284         }
285         return fi.ModTime(), nil
286 }
287
288 // Lock the locker (if one is in use), open the file for reading, and
289 // call the given function if and when the file is ready to read.
290 func (v *UnixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader) error) error {
291         if err := v.lock(ctx); err != nil {
292                 return err
293         }
294         defer v.unlock()
295         f, err := v.os.Open(path)
296         if err != nil {
297                 return err
298         }
299         defer f.Close()
300         return fn(NewCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes))
301 }
302
303 // stat is os.Stat() with some extra sanity checks.
304 func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
305         stat, err := v.os.Stat(path)
306         if err == nil {
307                 if stat.Size() < 0 {
308                         err = os.ErrInvalid
309                 } else if stat.Size() > BlockSize {
310                         err = TooLongError
311                 }
312         }
313         return stat, err
314 }
315
316 // Get retrieves a block, copies it to the given slice, and returns
317 // the number of bytes copied.
318 func (v *UnixVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
319         v.metrics.Ops.Inc()
320         v.metrics.GetOps.Inc()
321         return getWithPipe(ctx, loc, buf, v)
322 }
323
324 // ReadBlock implements BlockReader.
325 func (v *UnixVolume) ReadBlock(ctx context.Context, loc string, w io.Writer) error {
326         path := v.blockPath(loc)
327         stat, err := v.stat(path)
328         if err != nil {
329                 return v.translateError(err)
330         }
331         return v.getFunc(ctx, path, func(rdr io.Reader) error {
332                 n, err := io.Copy(w, rdr)
333                 if err == nil && n != stat.Size() {
334                         err = io.ErrUnexpectedEOF
335                 }
336                 return err
337         })
338 }
339
340 // Compare returns nil if Get(loc) would return the same content as
341 // expect. It is functionally equivalent to Get() followed by
342 // bytes.Compare(), but uses less memory.
343 func (v *UnixVolume) Compare(ctx context.Context, loc string, expect []byte) error {
344         v.metrics.Ops.Inc()
345         v.metrics.CompareOps.Inc()
346         path := v.blockPath(loc)
347         if _, err := v.stat(path); err != nil {
348                 return v.translateError(err)
349         }
350         return v.getFunc(ctx, path, func(rdr io.Reader) error {
351                 return compareReaderWithBuf(ctx, rdr, expect, loc[:32])
352         })
353 }
354
355 // Put stores a block of data identified by the locator string
356 // "loc".  It returns nil on success.  If the volume is full, it
357 // returns a FullError.  If the write fails due to some other error,
358 // that error is returned.
359 func (v *UnixVolume) Put(ctx context.Context, loc string, block []byte) error {
360         v.metrics.Ops.Inc()
361         v.metrics.PutOps.Inc()
362         return putWithPipe(ctx, loc, block, v)
363 }
364
365 // WriteBlock implements BlockWriter.
366 func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader) error {
367         if v.ReadOnly {
368                 return MethodDisabledError
369         }
370         if v.IsFull() {
371                 return FullError
372         }
373         bdir := v.blockDir(loc)
374         if err := os.MkdirAll(bdir, 0755); err != nil {
375                 log.Printf("%s: could not create directory %s: %s",
376                         loc, bdir, err)
377                 return err
378         }
379
380         tmpfile, tmperr := v.os.TempFile(bdir, "tmp"+loc)
381         if tmperr != nil {
382                 log.Printf("ioutil.TempFile(%s, tmp%s): %s", bdir, loc, tmperr)
383                 return tmperr
384         }
385
386         bpath := v.blockPath(loc)
387
388         if err := v.lock(ctx); err != nil {
389                 return err
390         }
391         defer v.unlock()
392         n, err := io.Copy(tmpfile, rdr)
393         v.os.stats.TickOutBytes(uint64(n))
394         if err != nil {
395                 log.Printf("%s: writing to %s: %s\n", v, bpath, err)
396                 tmpfile.Close()
397                 v.os.Remove(tmpfile.Name())
398                 return err
399         }
400         if err := tmpfile.Close(); err != nil {
401                 log.Printf("closing %s: %s\n", tmpfile.Name(), err)
402                 v.os.Remove(tmpfile.Name())
403                 return err
404         }
405         if err := v.os.Rename(tmpfile.Name(), bpath); err != nil {
406                 log.Printf("rename %s %s: %s\n", tmpfile.Name(), bpath, err)
407                 return v.os.Remove(tmpfile.Name())
408         }
409         return nil
410 }
411
412 // Status returns a VolumeStatus struct describing the volume's
413 // current state, or nil if an error occurs.
414 //
415 func (v *UnixVolume) Status() *VolumeStatus {
416         fi, err := v.os.Stat(v.Root)
417         if err != nil {
418                 log.Printf("%s: os.Stat: %s\n", v, err)
419                 return nil
420         }
421         devnum := fi.Sys().(*syscall.Stat_t).Dev
422
423         var fs syscall.Statfs_t
424         if err := syscall.Statfs(v.Root, &fs); err != nil {
425                 log.Printf("%s: statfs: %s\n", v, err)
426                 return nil
427         }
428         // These calculations match the way df calculates disk usage:
429         // "free" space is measured by fs.Bavail, but "used" space
430         // uses fs.Blocks - fs.Bfree.
431         free := fs.Bavail * uint64(fs.Bsize)
432         used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
433         return &VolumeStatus{
434                 MountPoint: v.Root,
435                 DeviceNum:  devnum,
436                 BytesFree:  free,
437                 BytesUsed:  used,
438         }
439 }
440
441 var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
442 var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
443
444 // IndexTo writes (to the given Writer) a list of blocks found on this
445 // volume which begin with the specified prefix. If the prefix is an
446 // empty string, IndexTo writes a complete list of blocks.
447 //
448 // Each block is given in the format
449 //
450 //     locator+size modification-time {newline}
451 //
452 // e.g.:
453 //
454 //     e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
455 //     e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
456 //     e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
457 //
458 func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
459         var lastErr error
460         rootdir, err := v.os.Open(v.Root)
461         if err != nil {
462                 return err
463         }
464         defer rootdir.Close()
465         v.os.stats.Tick(&v.os.stats.ReaddirOps)
466         for {
467                 names, err := rootdir.Readdirnames(1)
468                 if err == io.EOF {
469                         return lastErr
470                 } else if err != nil {
471                         return err
472                 }
473                 if !strings.HasPrefix(names[0], prefix) && !strings.HasPrefix(prefix, names[0]) {
474                         // prefix excludes all blocks stored in this dir
475                         continue
476                 }
477                 if !blockDirRe.MatchString(names[0]) {
478                         continue
479                 }
480                 blockdirpath := filepath.Join(v.Root, names[0])
481                 blockdir, err := v.os.Open(blockdirpath)
482                 if err != nil {
483                         log.Print("Error reading ", blockdirpath, ": ", err)
484                         lastErr = err
485                         continue
486                 }
487                 v.os.stats.Tick(&v.os.stats.ReaddirOps)
488                 for {
489                         fileInfo, err := blockdir.Readdir(1)
490                         if err == io.EOF {
491                                 break
492                         } else if err != nil {
493                                 log.Print("Error reading ", blockdirpath, ": ", err)
494                                 lastErr = err
495                                 break
496                         }
497                         name := fileInfo[0].Name()
498                         if !strings.HasPrefix(name, prefix) {
499                                 continue
500                         }
501                         if !blockFileRe.MatchString(name) {
502                                 continue
503                         }
504                         _, err = fmt.Fprint(w,
505                                 name,
506                                 "+", fileInfo[0].Size(),
507                                 " ", fileInfo[0].ModTime().UnixNano(),
508                                 "\n")
509                         if err != nil {
510                                 log.Print("Error writing : ", err)
511                                 lastErr = err
512                                 break
513                         }
514                 }
515                 blockdir.Close()
516         }
517 }
518
519 // Trash trashes the block data from the unix storage
520 // If TrashLifetime == 0, the block is deleted
521 // Else, the block is renamed as path/{loc}.trash.{deadline},
522 // where deadline = now + TrashLifetime
523 func (v *UnixVolume) Trash(loc string) error {
524         // Touch() must be called before calling Write() on a block.  Touch()
525         // also uses lockfile().  This avoids a race condition between Write()
526         // and Trash() because either (a) the file will be trashed and Touch()
527         // will signal to the caller that the file is not present (and needs to
528         // be re-written), or (b) Touch() will update the file's timestamp and
529         // Trash() will read the correct up-to-date timestamp and choose not to
530         // trash the file.
531
532         if v.ReadOnly {
533                 return MethodDisabledError
534         }
535         if err := v.lock(context.TODO()); err != nil {
536                 return err
537         }
538         defer v.unlock()
539         p := v.blockPath(loc)
540         f, err := v.os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
541         if err != nil {
542                 return err
543         }
544         defer f.Close()
545         if e := v.lockfile(f); e != nil {
546                 return e
547         }
548         defer v.unlockfile(f)
549
550         // If the block has been PUT in the last blobSignatureTTL
551         // seconds, return success without removing the block. This
552         // protects data from garbage collection until it is no longer
553         // possible for clients to retrieve the unreferenced blocks
554         // anyway (because the permission signatures have expired).
555         if fi, err := v.os.Stat(p); err != nil {
556                 return err
557         } else if time.Since(fi.ModTime()) < time.Duration(theConfig.BlobSignatureTTL) {
558                 return nil
559         }
560
561         if theConfig.TrashLifetime == 0 {
562                 return v.os.Remove(p)
563         }
564         return v.os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(theConfig.TrashLifetime.Duration()).Unix()))
565 }
566
567 // Untrash moves block from trash back into store
568 // Look for path/{loc}.trash.{deadline} in storage,
569 // and rename the first such file as path/{loc}
570 func (v *UnixVolume) Untrash(loc string) (err error) {
571         if v.ReadOnly {
572                 return MethodDisabledError
573         }
574
575         v.os.stats.Tick(&v.os.stats.ReaddirOps)
576         files, err := ioutil.ReadDir(v.blockDir(loc))
577         if err != nil {
578                 return err
579         }
580
581         if len(files) == 0 {
582                 return os.ErrNotExist
583         }
584
585         foundTrash := false
586         prefix := fmt.Sprintf("%v.trash.", loc)
587         for _, f := range files {
588                 if strings.HasPrefix(f.Name(), prefix) {
589                         foundTrash = true
590                         err = v.os.Rename(v.blockPath(f.Name()), v.blockPath(loc))
591                         if err == nil {
592                                 break
593                         }
594                 }
595         }
596
597         if foundTrash == false {
598                 return os.ErrNotExist
599         }
600
601         return
602 }
603
604 // blockDir returns the fully qualified directory name for the directory
605 // where loc is (or would be) stored on this volume.
606 func (v *UnixVolume) blockDir(loc string) string {
607         return filepath.Join(v.Root, loc[0:3])
608 }
609
610 // blockPath returns the fully qualified pathname for the path to loc
611 // on this volume.
612 func (v *UnixVolume) blockPath(loc string) string {
613         return filepath.Join(v.blockDir(loc), loc)
614 }
615
616 // IsFull returns true if the free space on the volume is less than
617 // MinFreeKilobytes.
618 //
619 func (v *UnixVolume) IsFull() (isFull bool) {
620         fullSymlink := v.Root + "/full"
621
622         // Check if the volume has been marked as full in the last hour.
623         if link, err := os.Readlink(fullSymlink); err == nil {
624                 if ts, err := strconv.Atoi(link); err == nil {
625                         fulltime := time.Unix(int64(ts), 0)
626                         if time.Since(fulltime).Hours() < 1.0 {
627                                 return true
628                         }
629                 }
630         }
631
632         if avail, err := v.FreeDiskSpace(); err == nil {
633                 isFull = avail < MinFreeKilobytes
634         } else {
635                 log.Printf("%s: FreeDiskSpace: %s\n", v, err)
636                 isFull = false
637         }
638
639         // If the volume is full, timestamp it.
640         if isFull {
641                 now := fmt.Sprintf("%d", time.Now().Unix())
642                 os.Symlink(now, fullSymlink)
643         }
644         return
645 }
646
647 // FreeDiskSpace returns the number of unused 1k blocks available on
648 // the volume.
649 //
650 func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
651         var fs syscall.Statfs_t
652         err = syscall.Statfs(v.Root, &fs)
653         if err == nil {
654                 // Statfs output is not guaranteed to measure free
655                 // space in terms of 1K blocks.
656                 free = fs.Bavail * uint64(fs.Bsize) / 1024
657         }
658         return
659 }
660
661 func (v *UnixVolume) String() string {
662         return fmt.Sprintf("[UnixVolume %s]", v.Root)
663 }
664
665 // Writable returns false if all future Put, Mtime, and Delete calls
666 // are expected to fail.
667 func (v *UnixVolume) Writable() bool {
668         return !v.ReadOnly
669 }
670
671 // Replication returns the number of replicas promised by the
672 // underlying device (as specified in configuration).
673 func (v *UnixVolume) Replication() int {
674         return v.DirectoryReplication
675 }
676
677 // GetStorageClasses implements Volume
678 func (v *UnixVolume) GetStorageClasses() []string {
679         return v.StorageClasses
680 }
681
682 // InternalStats returns I/O and filesystem ops counters.
683 func (v *UnixVolume) InternalStats() interface{} {
684         return &v.os.stats
685 }
686
687 // lock acquires the serialize lock, if one is in use. If ctx is done
688 // before the lock is acquired, lock returns ctx.Err() instead of
689 // acquiring the lock.
690 func (v *UnixVolume) lock(ctx context.Context) error {
691         if v.locker == nil {
692                 return nil
693         }
694         locked := make(chan struct{})
695         go func() {
696                 v.locker.Lock()
697                 close(locked)
698         }()
699         select {
700         case <-ctx.Done():
701                 go func() {
702                         <-locked
703                         v.locker.Unlock()
704                 }()
705                 return ctx.Err()
706         case <-locked:
707                 return nil
708         }
709 }
710
711 // unlock releases the serialize lock, if one is in use.
712 func (v *UnixVolume) unlock() {
713         if v.locker == nil {
714                 return
715         }
716         v.locker.Unlock()
717 }
718
719 // lockfile and unlockfile use flock(2) to manage kernel file locks.
720 func (v *UnixVolume) lockfile(f *os.File) error {
721         v.os.stats.Tick(&v.os.stats.FlockOps)
722         err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
723         v.os.stats.TickErr(err)
724         return err
725 }
726
727 func (v *UnixVolume) unlockfile(f *os.File) error {
728         err := syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
729         v.os.stats.TickErr(err)
730         return err
731 }
732
733 // Where appropriate, translate a more specific filesystem error to an
734 // error recognized by handlers, like os.ErrNotExist.
735 func (v *UnixVolume) translateError(err error) error {
736         switch err.(type) {
737         case *os.PathError:
738                 // stat() returns a PathError if the parent directory
739                 // (not just the file itself) is missing
740                 return os.ErrNotExist
741         default:
742                 return err
743         }
744 }
745
746 var unixTrashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`)
747
748 // EmptyTrash walks hierarchy looking for {hash}.trash.*
749 // and deletes those with deadline < now.
750 func (v *UnixVolume) EmptyTrash() {
751         var bytesDeleted, bytesInTrash int64
752         var blocksDeleted, blocksInTrash int64
753
754         doFile := func(path string, info os.FileInfo) {
755                 if info.Mode().IsDir() {
756                         return
757                 }
758                 matches := unixTrashLocRegexp.FindStringSubmatch(path)
759                 if len(matches) != 3 {
760                         return
761                 }
762                 deadline, err := strconv.ParseInt(matches[2], 10, 64)
763                 if err != nil {
764                         log.Printf("EmptyTrash: %v: ParseInt(%v): %v", path, matches[2], err)
765                         return
766                 }
767                 atomic.AddInt64(&bytesInTrash, info.Size())
768                 atomic.AddInt64(&blocksInTrash, 1)
769                 if deadline > time.Now().Unix() {
770                         return
771                 }
772                 err = v.os.Remove(path)
773                 if err != nil {
774                         log.Printf("EmptyTrash: Remove %v: %v", path, err)
775                         return
776                 }
777                 atomic.AddInt64(&bytesDeleted, info.Size())
778                 atomic.AddInt64(&blocksDeleted, 1)
779         }
780
781         type dirent struct {
782                 path string
783                 info os.FileInfo
784         }
785         var wg sync.WaitGroup
786         todo := make(chan dirent, theConfig.EmptyTrashWorkers)
787         for i := 0; i < 1 || i < theConfig.EmptyTrashWorkers; i++ {
788                 wg.Add(1)
789                 go func() {
790                         defer wg.Done()
791                         for e := range todo {
792                                 doFile(e.path, e.info)
793                         }
794                 }()
795         }
796
797         err := filepath.Walk(v.Root, func(path string, info os.FileInfo, err error) error {
798                 if err != nil {
799                         log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err)
800                         return nil
801                 }
802                 todo <- dirent{path, info}
803                 return nil
804         })
805         close(todo)
806         wg.Wait()
807
808         if err != nil {
809                 log.Printf("EmptyTrash error for %v: %v", v.String(), err)
810         }
811
812         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
813 }
814
815 type unixStats struct {
816         statsTicker
817         OpenOps    uint64
818         StatOps    uint64
819         FlockOps   uint64
820         UtimesOps  uint64
821         CreateOps  uint64
822         RenameOps  uint64
823         UnlinkOps  uint64
824         ReaddirOps uint64
825 }
826
827 func (s *unixStats) TickErr(err error) {
828         if err == nil {
829                 return
830         }
831         s.statsTicker.TickErr(err, fmt.Sprintf("%T", err))
832 }
833
834 type osWithStats struct {
835         stats unixStats
836 }
837
838 func (o *osWithStats) Open(name string) (*os.File, error) {
839         o.stats.Tick(&o.stats.OpenOps)
840         f, err := os.Open(name)
841         o.stats.TickErr(err)
842         return f, err
843 }
844
845 func (o *osWithStats) OpenFile(name string, flag int, perm os.FileMode) (*os.File, error) {
846         o.stats.Tick(&o.stats.OpenOps)
847         f, err := os.OpenFile(name, flag, perm)
848         o.stats.TickErr(err)
849         return f, err
850 }
851
852 func (o *osWithStats) Remove(path string) error {
853         o.stats.Tick(&o.stats.UnlinkOps)
854         err := os.Remove(path)
855         o.stats.TickErr(err)
856         return err
857 }
858
859 func (o *osWithStats) Rename(a, b string) error {
860         o.stats.Tick(&o.stats.RenameOps)
861         err := os.Rename(a, b)
862         o.stats.TickErr(err)
863         return err
864 }
865
866 func (o *osWithStats) Stat(path string) (os.FileInfo, error) {
867         o.stats.Tick(&o.stats.StatOps)
868         fi, err := os.Stat(path)
869         o.stats.TickErr(err)
870         return fi, err
871 }
872
873 func (o *osWithStats) TempFile(dir, base string) (*os.File, error) {
874         o.stats.Tick(&o.stats.CreateOps)
875         f, err := ioutil.TempFile(dir, base)
876         o.stats.TickErr(err)
877         return f, err
878 }