7121: Return DiskHashError instead of CollisionError from Compare() where appropriate.
[arvados.git] / services / keepstore / volume_unix.go
1 package main
2
3 import (
4         "bytes"
5         "fmt"
6         "io"
7         "io/ioutil"
8         "log"
9         "os"
10         "path/filepath"
11         "regexp"
12         "strconv"
13         "strings"
14         "sync"
15         "syscall"
16         "time"
17 )
18
19 // A UnixVolume stores and retrieves blocks in a local directory.
20 type UnixVolume struct {
21         root      string // path to the volume's root directory
22         serialize bool
23         readonly  bool
24         mutex     sync.Mutex
25 }
26
27 func (v *UnixVolume) Touch(loc string) error {
28         if v.readonly {
29                 return MethodDisabledError
30         }
31         p := v.blockPath(loc)
32         f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
33         if err != nil {
34                 return err
35         }
36         defer f.Close()
37         if v.serialize {
38                 v.mutex.Lock()
39                 defer v.mutex.Unlock()
40         }
41         if e := lockfile(f); e != nil {
42                 return e
43         }
44         defer unlockfile(f)
45         now := time.Now().Unix()
46         utime := syscall.Utimbuf{now, now}
47         return syscall.Utime(p, &utime)
48 }
49
50 func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
51         p := v.blockPath(loc)
52         if fi, err := os.Stat(p); err != nil {
53                 return time.Time{}, err
54         } else {
55                 return fi.ModTime(), nil
56         }
57 }
58
59 // Open the given file, apply the serialize lock if enabled, and call
60 // the given function if and when the file is ready to read.
61 func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
62         f, err := os.Open(path)
63         if err != nil {
64                 return err
65         }
66         defer f.Close()
67         if v.serialize {
68                 v.mutex.Lock()
69                 defer v.mutex.Unlock()
70         }
71         return fn(f)
72 }
73
74 // stat is os.Stat() with some extra sanity checks.
75 func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
76         stat, err := os.Stat(path)
77         if err == nil {
78                 if stat.Size() < 0 {
79                         err = os.ErrInvalid
80                 } else if stat.Size() > BLOCKSIZE {
81                         err = TooLongError
82                 }
83         }
84         return stat, err
85 }
86
87 // Get retrieves a block identified by the locator string "loc", and
88 // returns its contents as a byte slice.
89 //
90 // Get returns a nil buffer IFF it returns a non-nil error.
91 func (v *UnixVolume) Get(loc string) ([]byte, error) {
92         path := v.blockPath(loc)
93         stat, err := v.stat(path)
94         if err != nil {
95                 return nil, err
96         }
97         buf := bufs.Get(int(stat.Size()))
98         err = v.getFunc(path, func(rdr io.Reader) error {
99                 _, err = io.ReadFull(rdr, buf)
100                 return err
101         })
102         if err != nil {
103                 bufs.Put(buf)
104                 return nil, err
105         }
106         return buf, nil
107 }
108
109 // Compare returns nil if Get(loc) would return the same content as
110 // cmp. It is functionally equivalent to Get() followed by
111 // bytes.Compare(), but uses less memory.
112 func (v *UnixVolume) Compare(loc string, expect []byte) error {
113         path := v.blockPath(loc)
114         stat, err := v.stat(path)
115         if err != nil {
116                 return err
117         }
118         bufLen := 1 << 20
119         if int64(bufLen) > stat.Size() {
120                 bufLen = int(stat.Size())
121         }
122         cmp := expect
123         buf := make([]byte, bufLen)
124         return v.getFunc(path, func(rdr io.Reader) error {
125                 // Loop invariants: all data read so far matched what
126                 // we expected, and the first N bytes of cmp are
127                 // expected to equal the next N bytes read from
128                 // reader.
129                 for {
130                         n, err := rdr.Read(buf)
131                         if n > len(cmp) || bytes.Compare(cmp[:n], buf[:n]) != 0 {
132                                 return collisionOrCorrupt(loc[:32], expect[:len(expect)-len(cmp)], buf[:n], rdr)
133                         }
134                         cmp = cmp[n:]
135                         if err == io.EOF {
136                                 if len(cmp) != 0 {
137                                         return collisionOrCorrupt(loc[:32], expect[:len(expect)-len(cmp)], nil, nil)
138                                 }
139                                 return nil
140                         } else if err != nil {
141                                 return err
142                         }
143                 }
144         })
145 }
146
147 // Put stores a block of data identified by the locator string
148 // "loc".  It returns nil on success.  If the volume is full, it
149 // returns a FullError.  If the write fails due to some other error,
150 // that error is returned.
151 func (v *UnixVolume) Put(loc string, block []byte) error {
152         if v.readonly {
153                 return MethodDisabledError
154         }
155         if v.IsFull() {
156                 return FullError
157         }
158         bdir := v.blockDir(loc)
159         if err := os.MkdirAll(bdir, 0755); err != nil {
160                 log.Printf("%s: could not create directory %s: %s",
161                         loc, bdir, err)
162                 return err
163         }
164
165         tmpfile, tmperr := ioutil.TempFile(bdir, "tmp"+loc)
166         if tmperr != nil {
167                 log.Printf("ioutil.TempFile(%s, tmp%s): %s", bdir, loc, tmperr)
168                 return tmperr
169         }
170         bpath := v.blockPath(loc)
171
172         if v.serialize {
173                 v.mutex.Lock()
174                 defer v.mutex.Unlock()
175         }
176         if _, err := tmpfile.Write(block); err != nil {
177                 log.Printf("%s: writing to %s: %s\n", v, bpath, err)
178                 tmpfile.Close()
179                 os.Remove(tmpfile.Name())
180                 return err
181         }
182         if err := tmpfile.Close(); err != nil {
183                 log.Printf("closing %s: %s\n", tmpfile.Name(), err)
184                 os.Remove(tmpfile.Name())
185                 return err
186         }
187         if err := os.Rename(tmpfile.Name(), bpath); err != nil {
188                 log.Printf("rename %s %s: %s\n", tmpfile.Name(), bpath, err)
189                 os.Remove(tmpfile.Name())
190                 return err
191         }
192         return nil
193 }
194
195 // Status returns a VolumeStatus struct describing the volume's
196 // current state, or nil if an error occurs.
197 //
198 func (v *UnixVolume) Status() *VolumeStatus {
199         var fs syscall.Statfs_t
200         var devnum uint64
201
202         if fi, err := os.Stat(v.root); err == nil {
203                 devnum = fi.Sys().(*syscall.Stat_t).Dev
204         } else {
205                 log.Printf("%s: os.Stat: %s\n", v, err)
206                 return nil
207         }
208
209         err := syscall.Statfs(v.root, &fs)
210         if err != nil {
211                 log.Printf("%s: statfs: %s\n", v, err)
212                 return nil
213         }
214         // These calculations match the way df calculates disk usage:
215         // "free" space is measured by fs.Bavail, but "used" space
216         // uses fs.Blocks - fs.Bfree.
217         free := fs.Bavail * uint64(fs.Bsize)
218         used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
219         return &VolumeStatus{v.root, devnum, free, used}
220 }
221
222 var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
223
224 // IndexTo writes (to the given Writer) a list of blocks found on this
225 // volume which begin with the specified prefix. If the prefix is an
226 // empty string, IndexTo writes a complete list of blocks.
227 //
228 // Each block is given in the format
229 //
230 //     locator+size modification-time {newline}
231 //
232 // e.g.:
233 //
234 //     e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
235 //     e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
236 //     e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
237 //
238 func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
239         var lastErr error = nil
240         rootdir, err := os.Open(v.root)
241         if err != nil {
242                 return err
243         }
244         defer rootdir.Close()
245         for {
246                 names, err := rootdir.Readdirnames(1)
247                 if err == io.EOF {
248                         return lastErr
249                 } else if err != nil {
250                         return err
251                 }
252                 if !strings.HasPrefix(names[0], prefix) && !strings.HasPrefix(prefix, names[0]) {
253                         // prefix excludes all blocks stored in this dir
254                         continue
255                 }
256                 if !blockDirRe.MatchString(names[0]) {
257                         continue
258                 }
259                 blockdirpath := filepath.Join(v.root, names[0])
260                 blockdir, err := os.Open(blockdirpath)
261                 if err != nil {
262                         log.Print("Error reading ", blockdirpath, ": ", err)
263                         lastErr = err
264                         continue
265                 }
266                 for {
267                         fileInfo, err := blockdir.Readdir(1)
268                         if err == io.EOF {
269                                 break
270                         } else if err != nil {
271                                 log.Print("Error reading ", blockdirpath, ": ", err)
272                                 lastErr = err
273                                 break
274                         }
275                         name := fileInfo[0].Name()
276                         if !strings.HasPrefix(name, prefix) {
277                                 continue
278                         }
279                         _, err = fmt.Fprint(w,
280                                 name,
281                                 "+", fileInfo[0].Size(),
282                                 " ", fileInfo[0].ModTime().Unix(),
283                                 "\n")
284                 }
285                 blockdir.Close()
286         }
287 }
288
289 func (v *UnixVolume) Delete(loc string) error {
290         // Touch() must be called before calling Write() on a block.  Touch()
291         // also uses lockfile().  This avoids a race condition between Write()
292         // and Delete() because either (a) the file will be deleted and Touch()
293         // will signal to the caller that the file is not present (and needs to
294         // be re-written), or (b) Touch() will update the file's timestamp and
295         // Delete() will read the correct up-to-date timestamp and choose not to
296         // delete the file.
297
298         if v.readonly {
299                 return MethodDisabledError
300         }
301         if v.serialize {
302                 v.mutex.Lock()
303                 defer v.mutex.Unlock()
304         }
305         p := v.blockPath(loc)
306         f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
307         if err != nil {
308                 return err
309         }
310         defer f.Close()
311         if e := lockfile(f); e != nil {
312                 return e
313         }
314         defer unlockfile(f)
315
316         // If the block has been PUT in the last blob_signature_ttl
317         // seconds, return success without removing the block. This
318         // protects data from garbage collection until it is no longer
319         // possible for clients to retrieve the unreferenced blocks
320         // anyway (because the permission signatures have expired).
321         if fi, err := os.Stat(p); err != nil {
322                 return err
323         } else {
324                 if time.Since(fi.ModTime()) < blob_signature_ttl {
325                         return nil
326                 }
327         }
328         return os.Remove(p)
329 }
330
331 // blockDir returns the fully qualified directory name for the directory
332 // where loc is (or would be) stored on this volume.
333 func (v *UnixVolume) blockDir(loc string) string {
334         return filepath.Join(v.root, loc[0:3])
335 }
336
337 // blockPath returns the fully qualified pathname for the path to loc
338 // on this volume.
339 func (v *UnixVolume) blockPath(loc string) string {
340         return filepath.Join(v.blockDir(loc), loc)
341 }
342
343 // IsFull returns true if the free space on the volume is less than
344 // MIN_FREE_KILOBYTES.
345 //
346 func (v *UnixVolume) IsFull() (isFull bool) {
347         fullSymlink := v.root + "/full"
348
349         // Check if the volume has been marked as full in the last hour.
350         if link, err := os.Readlink(fullSymlink); err == nil {
351                 if ts, err := strconv.Atoi(link); err == nil {
352                         fulltime := time.Unix(int64(ts), 0)
353                         if time.Since(fulltime).Hours() < 1.0 {
354                                 return true
355                         }
356                 }
357         }
358
359         if avail, err := v.FreeDiskSpace(); err == nil {
360                 isFull = avail < MIN_FREE_KILOBYTES
361         } else {
362                 log.Printf("%s: FreeDiskSpace: %s\n", v, err)
363                 isFull = false
364         }
365
366         // If the volume is full, timestamp it.
367         if isFull {
368                 now := fmt.Sprintf("%d", time.Now().Unix())
369                 os.Symlink(now, fullSymlink)
370         }
371         return
372 }
373
374 // FreeDiskSpace returns the number of unused 1k blocks available on
375 // the volume.
376 //
377 func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
378         var fs syscall.Statfs_t
379         err = syscall.Statfs(v.root, &fs)
380         if err == nil {
381                 // Statfs output is not guaranteed to measure free
382                 // space in terms of 1K blocks.
383                 free = fs.Bavail * uint64(fs.Bsize) / 1024
384         }
385         return
386 }
387
388 func (v *UnixVolume) String() string {
389         return fmt.Sprintf("[UnixVolume %s]", v.root)
390 }
391
392 func (v *UnixVolume) Writable() bool {
393         return !v.readonly
394 }
395
396 // lockfile and unlockfile use flock(2) to manage kernel file locks.
397 func lockfile(f *os.File) error {
398         return syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
399 }
400
401 func unlockfile(f *os.File) error {
402         return syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
403 }