7241: Stub Azure API calls
[arvados.git] / services / keepstore / volume_unix.go
1 package main
2
3 import (
4         "bufio"
5         "errors"
6         "flag"
7         "fmt"
8         "io"
9         "io/ioutil"
10         "log"
11         "os"
12         "path/filepath"
13         "regexp"
14         "strconv"
15         "strings"
16         "sync"
17         "syscall"
18         "time"
19 )
20
21 type unixVolumeAdder struct {
22         *volumeSet
23 }
24
25 func (vs *unixVolumeAdder) Set(value string) error {
26         if dirs := strings.Split(value, ","); len(dirs) > 1 {
27                 log.Print("DEPRECATED: using comma-separated volume list.")
28                 for _, dir := range dirs {
29                         if err := vs.Set(dir); err != nil {
30                                 return err
31                         }
32                 }
33                 return nil
34         }
35         if len(value) == 0 || value[0] != '/' {
36                 return errors.New("Invalid volume: must begin with '/'.")
37         }
38         if _, err := os.Stat(value); err != nil {
39                 return err
40         }
41         var locker sync.Locker
42         if flagSerializeIO {
43                 locker = &sync.Mutex{}
44         }
45         *vs.volumeSet = append(*vs.volumeSet, &UnixVolume{
46                 root:     value,
47                 locker:   locker,
48                 readonly: flagReadonly,
49         })
50         return nil
51 }
52
53 func init() {
54         flag.Var(
55                 &unixVolumeAdder{&volumes},
56                 "volumes",
57                 "Deprecated synonym for -volume.")
58         flag.Var(
59                 &unixVolumeAdder{&volumes},
60                 "volume",
61                 "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
62 }
63
64 // Discover adds a UnixVolume for every directory named "keep" that is
65 // located at the top level of a device- or tmpfs-backed mount point
66 // other than "/". It returns the number of volumes added.
67 func (vs *unixVolumeAdder) Discover() int {
68         added := 0
69         f, err := os.Open(ProcMounts)
70         if err != nil {
71                 log.Fatalf("opening %s: %s", ProcMounts, err)
72         }
73         scanner := bufio.NewScanner(f)
74         for scanner.Scan() {
75                 args := strings.Fields(scanner.Text())
76                 if err := scanner.Err(); err != nil {
77                         log.Fatalf("reading %s: %s", ProcMounts, err)
78                 }
79                 dev, mount := args[0], args[1]
80                 if mount == "/" {
81                         continue
82                 }
83                 if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
84                         continue
85                 }
86                 keepdir := mount + "/keep"
87                 if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
88                         continue
89                 }
90                 // Set the -readonly flag (but only for this volume)
91                 // if the filesystem is mounted readonly.
92                 flagReadonlyWas := flagReadonly
93                 for _, fsopt := range strings.Split(args[3], ",") {
94                         if fsopt == "ro" {
95                                 flagReadonly = true
96                                 break
97                         }
98                         if fsopt == "rw" {
99                                 break
100                         }
101                 }
102                 if err := vs.Set(keepdir); err != nil {
103                         log.Printf("adding %q: %s", keepdir, err)
104                 } else {
105                         added++
106                 }
107                 flagReadonly = flagReadonlyWas
108         }
109         return added
110 }
111
112 // A UnixVolume stores and retrieves blocks in a local directory.
113 type UnixVolume struct {
114         // path to the volume's root directory
115         root string
116         // something to lock during IO, typically a sync.Mutex (or nil
117         // to skip locking)
118         locker   sync.Locker
119         readonly bool
120 }
121
122 // Touch sets the timestamp for the given locator to the current time
123 func (v *UnixVolume) Touch(loc string) error {
124         if v.readonly {
125                 return MethodDisabledError
126         }
127         p := v.blockPath(loc)
128         f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
129         if err != nil {
130                 return err
131         }
132         defer f.Close()
133         if v.locker != nil {
134                 v.locker.Lock()
135                 defer v.locker.Unlock()
136         }
137         if e := lockfile(f); e != nil {
138                 return e
139         }
140         defer unlockfile(f)
141         now := time.Now().Unix()
142         utime := syscall.Utimbuf{now, now}
143         return syscall.Utime(p, &utime)
144 }
145
146 // Mtime returns the stored timestamp for the given locator.
147 func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
148         p := v.blockPath(loc)
149         fi, err := os.Stat(p)
150         if err != nil {
151                 return time.Time{}, err
152         }
153         return fi.ModTime(), nil
154 }
155
156 // Lock the locker (if one is in use), open the file for reading, and
157 // call the given function if and when the file is ready to read.
158 func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
159         if v.locker != nil {
160                 v.locker.Lock()
161                 defer v.locker.Unlock()
162         }
163         f, err := os.Open(path)
164         if err != nil {
165                 return err
166         }
167         defer f.Close()
168         return fn(f)
169 }
170
171 // stat is os.Stat() with some extra sanity checks.
172 func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
173         stat, err := os.Stat(path)
174         if err == nil {
175                 if stat.Size() < 0 {
176                         err = os.ErrInvalid
177                 } else if stat.Size() > BlockSize {
178                         err = TooLongError
179                 }
180         }
181         return stat, err
182 }
183
184 // Get retrieves a block identified by the locator string "loc", and
185 // returns its contents as a byte slice.
186 //
187 // Get returns a nil buffer IFF it returns a non-nil error.
188 func (v *UnixVolume) Get(loc string) ([]byte, error) {
189         path := v.blockPath(loc)
190         stat, err := v.stat(path)
191         if err != nil {
192                 return nil, err
193         }
194         buf := bufs.Get(int(stat.Size()))
195         err = v.getFunc(path, func(rdr io.Reader) error {
196                 _, err = io.ReadFull(rdr, buf)
197                 return err
198         })
199         if err != nil {
200                 bufs.Put(buf)
201                 return nil, err
202         }
203         return buf, nil
204 }
205
206 // Compare returns nil if Get(loc) would return the same content as
207 // expect. It is functionally equivalent to Get() followed by
208 // bytes.Compare(), but uses less memory.
209 func (v *UnixVolume) Compare(loc string, expect []byte) error {
210         path := v.blockPath(loc)
211         if _, err := v.stat(path); err != nil {
212                 return err
213         }
214         return v.getFunc(path, func(rdr io.Reader) error {
215                 return compareReaderWithBuf(rdr, expect, loc[:32])
216         })
217 }
218
219 // Put stores a block of data identified by the locator string
220 // "loc".  It returns nil on success.  If the volume is full, it
221 // returns a FullError.  If the write fails due to some other error,
222 // that error is returned.
223 func (v *UnixVolume) Put(loc string, block []byte) error {
224         if v.readonly {
225                 return MethodDisabledError
226         }
227         if v.IsFull() {
228                 return FullError
229         }
230         bdir := v.blockDir(loc)
231         if err := os.MkdirAll(bdir, 0755); err != nil {
232                 log.Printf("%s: could not create directory %s: %s",
233                         loc, bdir, err)
234                 return err
235         }
236
237         tmpfile, tmperr := ioutil.TempFile(bdir, "tmp"+loc)
238         if tmperr != nil {
239                 log.Printf("ioutil.TempFile(%s, tmp%s): %s", bdir, loc, tmperr)
240                 return tmperr
241         }
242         bpath := v.blockPath(loc)
243
244         if v.locker != nil {
245                 v.locker.Lock()
246                 defer v.locker.Unlock()
247         }
248         if _, err := tmpfile.Write(block); err != nil {
249                 log.Printf("%s: writing to %s: %s\n", v, bpath, err)
250                 tmpfile.Close()
251                 os.Remove(tmpfile.Name())
252                 return err
253         }
254         if err := tmpfile.Close(); err != nil {
255                 log.Printf("closing %s: %s\n", tmpfile.Name(), err)
256                 os.Remove(tmpfile.Name())
257                 return err
258         }
259         if err := os.Rename(tmpfile.Name(), bpath); err != nil {
260                 log.Printf("rename %s %s: %s\n", tmpfile.Name(), bpath, err)
261                 os.Remove(tmpfile.Name())
262                 return err
263         }
264         return nil
265 }
266
267 // Status returns a VolumeStatus struct describing the volume's
268 // current state, or nil if an error occurs.
269 //
270 func (v *UnixVolume) Status() *VolumeStatus {
271         var fs syscall.Statfs_t
272         var devnum uint64
273
274         if fi, err := os.Stat(v.root); err == nil {
275                 devnum = fi.Sys().(*syscall.Stat_t).Dev
276         } else {
277                 log.Printf("%s: os.Stat: %s\n", v, err)
278                 return nil
279         }
280
281         err := syscall.Statfs(v.root, &fs)
282         if err != nil {
283                 log.Printf("%s: statfs: %s\n", v, err)
284                 return nil
285         }
286         // These calculations match the way df calculates disk usage:
287         // "free" space is measured by fs.Bavail, but "used" space
288         // uses fs.Blocks - fs.Bfree.
289         free := fs.Bavail * uint64(fs.Bsize)
290         used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
291         return &VolumeStatus{v.root, devnum, free, used}
292 }
293
294 var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
295
296 // IndexTo writes (to the given Writer) a list of blocks found on this
297 // volume which begin with the specified prefix. If the prefix is an
298 // empty string, IndexTo writes a complete list of blocks.
299 //
300 // Each block is given in the format
301 //
302 //     locator+size modification-time {newline}
303 //
304 // e.g.:
305 //
306 //     e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
307 //     e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
308 //     e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
309 //
310 func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
311         var lastErr error = nil
312         rootdir, err := os.Open(v.root)
313         if err != nil {
314                 return err
315         }
316         defer rootdir.Close()
317         for {
318                 names, err := rootdir.Readdirnames(1)
319                 if err == io.EOF {
320                         return lastErr
321                 } else if err != nil {
322                         return err
323                 }
324                 if !strings.HasPrefix(names[0], prefix) && !strings.HasPrefix(prefix, names[0]) {
325                         // prefix excludes all blocks stored in this dir
326                         continue
327                 }
328                 if !blockDirRe.MatchString(names[0]) {
329                         continue
330                 }
331                 blockdirpath := filepath.Join(v.root, names[0])
332                 blockdir, err := os.Open(blockdirpath)
333                 if err != nil {
334                         log.Print("Error reading ", blockdirpath, ": ", err)
335                         lastErr = err
336                         continue
337                 }
338                 for {
339                         fileInfo, err := blockdir.Readdir(1)
340                         if err == io.EOF {
341                                 break
342                         } else if err != nil {
343                                 log.Print("Error reading ", blockdirpath, ": ", err)
344                                 lastErr = err
345                                 break
346                         }
347                         name := fileInfo[0].Name()
348                         if !strings.HasPrefix(name, prefix) {
349                                 continue
350                         }
351                         _, err = fmt.Fprint(w,
352                                 name,
353                                 "+", fileInfo[0].Size(),
354                                 " ", fileInfo[0].ModTime().Unix(),
355                                 "\n")
356                 }
357                 blockdir.Close()
358         }
359 }
360
361 // Delete deletes the block data from the unix storage
362 func (v *UnixVolume) Delete(loc string) error {
363         // Touch() must be called before calling Write() on a block.  Touch()
364         // also uses lockfile().  This avoids a race condition between Write()
365         // and Delete() because either (a) the file will be deleted and Touch()
366         // will signal to the caller that the file is not present (and needs to
367         // be re-written), or (b) Touch() will update the file's timestamp and
368         // Delete() will read the correct up-to-date timestamp and choose not to
369         // delete the file.
370
371         if v.readonly {
372                 return MethodDisabledError
373         }
374         if v.locker != nil {
375                 v.locker.Lock()
376                 defer v.locker.Unlock()
377         }
378         p := v.blockPath(loc)
379         f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
380         if err != nil {
381                 return err
382         }
383         defer f.Close()
384         if e := lockfile(f); e != nil {
385                 return e
386         }
387         defer unlockfile(f)
388
389         // If the block has been PUT in the last blobSignatureTTL
390         // seconds, return success without removing the block. This
391         // protects data from garbage collection until it is no longer
392         // possible for clients to retrieve the unreferenced blocks
393         // anyway (because the permission signatures have expired).
394         if fi, err := os.Stat(p); err != nil {
395                 return err
396         } else {
397                 if time.Since(fi.ModTime()) < blobSignatureTTL {
398                         return nil
399                 }
400         }
401         return os.Remove(p)
402 }
403
404 // blockDir returns the fully qualified directory name for the directory
405 // where loc is (or would be) stored on this volume.
406 func (v *UnixVolume) blockDir(loc string) string {
407         return filepath.Join(v.root, loc[0:3])
408 }
409
410 // blockPath returns the fully qualified pathname for the path to loc
411 // on this volume.
412 func (v *UnixVolume) blockPath(loc string) string {
413         return filepath.Join(v.blockDir(loc), loc)
414 }
415
416 // IsFull returns true if the free space on the volume is less than
417 // MinFreeKilobytes.
418 //
419 func (v *UnixVolume) IsFull() (isFull bool) {
420         fullSymlink := v.root + "/full"
421
422         // Check if the volume has been marked as full in the last hour.
423         if link, err := os.Readlink(fullSymlink); err == nil {
424                 if ts, err := strconv.Atoi(link); err == nil {
425                         fulltime := time.Unix(int64(ts), 0)
426                         if time.Since(fulltime).Hours() < 1.0 {
427                                 return true
428                         }
429                 }
430         }
431
432         if avail, err := v.FreeDiskSpace(); err == nil {
433                 isFull = avail < MinFreeKilobytes
434         } else {
435                 log.Printf("%s: FreeDiskSpace: %s\n", v, err)
436                 isFull = false
437         }
438
439         // If the volume is full, timestamp it.
440         if isFull {
441                 now := fmt.Sprintf("%d", time.Now().Unix())
442                 os.Symlink(now, fullSymlink)
443         }
444         return
445 }
446
447 // FreeDiskSpace returns the number of unused 1k blocks available on
448 // the volume.
449 //
450 func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
451         var fs syscall.Statfs_t
452         err = syscall.Statfs(v.root, &fs)
453         if err == nil {
454                 // Statfs output is not guaranteed to measure free
455                 // space in terms of 1K blocks.
456                 free = fs.Bavail * uint64(fs.Bsize) / 1024
457         }
458         return
459 }
460
461 func (v *UnixVolume) String() string {
462         return fmt.Sprintf("[UnixVolume %s]", v.root)
463 }
464
465 // Writable returns false if all future Put, Mtime, and Delete calls are expected to fail.
466 func (v *UnixVolume) Writable() bool {
467         return !v.readonly
468 }
469
470 // lockfile and unlockfile use flock(2) to manage kernel file locks.
471 func lockfile(f *os.File) error {
472         return syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
473 }
474
475 func unlockfile(f *os.File) error {
476         return syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
477 }