21 type unixVolumeAdder struct {
25 func (vs *unixVolumeAdder) Set(value string) error {
26 if trashLifetime != 0 {
27 return ErrNotImplemented
29 if dirs := strings.Split(value, ","); len(dirs) > 1 {
30 log.Print("DEPRECATED: using comma-separated volume list.")
31 for _, dir := range dirs {
32 if err := vs.Set(dir); err != nil {
38 if len(value) == 0 || value[0] != '/' {
39 return errors.New("Invalid volume: must begin with '/'.")
41 if _, err := os.Stat(value); err != nil {
44 var locker sync.Locker
46 locker = &sync.Mutex{}
48 *vs.volumeSet = append(*vs.volumeSet, &UnixVolume{
51 readonly: flagReadonly,
58 &unixVolumeAdder{&volumes},
60 "Deprecated synonym for -volume.")
62 &unixVolumeAdder{&volumes},
64 "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
67 // Discover adds a UnixVolume for every directory named "keep" that is
68 // located at the top level of a device- or tmpfs-backed mount point
69 // other than "/". It returns the number of volumes added.
70 func (vs *unixVolumeAdder) Discover() int {
72 f, err := os.Open(ProcMounts)
74 log.Fatalf("opening %s: %s", ProcMounts, err)
76 scanner := bufio.NewScanner(f)
78 args := strings.Fields(scanner.Text())
79 if err := scanner.Err(); err != nil {
80 log.Fatalf("reading %s: %s", ProcMounts, err)
82 dev, mount := args[0], args[1]
86 if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
89 keepdir := mount + "/keep"
90 if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
93 // Set the -readonly flag (but only for this volume)
94 // if the filesystem is mounted readonly.
95 flagReadonlyWas := flagReadonly
96 for _, fsopt := range strings.Split(args[3], ",") {
105 if err := vs.Set(keepdir); err != nil {
106 log.Printf("adding %q: %s", keepdir, err)
110 flagReadonly = flagReadonlyWas
115 // A UnixVolume stores and retrieves blocks in a local directory.
116 type UnixVolume struct {
117 // path to the volume's root directory
119 // something to lock during IO, typically a sync.Mutex (or nil
125 // Touch sets the timestamp for the given locator to the current time
126 func (v *UnixVolume) Touch(loc string) error {
128 return MethodDisabledError
130 p := v.blockPath(loc)
131 f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
138 defer v.locker.Unlock()
140 if e := lockfile(f); e != nil {
144 now := time.Now().Unix()
145 utime := syscall.Utimbuf{now, now}
146 return syscall.Utime(p, &utime)
149 // Mtime returns the stored timestamp for the given locator.
150 func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
151 p := v.blockPath(loc)
152 fi, err := os.Stat(p)
154 return time.Time{}, err
156 return fi.ModTime(), nil
159 // Lock the locker (if one is in use), open the file for reading, and
160 // call the given function if and when the file is ready to read.
161 func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
164 defer v.locker.Unlock()
166 f, err := os.Open(path)
174 // stat is os.Stat() with some extra sanity checks.
175 func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
176 stat, err := os.Stat(path)
180 } else if stat.Size() > BlockSize {
187 // Get retrieves a block identified by the locator string "loc", and
188 // returns its contents as a byte slice.
190 // Get returns a nil buffer IFF it returns a non-nil error.
191 func (v *UnixVolume) Get(loc string) ([]byte, error) {
192 path := v.blockPath(loc)
193 stat, err := v.stat(path)
195 return nil, v.translateError(err)
197 buf := bufs.Get(int(stat.Size()))
198 err = v.getFunc(path, func(rdr io.Reader) error {
199 _, err = io.ReadFull(rdr, buf)
209 // Compare returns nil if Get(loc) would return the same content as
210 // expect. It is functionally equivalent to Get() followed by
211 // bytes.Compare(), but uses less memory.
212 func (v *UnixVolume) Compare(loc string, expect []byte) error {
213 path := v.blockPath(loc)
214 if _, err := v.stat(path); err != nil {
215 return v.translateError(err)
217 return v.getFunc(path, func(rdr io.Reader) error {
218 return compareReaderWithBuf(rdr, expect, loc[:32])
222 // Put stores a block of data identified by the locator string
223 // "loc". It returns nil on success. If the volume is full, it
224 // returns a FullError. If the write fails due to some other error,
225 // that error is returned.
226 func (v *UnixVolume) Put(loc string, block []byte) error {
228 return MethodDisabledError
233 bdir := v.blockDir(loc)
234 if err := os.MkdirAll(bdir, 0755); err != nil {
235 log.Printf("%s: could not create directory %s: %s",
240 tmpfile, tmperr := ioutil.TempFile(bdir, "tmp"+loc)
242 log.Printf("ioutil.TempFile(%s, tmp%s): %s", bdir, loc, tmperr)
245 bpath := v.blockPath(loc)
249 defer v.locker.Unlock()
251 if _, err := tmpfile.Write(block); err != nil {
252 log.Printf("%s: writing to %s: %s\n", v, bpath, err)
254 os.Remove(tmpfile.Name())
257 if err := tmpfile.Close(); err != nil {
258 log.Printf("closing %s: %s\n", tmpfile.Name(), err)
259 os.Remove(tmpfile.Name())
262 if err := os.Rename(tmpfile.Name(), bpath); err != nil {
263 log.Printf("rename %s %s: %s\n", tmpfile.Name(), bpath, err)
264 os.Remove(tmpfile.Name())
270 // Status returns a VolumeStatus struct describing the volume's
271 // current state, or nil if an error occurs.
273 func (v *UnixVolume) Status() *VolumeStatus {
274 var fs syscall.Statfs_t
277 if fi, err := os.Stat(v.root); err == nil {
278 devnum = fi.Sys().(*syscall.Stat_t).Dev
280 log.Printf("%s: os.Stat: %s\n", v, err)
284 err := syscall.Statfs(v.root, &fs)
286 log.Printf("%s: statfs: %s\n", v, err)
289 // These calculations match the way df calculates disk usage:
290 // "free" space is measured by fs.Bavail, but "used" space
291 // uses fs.Blocks - fs.Bfree.
292 free := fs.Bavail * uint64(fs.Bsize)
293 used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
294 return &VolumeStatus{v.root, devnum, free, used}
var (
	// blockDirRe matches the names of directories under the volume
	// root that IndexTo descends into: one or more lowercase
	// hexadecimal digits.
	blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)

	// blockFileRe matches the file names that IndexTo reports as
	// blocks: exactly 32 lowercase hexadecimal digits.
	blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
)
300 // IndexTo writes (to the given Writer) a list of blocks found on this
301 // volume which begin with the specified prefix. If the prefix is an
302 // empty string, IndexTo writes a complete list of blocks.
304 // Each block is given in the format
306 // locator+size modification-time {newline}
310 // e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
311 // e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
312 // e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
314 func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
315 var lastErr error = nil
316 rootdir, err := os.Open(v.root)
320 defer rootdir.Close()
322 names, err := rootdir.Readdirnames(1)
325 } else if err != nil {
328 if !strings.HasPrefix(names[0], prefix) && !strings.HasPrefix(prefix, names[0]) {
329 // prefix excludes all blocks stored in this dir
332 if !blockDirRe.MatchString(names[0]) {
335 blockdirpath := filepath.Join(v.root, names[0])
336 blockdir, err := os.Open(blockdirpath)
338 log.Print("Error reading ", blockdirpath, ": ", err)
343 fileInfo, err := blockdir.Readdir(1)
346 } else if err != nil {
347 log.Print("Error reading ", blockdirpath, ": ", err)
351 name := fileInfo[0].Name()
352 if !strings.HasPrefix(name, prefix) {
355 if !blockFileRe.MatchString(name) {
358 _, err = fmt.Fprint(w,
360 "+", fileInfo[0].Size(),
361 " ", fileInfo[0].ModTime().Unix(),
368 // Trash deletes the block data from the unix storage
369 func (v *UnixVolume) Trash(loc string) error {
370 // Touch() must be called before calling Write() on a block. Touch()
371 // also uses lockfile(). This avoids a race condition between Write()
372 // and Delete() because either (a) the file will be deleted and Touch()
373 // will signal to the caller that the file is not present (and needs to
374 // be re-written), or (b) Touch() will update the file's timestamp and
375 // Delete() will read the correct up-to-date timestamp and choose not to
379 return MethodDisabledError
381 if trashLifetime != 0 {
382 return ErrNotImplemented
386 defer v.locker.Unlock()
388 p := v.blockPath(loc)
389 f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
394 if e := lockfile(f); e != nil {
399 // If the block has been PUT in the last blobSignatureTTL
400 // seconds, return success without removing the block. This
401 // protects data from garbage collection until it is no longer
402 // possible for clients to retrieve the unreferenced blocks
403 // anyway (because the permission signatures have expired).
404 if fi, err := os.Stat(p); err != nil {
407 if time.Since(fi.ModTime()) < blobSignatureTTL {
414 // Untrash moves block from trash back into store
416 func (v *UnixVolume) Untrash(loc string) error {
417 return ErrNotImplemented
420 // blockDir returns the fully qualified directory name for the directory
421 // where loc is (or would be) stored on this volume.
422 func (v *UnixVolume) blockDir(loc string) string {
423 return filepath.Join(v.root, loc[0:3])
426 // blockPath returns the fully qualified pathname for the path to loc
428 func (v *UnixVolume) blockPath(loc string) string {
429 return filepath.Join(v.blockDir(loc), loc)
432 // IsFull returns true if the free space on the volume is less than
435 func (v *UnixVolume) IsFull() (isFull bool) {
436 fullSymlink := v.root + "/full"
438 // Check if the volume has been marked as full in the last hour.
439 if link, err := os.Readlink(fullSymlink); err == nil {
440 if ts, err := strconv.Atoi(link); err == nil {
441 fulltime := time.Unix(int64(ts), 0)
442 if time.Since(fulltime).Hours() < 1.0 {
448 if avail, err := v.FreeDiskSpace(); err == nil {
449 isFull = avail < MinFreeKilobytes
451 log.Printf("%s: FreeDiskSpace: %s\n", v, err)
455 // If the volume is full, timestamp it.
457 now := fmt.Sprintf("%d", time.Now().Unix())
458 os.Symlink(now, fullSymlink)
463 // FreeDiskSpace returns the number of unused 1k blocks available on
466 func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
467 var fs syscall.Statfs_t
468 err = syscall.Statfs(v.root, &fs)
470 // Statfs output is not guaranteed to measure free
471 // space in terms of 1K blocks.
472 free = fs.Bavail * uint64(fs.Bsize) / 1024
477 func (v *UnixVolume) String() string {
478 return fmt.Sprintf("[UnixVolume %s]", v.root)
481 // Writable returns false if all future Put, Mtime, and Delete calls are expected to fail.
482 func (v *UnixVolume) Writable() bool {
486 func (v *UnixVolume) Replication() int {
// lockfile and unlockfile use flock(2) to manage kernel file locks.

// lockfile acquires an exclusive flock(2) lock on f, blocking until the
// lock is available. It returns nil on success or the flock error.
func lockfile(f *os.File) error {
	for {
		// A blocking flock(2) fails with EINTR if a signal arrives
		// while it is waiting. That is not a real failure, so retry
		// instead of reporting it to the caller.
		if err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX); err != syscall.EINTR {
			return err
		}
	}
}

// unlockfile releases the flock(2) lock held on f. It returns nil on
// success or the flock error.
func unlockfile(f *os.File) error {
	return syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
}
499 // Where appropriate, translate a more specific filesystem error to an
500 // error recognized by handlers, like os.ErrNotExist.
501 func (v *UnixVolume) translateError(err error) error {
504 // stat() returns a PathError if the parent directory
505 // (not just the file itself) is missing
506 return os.ErrNotExist