// unixVolumeAdder is the receiver type of the Set and Discover methods
// in this file. It is constructed as unixVolumeAdder{&volumes} when the
// volume flags are registered, and Set appends new volumes to
// *vs.volumeSet.
// NOTE(review): the field list is not visible in this excerpt —
// presumably a single embedded *volumeSet; confirm.
22 type unixVolumeAdder struct {
// Set appends a UnixVolume rooted at value to the volume set.
//
// A value containing commas is treated as a deprecated list: a
// deprecation message is logged and Set is invoked again for each
// element. Otherwise value must be non-empty and begin with '/', and it
// is checked with os.Stat before the volume is added.
26 func (vs *unixVolumeAdder) Set(value string) error {
27 if dirs := strings.Split(value, ","); len(dirs) > 1 {
28 log.Print("DEPRECATED: using comma-separated volume list.")
29 for _, dir := range dirs {
30 if err := vs.Set(dir); err != nil {
// Reject empty and relative paths.
36 if len(value) == 0 || value[0] != '/' {
37 return errors.New("Invalid volume: must begin with '/'.")
// NOTE(review): the handling of a Stat failure is not visible here —
// presumably the error is returned; confirm.
39 if _, err := os.Stat(value); err != nil {
// locker stays nil unless the assignment below runs.
// NOTE(review): the condition guarding that assignment is not visible
// in this excerpt.
42 var locker sync.Locker
44 locker = &sync.Mutex{}
// The new volume captures the value flagReadonly has at this moment.
46 *vs.volumeSet = append(*vs.volumeSet, &UnixVolume{
49 readonly: flagReadonly,
// Two registrations that share the same volume set (&volumes): the
// first is described as a deprecated synonym for -volume, the second
// carries the full help text.
// NOTE(review): the enclosing function, the registering calls and the
// flag names are not visible in this excerpt — presumably flag.Var
// inside init(); confirm.
56 &unixVolumeAdder{&volumes},
58 "Deprecated synonym for -volume.")
60 &unixVolumeAdder{&volumes},
62 "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
65 // Discover adds a UnixVolume for every directory named "keep" that is
66 // located at the top level of a device- or tmpfs-backed mount point
67 // other than "/". It returns the number of volumes added.
//
// Mount points are read from ProcMounts, one whitespace-separated
// record per line: field 0 is the device, field 1 the mount point and
// field 3 the comma-separated mount options. Failing to open or read
// ProcMounts is fatal (log.Fatalf); failing to add one directory is
// only logged.
68 func (vs *unixVolumeAdder) Discover() int {
// NOTE(review): no f.Close() is visible in this excerpt — confirm the
// file is closed.
70 f, err := os.Open(ProcMounts)
72 log.Fatalf("opening %s: %s", ProcMounts, err)
74 scanner := bufio.NewScanner(f)
76 args := strings.Fields(scanner.Text())
// NOTE(review): scanner.Err() is consulted right after scanner.Text(),
// presumably inside the scan loop. bufio.Scanner reports read errors
// once Scan has returned false, so this check is normally placed after
// the loop — confirm it can ever fire here.
77 if err := scanner.Err(); err != nil {
78 log.Fatalf("reading %s: %s", ProcMounts, err)
// NOTE(review): args[0], args[1] (and args[3] below) are indexed
// without a visible length check; a line with fewer fields would panic.
80 dev, mount := args[0], args[1]
// Only tmpfs and /dev/* devices pass this filter (the rejecting branch
// is not visible here).
84 if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
87 keepdir := mount + "/keep"
// The candidate must exist and be a directory.
88 if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
91 // Set the -readonly flag (but only for this volume)
92 // if the filesystem is mounted readonly.
93 flagReadonlyWas := flagReadonly
94 for _, fsopt := range strings.Split(args[3], ",") {
// Set reads flagReadonly, so the temporary override above is what the
// new volume records.
103 if err := vs.Set(keepdir); err != nil {
104 log.Printf("adding %q: %s", keepdir, err)
// Restore the saved flag so the override applies to this volume only.
108 flagReadonly = flagReadonlyWas
113 // A UnixVolume stores and retrieves blocks in a local directory.
//
// Methods in this file refer to the fields root (volume directory),
// locker (unlocked via v.locker.Unlock()) and readonly (set from
// flagReadonly in unixVolumeAdder.Set).
// NOTE(review): the field declarations themselves are not visible in
// this excerpt.
114 type UnixVolume struct {
115 // path to the volume's root directory
117 // something to lock during IO, typically a sync.Mutex (or nil
123 // Touch sets the timestamp for the given locator to the current time
//
// The block file is opened read-write without O_CREATE (so OpenFile
// fails for a block that does not exist), locked with lockfile, and
// then its access and modification times are both set to the current
// Unix time via syscall.Utime, i.e. with one-second resolution.
124 func (v *UnixVolume) Touch(loc string) error {
// NOTE(review): the guard for this return is not visible here —
// presumably the volume's readonly flag; confirm.
126 return MethodDisabledError
128 p := v.blockPath(loc)
129 f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
136 defer v.locker.Unlock()
138 if e := lockfile(f); e != nil {
142 now := time.Now().Unix()
// NOTE(review): unkeyed composite literal; the two values are
// Utimbuf's access time and modification time, in that order.
143 utime := syscall.Utimbuf{now, now}
144 return syscall.Utime(p, &utime)
147 // Mtime returns the stored timestamp for the given locator.
//
// The timestamp is the modification time reported by os.Stat for the
// block's path; the error path returns the zero time.Time together with
// the error.
148 func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
149 p := v.blockPath(loc)
150 fi, err := os.Stat(p)
152 return time.Time{}, err
154 return fi.ModTime(), nil
157 // Lock the locker (if one is in use), open the file for reading, and
158 // call the given function if and when the file is ready to read.
//
// path is the file to open; fn receives it as an io.Reader. Because the
// Unlock is deferred, the locker (when used) is held until getFunc
// returns.
159 func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
162 defer v.locker.Unlock()
164 f, err := os.Open(path)
172 // stat is os.Stat() with some extra sanity checks.
//
// One visible check compares the file size against BlockSize.
// NOTE(review): the branch bodies are not visible here — presumably an
// oversized file is reported as an error; confirm.
173 func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
174 stat, err := os.Stat(path)
178 } else if stat.Size() > BlockSize {
185 // Get retrieves a block identified by the locator string "loc", and
186 // returns its contents as a byte slice.
188 // Get returns a nil buffer IFF it returns a non-nil error.
//
// The buffer is obtained from bufs and sized to the file size reported
// by v.stat; io.ReadFull then fills it completely or fails.
189 func (v *UnixVolume) Get(loc string) ([]byte, error) {
190 path := v.blockPath(loc)
191 stat, err := v.stat(path)
195 buf := bufs.Get(int(stat.Size()))
196 err = v.getFunc(path, func(rdr io.Reader) error {
// Note the plain assignment: this writes the enclosing function's err,
// not a closure-local variable.
197 _, err = io.ReadFull(rdr, buf)
207 // Compare returns nil if Get(loc) would return the same content as
208 // expect. It is functionally equivalent to Get() followed by
209 // bytes.Compare(), but uses less memory.
//
// The file is read in chunks into a scratch buffer whose length is
// capped at the file size, and each chunk is compared against the
// not-yet-matched remainder of expect (cmp).
210 func (v *UnixVolume) Compare(loc string, expect []byte) error {
211 path := v.blockPath(loc)
212 stat, err := v.stat(path)
// Never allocate a scratch buffer larger than the file itself.
217 if int64(bufLen) > stat.Size() {
218 bufLen = int(stat.Size())
220 // len(buf)==0 would prevent us from handling
221 // empty files the same way as non-empty
222 // files, because reading 0 bytes at a time
223 // never reaches EOF.
228 buf := make([]byte, bufLen)
229 return v.getFunc(path, func(rdr io.Reader) error {
230 // Loop invariants: all data read so far matched what
231 // we expected, and the first N bytes of cmp are
232 // expected to equal the next N bytes read from
235 n, err := rdr.Read(buf)
// Mismatch: either more bytes arrived than remain expected, or the
// chunk differs. collisionOrCorrupt receives the part of expect that
// already matched, the offending chunk, and the reader for the rest.
// NOTE(review): loc[:32] panics for locators shorter than 32 bytes —
// presumably callers guarantee a 32-character hash prefix; confirm.
236 if n > len(cmp) || bytes.Compare(cmp[:n], buf[:n]) != 0 {
237 return collisionOrCorrupt(loc[:32], expect[:len(expect)-len(cmp)], buf[:n], rdr)
// NOTE(review): the condition for this second call is not visible here.
242 return collisionOrCorrupt(loc[:32], expect[:len(expect)-len(cmp)], nil, nil)
245 } else if err != nil {
252 // Put stores a block of data identified by the locator string
253 // "loc". It returns nil on success. If the volume is full, it
254 // returns a FullError. If the write fails due to some other error,
255 // that error is returned.
//
// The data is written to a temporary file created in the block's own
// directory and then renamed to its final path; every visible failure
// path logs the problem and removes the temporary file.
256 func (v *UnixVolume) Put(loc string, block []byte) error {
// NOTE(review): the guard for this return is not visible here —
// presumably the volume's readonly flag; confirm.
258 return MethodDisabledError
263 bdir := v.blockDir(loc)
264 if err := os.MkdirAll(bdir, 0755); err != nil {
265 log.Printf("%s: could not create directory %s: %s",
270 tmpfile, tmperr := ioutil.TempFile(bdir, "tmp"+loc)
272 log.Printf("ioutil.TempFile(%s, tmp%s): %s", bdir, loc, tmperr)
275 bpath := v.blockPath(loc)
279 defer v.locker.Unlock()
281 if _, err := tmpfile.Write(block); err != nil {
282 log.Printf("%s: writing to %s: %s\n", v, bpath, err)
284 os.Remove(tmpfile.Name())
// Close before renaming so a failed close is detected before the block
// becomes visible under its final name.
287 if err := tmpfile.Close(); err != nil {
288 log.Printf("closing %s: %s\n", tmpfile.Name(), err)
289 os.Remove(tmpfile.Name())
292 if err := os.Rename(tmpfile.Name(), bpath); err != nil {
293 log.Printf("rename %s %s: %s\n", tmpfile.Name(), bpath, err)
294 os.Remove(tmpfile.Name())
300 // Status returns a VolumeStatus struct describing the volume's
301 // current state, or nil if an error occurs.
//
// The device number comes from os.Stat on the volume root and the
// free/used byte counts from syscall.Statfs; failures of either call
// are logged.
303 func (v *UnixVolume) Status() *VolumeStatus {
304 var fs syscall.Statfs_t
307 if fi, err := os.Stat(v.root); err == nil {
// NOTE(review): unchecked type assertion — this panics if Sys() is not
// a *syscall.Stat_t.
308 devnum = fi.Sys().(*syscall.Stat_t).Dev
310 log.Printf("%s: os.Stat: %s\n", v, err)
314 err := syscall.Statfs(v.root, &fs)
316 log.Printf("%s: statfs: %s\n", v, err)
319 // These calculations match the way df calculates disk usage:
320 // "free" space is measured by fs.Bavail, but "used" space
321 // uses fs.Blocks - fs.Bfree.
322 free := fs.Bavail * uint64(fs.Bsize)
323 used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
// Positional literal: root path, device number, free bytes, used bytes.
324 return &VolumeStatus{v.root, devnum, free, used}
// blockDirRe matches names made up solely of one or more lowercase
// hexadecimal digits. IndexTo tests each entry of the volume root
// against it.
327 var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
329 // IndexTo writes (to the given Writer) a list of blocks found on this
330 // volume which begin with the specified prefix. If the prefix is an
331 // empty string, IndexTo writes a complete list of blocks.
333 // Each block is given in the format
335 // locator+size modification-time {newline}
339 // e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
340 // e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
341 // e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
//
// Both the volume root and each block directory are read one entry at
// a time (Readdirnames(1) / Readdir(1)) rather than loading a whole
// listing at once.
343 func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
344 var lastErr error = nil
345 rootdir, err := os.Open(v.root)
349 defer rootdir.Close()
351 names, err := rootdir.Readdirnames(1)
354 } else if err != nil {
// A directory is relevant only if its name and the requested prefix
// agree on their common length, i.e. one is a prefix of the other.
357 if !strings.HasPrefix(names[0], prefix) && !strings.HasPrefix(prefix, names[0]) {
358 // prefix excludes all blocks stored in this dir
361 if !blockDirRe.MatchString(names[0]) {
364 blockdirpath := filepath.Join(v.root, names[0])
// NOTE(review): no Close for blockdir is visible in this excerpt —
// confirm each block directory handle is closed.
365 blockdir, err := os.Open(blockdirpath)
367 log.Print("Error reading ", blockdirpath, ": ", err)
372 fileInfo, err := blockdir.Readdir(1)
375 } else if err != nil {
376 log.Print("Error reading ", blockdirpath, ": ", err)
380 name := fileInfo[0].Name()
381 if !strings.HasPrefix(name, prefix) {
// Emit the entry's size and modification time (Unix seconds), as in the
// format documented above.
384 _, err = fmt.Fprint(w,
386 "+", fileInfo[0].Size(),
387 " ", fileInfo[0].ModTime().Unix(),
394 // Delete deletes the block data from the unix storage
//
// The block file is opened and locked with lockfile before its
// modification time is examined, so the age check below runs while the
// file lock is held.
395 func (v *UnixVolume) Delete(loc string) error {
396 // Touch() must be called before calling Write() on a block. Touch()
397 // also uses lockfile(). This avoids a race condition between Write()
398 // and Delete() because either (a) the file will be deleted and Touch()
399 // will signal to the caller that the file is not present (and needs to
400 // be re-written), or (b) Touch() will update the file's timestamp and
401 // Delete() will read the correct up-to-date timestamp and choose not to
// NOTE(review): the guard for this return is not visible here —
// presumably the volume's readonly flag; confirm.
405 return MethodDisabledError
409 defer v.locker.Unlock()
411 p := v.blockPath(loc)
412 f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
417 if e := lockfile(f); e != nil {
422 // If the block has been PUT in the last blobSignatureTTL
423 // seconds, return success without removing the block. This
424 // protects data from garbage collection until it is no longer
425 // possible for clients to retrieve the unreferenced blocks
426 // anyway (because the permission signatures have expired).
427 if fi, err := os.Stat(p); err != nil {
// The block's age is measured from its modification time, which Touch
// resets to the current time.
430 if time.Since(fi.ModTime()) < blobSignatureTTL {
437 // blockDir returns the fully qualified directory name for the directory
438 // where loc is (or would be) stored on this volume.
//
// The directory is v.root joined with the first three bytes of loc.
// NOTE(review): loc[0:3] panics when loc is shorter than three bytes —
// presumably callers only pass validated locators; confirm.
439 func (v *UnixVolume) blockDir(loc string) string {
440 return filepath.Join(v.root, loc[0:3])
443 // blockPath returns the fully qualified pathname for the path to loc
//
// The result is blockDir(loc) joined with loc itself, so the file name
// is the full locator string.
445 func (v *UnixVolume) blockPath(loc string) string {
446 return filepath.Join(v.blockDir(loc), loc)
449 // IsFull returns true if the free space on the volume is less than
//
// A symlink named "full" in the volume root acts as a cache: its target
// is a decimal Unix timestamp recording when the volume was last found
// full, and a timestamp less than one hour old is checked before free
// space is measured. Otherwise the result is derived from
// FreeDiskSpace compared against MinFreeKilobytes; a FreeDiskSpace
// error is logged.
452 func (v *UnixVolume) IsFull() (isFull bool) {
453 fullSymlink := v.root + "/full"
455 // Check if the volume has been marked as full in the last hour.
456 if link, err := os.Readlink(fullSymlink); err == nil {
457 if ts, err := strconv.Atoi(link); err == nil {
458 fulltime := time.Unix(int64(ts), 0)
459 if time.Since(fulltime).Hours() < 1.0 {
465 if avail, err := v.FreeDiskSpace(); err == nil {
466 isFull = avail < MinFreeKilobytes
468 log.Printf("%s: FreeDiskSpace: %s\n", v, err)
472 // If the volume is full, timestamp it.
474 now := fmt.Sprintf("%d", time.Now().Unix())
// NOTE(review): the error from os.Symlink is discarded. os.Symlink
// fails when the link already exists, so a marker older than one hour
// is presumably never refreshed — confirm whether the stale link is
// removed elsewhere.
475 os.Symlink(now, fullSymlink)
480 // FreeDiskSpace returns the number of unused 1k blocks available on
//
// The figure is computed from syscall.Statfs on the volume root as
// Bavail * Bsize / 1024. err is the error returned by Statfs.
483 func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
484 var fs syscall.Statfs_t
485 err = syscall.Statfs(v.root, &fs)
487 // Statfs output is not guaranteed to measure free
488 // space in terms of 1K blocks.
489 free = fs.Bavail * uint64(fs.Bsize) / 1024
// String returns a short description of the volume of the form
// "[UnixVolume <root>]". Log statements in this file that format v with
// %s rely on it.
494 func (v *UnixVolume) String() string {
495 return fmt.Sprintf("[UnixVolume %s]", v.root)
498 // Writable returns false if all future Put, Mtime, and Delete calls are expected to fail.
// NOTE(review): in this file the methods with a visible
// MethodDisabledError return are Touch, Put and Delete; Mtime has none.
// "Mtime" above may be meant as "Touch" — confirm. The method body is
// not visible in this excerpt.
499 func (v *UnixVolume) Writable() bool {
503 // lockfile and unlockfile use flock(2) to manage kernel file locks.
//
// lockfile requests an exclusive lock (LOCK_EX) on f's file descriptor.
// LOCK_NB is not set, so per flock(2) the call waits until the lock can
// be granted. It returns the error from syscall.Flock.
504 func lockfile(f *os.File) error {
505 return syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
// unlockfile releases (LOCK_UN) the flock(2) lock held on f's file
// descriptor and returns the error from syscall.Flock.
508 func unlockfile(f *os.File) error {
509 return syscall.Flock(int(f.Fd()), syscall.LOCK_UN)