7393: Add S3 volume type.
[arvados.git] / services / keepstore / volume_unix.go
1 package main
2
3 import (
4         "bufio"
5         "errors"
6         "flag"
7         "fmt"
8         "io"
9         "io/ioutil"
10         "log"
11         "os"
12         "path/filepath"
13         "regexp"
14         "strconv"
15         "strings"
16         "sync"
17         "syscall"
18         "time"
19 )
20
21 type unixVolumeAdder struct {
22         *volumeSet
23 }
24
25 func (vs *unixVolumeAdder) Set(value string) error {
26         if dirs := strings.Split(value, ","); len(dirs) > 1 {
27                 log.Print("DEPRECATED: using comma-separated volume list.")
28                 for _, dir := range dirs {
29                         if err := vs.Set(dir); err != nil {
30                                 return err
31                         }
32                 }
33                 return nil
34         }
35         if len(value) == 0 || value[0] != '/' {
36                 return errors.New("Invalid volume: must begin with '/'.")
37         }
38         if _, err := os.Stat(value); err != nil {
39                 return err
40         }
41         var locker sync.Locker
42         if flagSerializeIO {
43                 locker = &sync.Mutex{}
44         }
45         *vs.volumeSet = append(*vs.volumeSet, &UnixVolume{
46                 root:     value,
47                 locker:   locker,
48                 readonly: flagReadonly,
49         })
50         return nil
51 }
52
53 func init() {
54         flag.Var(
55                 &unixVolumeAdder{&volumes},
56                 "volumes",
57                 "Deprecated synonym for -volume.")
58         flag.Var(
59                 &unixVolumeAdder{&volumes},
60                 "volume",
61                 "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
62 }
63
64 // Discover adds a UnixVolume for every directory named "keep" that is
65 // located at the top level of a device- or tmpfs-backed mount point
66 // other than "/". It returns the number of volumes added.
67 func (vs *unixVolumeAdder) Discover() int {
68         added := 0
69         f, err := os.Open(ProcMounts)
70         if err != nil {
71                 log.Fatalf("opening %s: %s", ProcMounts, err)
72         }
73         scanner := bufio.NewScanner(f)
74         for scanner.Scan() {
75                 args := strings.Fields(scanner.Text())
76                 if err := scanner.Err(); err != nil {
77                         log.Fatalf("reading %s: %s", ProcMounts, err)
78                 }
79                 dev, mount := args[0], args[1]
80                 if mount == "/" {
81                         continue
82                 }
83                 if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
84                         continue
85                 }
86                 keepdir := mount + "/keep"
87                 if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
88                         continue
89                 }
90                 // Set the -readonly flag (but only for this volume)
91                 // if the filesystem is mounted readonly.
92                 flagReadonlyWas := flagReadonly
93                 for _, fsopt := range strings.Split(args[3], ",") {
94                         if fsopt == "ro" {
95                                 flagReadonly = true
96                                 break
97                         }
98                         if fsopt == "rw" {
99                                 break
100                         }
101                 }
102                 if err := vs.Set(keepdir); err != nil {
103                         log.Printf("adding %q: %s", keepdir, err)
104                 } else {
105                         added++
106                 }
107                 flagReadonly = flagReadonlyWas
108         }
109         return added
110 }
111
112 // A UnixVolume stores and retrieves blocks in a local directory.
113 type UnixVolume struct {
114         // path to the volume's root directory
115         root string
116         // something to lock during IO, typically a sync.Mutex (or nil
117         // to skip locking)
118         locker   sync.Locker
119         readonly bool
120 }
121
122 // Touch sets the timestamp for the given locator to the current time
123 func (v *UnixVolume) Touch(loc string) error {
124         if v.readonly {
125                 return MethodDisabledError
126         }
127         p := v.blockPath(loc)
128         f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
129         if err != nil {
130                 return err
131         }
132         defer f.Close()
133         if v.locker != nil {
134                 v.locker.Lock()
135                 defer v.locker.Unlock()
136         }
137         if e := lockfile(f); e != nil {
138                 return e
139         }
140         defer unlockfile(f)
141         now := time.Now().Unix()
142         utime := syscall.Utimbuf{now, now}
143         return syscall.Utime(p, &utime)
144 }
145
146 // Mtime returns the stored timestamp for the given locator.
147 func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
148         p := v.blockPath(loc)
149         fi, err := os.Stat(p)
150         if err != nil {
151                 return time.Time{}, err
152         }
153         return fi.ModTime(), nil
154 }
155
156 // Lock the locker (if one is in use), open the file for reading, and
157 // call the given function if and when the file is ready to read.
158 func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
159         if v.locker != nil {
160                 v.locker.Lock()
161                 defer v.locker.Unlock()
162         }
163         f, err := os.Open(path)
164         if err != nil {
165                 return err
166         }
167         defer f.Close()
168         return fn(f)
169 }
170
171 // stat is os.Stat() with some extra sanity checks.
172 func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
173         stat, err := os.Stat(path)
174         if err == nil {
175                 if stat.Size() < 0 {
176                         err = os.ErrInvalid
177                 } else if stat.Size() > BlockSize {
178                         err = TooLongError
179                 }
180         }
181         return stat, err
182 }
183
184 // Get retrieves a block identified by the locator string "loc", and
185 // returns its contents as a byte slice.
186 //
187 // Get returns a nil buffer IFF it returns a non-nil error.
188 func (v *UnixVolume) Get(loc string) ([]byte, error) {
189         path := v.blockPath(loc)
190         stat, err := v.stat(path)
191         if err != nil {
192                 return nil, v.translateError(err)
193         }
194         buf := bufs.Get(int(stat.Size()))
195         err = v.getFunc(path, func(rdr io.Reader) error {
196                 _, err = io.ReadFull(rdr, buf)
197                 return err
198         })
199         if err != nil {
200                 bufs.Put(buf)
201                 return nil, err
202         }
203         return buf, nil
204 }
205
206 // Compare returns nil if Get(loc) would return the same content as
207 // expect. It is functionally equivalent to Get() followed by
208 // bytes.Compare(), but uses less memory.
209 func (v *UnixVolume) Compare(loc string, expect []byte) error {
210         path := v.blockPath(loc)
211         if _, err := v.stat(path); err != nil {
212                 return v.translateError(err)
213         }
214         return v.getFunc(path, func(rdr io.Reader) error {
215                 return compareReaderWithBuf(rdr, expect, loc[:32])
216         })
217 }
218
219 // Put stores a block of data identified by the locator string
220 // "loc".  It returns nil on success.  If the volume is full, it
221 // returns a FullError.  If the write fails due to some other error,
222 // that error is returned.
223 func (v *UnixVolume) Put(loc string, block []byte) error {
224         if v.readonly {
225                 return MethodDisabledError
226         }
227         if v.IsFull() {
228                 return FullError
229         }
230         bdir := v.blockDir(loc)
231         if err := os.MkdirAll(bdir, 0755); err != nil {
232                 log.Printf("%s: could not create directory %s: %s",
233                         loc, bdir, err)
234                 return err
235         }
236
237         tmpfile, tmperr := ioutil.TempFile(bdir, "tmp"+loc)
238         if tmperr != nil {
239                 log.Printf("ioutil.TempFile(%s, tmp%s): %s", bdir, loc, tmperr)
240                 return tmperr
241         }
242         bpath := v.blockPath(loc)
243
244         if v.locker != nil {
245                 v.locker.Lock()
246                 defer v.locker.Unlock()
247         }
248         if _, err := tmpfile.Write(block); err != nil {
249                 log.Printf("%s: writing to %s: %s\n", v, bpath, err)
250                 tmpfile.Close()
251                 os.Remove(tmpfile.Name())
252                 return err
253         }
254         if err := tmpfile.Close(); err != nil {
255                 log.Printf("closing %s: %s\n", tmpfile.Name(), err)
256                 os.Remove(tmpfile.Name())
257                 return err
258         }
259         if err := os.Rename(tmpfile.Name(), bpath); err != nil {
260                 log.Printf("rename %s %s: %s\n", tmpfile.Name(), bpath, err)
261                 os.Remove(tmpfile.Name())
262                 return err
263         }
264         return nil
265 }
266
267 // Status returns a VolumeStatus struct describing the volume's
268 // current state, or nil if an error occurs.
269 //
270 func (v *UnixVolume) Status() *VolumeStatus {
271         var fs syscall.Statfs_t
272         var devnum uint64
273
274         if fi, err := os.Stat(v.root); err == nil {
275                 devnum = fi.Sys().(*syscall.Stat_t).Dev
276         } else {
277                 log.Printf("%s: os.Stat: %s\n", v, err)
278                 return nil
279         }
280
281         err := syscall.Statfs(v.root, &fs)
282         if err != nil {
283                 log.Printf("%s: statfs: %s\n", v, err)
284                 return nil
285         }
286         // These calculations match the way df calculates disk usage:
287         // "free" space is measured by fs.Bavail, but "used" space
288         // uses fs.Blocks - fs.Bfree.
289         free := fs.Bavail * uint64(fs.Bsize)
290         used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
291         return &VolumeStatus{v.root, devnum, free, used}
292 }
293
294 var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
295 var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
296
297 // IndexTo writes (to the given Writer) a list of blocks found on this
298 // volume which begin with the specified prefix. If the prefix is an
299 // empty string, IndexTo writes a complete list of blocks.
300 //
301 // Each block is given in the format
302 //
303 //     locator+size modification-time {newline}
304 //
305 // e.g.:
306 //
307 //     e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
308 //     e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
309 //     e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
310 //
311 func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
312         var lastErr error = nil
313         rootdir, err := os.Open(v.root)
314         if err != nil {
315                 return err
316         }
317         defer rootdir.Close()
318         for {
319                 names, err := rootdir.Readdirnames(1)
320                 if err == io.EOF {
321                         return lastErr
322                 } else if err != nil {
323                         return err
324                 }
325                 if !strings.HasPrefix(names[0], prefix) && !strings.HasPrefix(prefix, names[0]) {
326                         // prefix excludes all blocks stored in this dir
327                         continue
328                 }
329                 if !blockDirRe.MatchString(names[0]) {
330                         continue
331                 }
332                 blockdirpath := filepath.Join(v.root, names[0])
333                 blockdir, err := os.Open(blockdirpath)
334                 if err != nil {
335                         log.Print("Error reading ", blockdirpath, ": ", err)
336                         lastErr = err
337                         continue
338                 }
339                 for {
340                         fileInfo, err := blockdir.Readdir(1)
341                         if err == io.EOF {
342                                 break
343                         } else if err != nil {
344                                 log.Print("Error reading ", blockdirpath, ": ", err)
345                                 lastErr = err
346                                 break
347                         }
348                         name := fileInfo[0].Name()
349                         if !strings.HasPrefix(name, prefix) {
350                                 continue
351                         }
352                         if !blockFileRe.MatchString(name) {
353                                 continue
354                         }
355                         _, err = fmt.Fprint(w,
356                                 name,
357                                 "+", fileInfo[0].Size(),
358                                 " ", fileInfo[0].ModTime().Unix(),
359                                 "\n")
360                 }
361                 blockdir.Close()
362         }
363 }
364
365 // Delete deletes the block data from the unix storage
366 func (v *UnixVolume) Delete(loc string) error {
367         // Touch() must be called before calling Write() on a block.  Touch()
368         // also uses lockfile().  This avoids a race condition between Write()
369         // and Delete() because either (a) the file will be deleted and Touch()
370         // will signal to the caller that the file is not present (and needs to
371         // be re-written), or (b) Touch() will update the file's timestamp and
372         // Delete() will read the correct up-to-date timestamp and choose not to
373         // delete the file.
374
375         if v.readonly {
376                 return MethodDisabledError
377         }
378         if v.locker != nil {
379                 v.locker.Lock()
380                 defer v.locker.Unlock()
381         }
382         p := v.blockPath(loc)
383         f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
384         if err != nil {
385                 return err
386         }
387         defer f.Close()
388         if e := lockfile(f); e != nil {
389                 return e
390         }
391         defer unlockfile(f)
392
393         // If the block has been PUT in the last blobSignatureTTL
394         // seconds, return success without removing the block. This
395         // protects data from garbage collection until it is no longer
396         // possible for clients to retrieve the unreferenced blocks
397         // anyway (because the permission signatures have expired).
398         if fi, err := os.Stat(p); err != nil {
399                 return err
400         } else {
401                 if time.Since(fi.ModTime()) < blobSignatureTTL {
402                         return nil
403                 }
404         }
405         return os.Remove(p)
406 }
407
408 // blockDir returns the fully qualified directory name for the directory
409 // where loc is (or would be) stored on this volume.
410 func (v *UnixVolume) blockDir(loc string) string {
411         return filepath.Join(v.root, loc[0:3])
412 }
413
414 // blockPath returns the fully qualified pathname for the path to loc
415 // on this volume.
416 func (v *UnixVolume) blockPath(loc string) string {
417         return filepath.Join(v.blockDir(loc), loc)
418 }
419
420 // IsFull returns true if the free space on the volume is less than
421 // MinFreeKilobytes.
422 //
423 func (v *UnixVolume) IsFull() (isFull bool) {
424         fullSymlink := v.root + "/full"
425
426         // Check if the volume has been marked as full in the last hour.
427         if link, err := os.Readlink(fullSymlink); err == nil {
428                 if ts, err := strconv.Atoi(link); err == nil {
429                         fulltime := time.Unix(int64(ts), 0)
430                         if time.Since(fulltime).Hours() < 1.0 {
431                                 return true
432                         }
433                 }
434         }
435
436         if avail, err := v.FreeDiskSpace(); err == nil {
437                 isFull = avail < MinFreeKilobytes
438         } else {
439                 log.Printf("%s: FreeDiskSpace: %s\n", v, err)
440                 isFull = false
441         }
442
443         // If the volume is full, timestamp it.
444         if isFull {
445                 now := fmt.Sprintf("%d", time.Now().Unix())
446                 os.Symlink(now, fullSymlink)
447         }
448         return
449 }
450
451 // FreeDiskSpace returns the number of unused 1k blocks available on
452 // the volume.
453 //
454 func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
455         var fs syscall.Statfs_t
456         err = syscall.Statfs(v.root, &fs)
457         if err == nil {
458                 // Statfs output is not guaranteed to measure free
459                 // space in terms of 1K blocks.
460                 free = fs.Bavail * uint64(fs.Bsize) / 1024
461         }
462         return
463 }
464
465 func (v *UnixVolume) String() string {
466         return fmt.Sprintf("[UnixVolume %s]", v.root)
467 }
468
469 // Writable returns false if all future Put, Mtime, and Delete calls are expected to fail.
470 func (v *UnixVolume) Writable() bool {
471         return !v.readonly
472 }
473
474 func (v *UnixVolume) Replication() int {
475         return 1
476 }
477
478 // lockfile and unlockfile use flock(2) to manage kernel file locks.
479 func lockfile(f *os.File) error {
480         return syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
481 }
482
483 func unlockfile(f *os.File) error {
484         return syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
485 }
486
487 // Where appropriate, translate a more specific filesystem error to an
488 // error recognized by handlers, like os.ErrNotExist.
489 func (v *UnixVolume) translateError(err error) error {
490         switch err.(type) {
491         case *os.PathError:
492                 // stat() returns a PathError if the parent directory
493                 // (not just the file itself) is missing
494                 return os.ErrNotExist
495         default:
496                 return err
497         }
498 }