21 type unixVolumeAdder struct {
25 func (vs *unixVolumeAdder) Set(value string) error {
26 if dirs := strings.Split(value, ","); len(dirs) > 1 {
27 log.Print("DEPRECATED: using comma-separated volume list.")
28 for _, dir := range dirs {
29 if err := vs.Set(dir); err != nil {
35 if len(value) == 0 || value[0] != '/' {
36 return errors.New("Invalid volume: must begin with '/'.")
38 if _, err := os.Stat(value); err != nil {
41 var locker sync.Locker
43 locker = &sync.Mutex{}
45 *vs.volumeSet = append(*vs.volumeSet, &UnixVolume{
48 readonly: flagReadonly,
55 &unixVolumeAdder{&volumes},
57 "Deprecated synonym for -volume.")
59 &unixVolumeAdder{&volumes},
61 "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
64 // Discover adds a UnixVolume for every directory named "keep" that is
65 // located at the top level of a device- or tmpfs-backed mount point
66 // other than "/". It returns the number of volumes added.
67 func (vs *unixVolumeAdder) Discover() int {
69 f, err := os.Open(ProcMounts)
71 log.Fatalf("opening %s: %s", ProcMounts, err)
73 scanner := bufio.NewScanner(f)
75 args := strings.Fields(scanner.Text())
76 if err := scanner.Err(); err != nil {
77 log.Fatalf("reading %s: %s", ProcMounts, err)
79 dev, mount := args[0], args[1]
83 if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
86 keepdir := mount + "/keep"
87 if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
90 // Set the -readonly flag (but only for this volume)
91 // if the filesystem is mounted readonly.
92 flagReadonlyWas := flagReadonly
93 for _, fsopt := range strings.Split(args[3], ",") {
102 if err := vs.Set(keepdir); err != nil {
103 log.Printf("adding %q: %s", keepdir, err)
107 flagReadonly = flagReadonlyWas
112 // A UnixVolume stores and retrieves blocks in a local directory.
113 type UnixVolume struct {
114 // path to the volume's root directory
116 // something to lock during IO, typically a sync.Mutex (or nil
122 // Touch sets the timestamp for the given locator to the current time.
123 func (v *UnixVolume) Touch(loc string) error {
125 return MethodDisabledError
127 p := v.blockPath(loc)
128 f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
135 defer v.locker.Unlock()
137 if e := lockfile(f); e != nil {
141 now := time.Now().Unix()
142 utime := syscall.Utimbuf{now, now}
143 return syscall.Utime(p, &utime)
146 // Mtime returns the stored timestamp for the given locator.
147 func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
148 p := v.blockPath(loc)
149 fi, err := os.Stat(p)
151 return time.Time{}, err
153 return fi.ModTime(), nil
156 // Lock the locker (if one is in use), open the file for reading, and
157 // call the given function if and when the file is ready to read.
158 func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
161 defer v.locker.Unlock()
163 f, err := os.Open(path)
171 // stat is os.Stat() with some extra sanity checks.
172 func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
173 stat, err := os.Stat(path)
177 } else if stat.Size() > BlockSize {
184 // Get retrieves a block identified by the locator string "loc", and
185 // returns its contents as a byte slice.
187 // Get returns a nil buffer IFF it returns a non-nil error.
188 func (v *UnixVolume) Get(loc string) ([]byte, error) {
189 path := v.blockPath(loc)
190 stat, err := v.stat(path)
192 return nil, v.translateError(err)
194 buf := bufs.Get(int(stat.Size()))
195 err = v.getFunc(path, func(rdr io.Reader) error {
196 _, err = io.ReadFull(rdr, buf)
206 // Compare returns nil if Get(loc) would return the same content as
207 // expect. It is functionally equivalent to Get() followed by
208 // bytes.Compare(), but uses less memory.
209 func (v *UnixVolume) Compare(loc string, expect []byte) error {
210 path := v.blockPath(loc)
211 if _, err := v.stat(path); err != nil {
212 return v.translateError(err)
214 return v.getFunc(path, func(rdr io.Reader) error {
215 return compareReaderWithBuf(rdr, expect, loc[:32])
219 // Put stores a block of data identified by the locator string
220 // "loc". It returns nil on success. If the volume is full, it
221 // returns a FullError. If the write fails due to some other error,
222 // that error is returned.
223 func (v *UnixVolume) Put(loc string, block []byte) error {
225 return MethodDisabledError
230 bdir := v.blockDir(loc)
231 if err := os.MkdirAll(bdir, 0755); err != nil {
232 log.Printf("%s: could not create directory %s: %s",
237 tmpfile, tmperr := ioutil.TempFile(bdir, "tmp"+loc)
239 log.Printf("ioutil.TempFile(%s, tmp%s): %s", bdir, loc, tmperr)
242 bpath := v.blockPath(loc)
246 defer v.locker.Unlock()
248 if _, err := tmpfile.Write(block); err != nil {
249 log.Printf("%s: writing to %s: %s\n", v, bpath, err)
251 os.Remove(tmpfile.Name())
254 if err := tmpfile.Close(); err != nil {
255 log.Printf("closing %s: %s\n", tmpfile.Name(), err)
256 os.Remove(tmpfile.Name())
259 if err := os.Rename(tmpfile.Name(), bpath); err != nil {
260 log.Printf("rename %s %s: %s\n", tmpfile.Name(), bpath, err)
261 os.Remove(tmpfile.Name())
267 // Status returns a VolumeStatus struct describing the volume's
268 // current state, or nil if an error occurs.
270 func (v *UnixVolume) Status() *VolumeStatus {
271 var fs syscall.Statfs_t
274 if fi, err := os.Stat(v.root); err == nil {
275 devnum = fi.Sys().(*syscall.Stat_t).Dev
277 log.Printf("%s: os.Stat: %s\n", v, err)
281 err := syscall.Statfs(v.root, &fs)
283 log.Printf("%s: statfs: %s\n", v, err)
286 // These calculations match the way df calculates disk usage:
287 // "free" space is measured by fs.Bavail, but "used" space
288 // uses fs.Blocks - fs.Bfree.
289 free := fs.Bavail * uint64(fs.Bsize)
290 used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
291 return &VolumeStatus{v.root, devnum, free, used}
// Patterns used by IndexTo to decide which directory entries to list.
var (
	// blockDirRe accepts directory names made up solely of one or
	// more lowercase hexadecimal digits.
	blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)

	// blockFileRe accepts file names that are exactly 32 lowercase
	// hexadecimal digits, with no suffix.
	blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
)
297 // IndexTo writes (to the given Writer) a list of blocks found on this
298 // volume which begin with the specified prefix. If the prefix is an
299 // empty string, IndexTo writes a complete list of blocks.
301 // Each block is given in the format
303 // locator+size modification-time {newline}
307 // e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
308 // e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
309 // e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
311 func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
312 var lastErr error = nil
313 rootdir, err := os.Open(v.root)
317 defer rootdir.Close()
319 names, err := rootdir.Readdirnames(1)
322 } else if err != nil {
325 if !strings.HasPrefix(names[0], prefix) && !strings.HasPrefix(prefix, names[0]) {
326 // prefix excludes all blocks stored in this dir
329 if !blockDirRe.MatchString(names[0]) {
332 blockdirpath := filepath.Join(v.root, names[0])
333 blockdir, err := os.Open(blockdirpath)
335 log.Print("Error reading ", blockdirpath, ": ", err)
340 fileInfo, err := blockdir.Readdir(1)
343 } else if err != nil {
344 log.Print("Error reading ", blockdirpath, ": ", err)
348 name := fileInfo[0].Name()
349 if !strings.HasPrefix(name, prefix) {
352 if !blockFileRe.MatchString(name) {
355 _, err = fmt.Fprint(w,
357 "+", fileInfo[0].Size(),
358 " ", fileInfo[0].ModTime().Unix(),
365 // Trash trashes the block data stored on this unix volume.
366 // If trashLifetime == 0, the block is deleted.
367 // Otherwise, the block is renamed to path/{loc}.trash.{deadline},
368 // where deadline = now + trashLifetime.
369 func (v *UnixVolume) Trash(loc string) error {
370 // Touch() must be called before calling Write() on a block. Touch()
371 // also uses lockfile(). This avoids a race condition between Write()
372 // and Trash() because either (a) the file will be trashed and Touch()
373 // will signal to the caller that the file is not present (and needs to
374 // be re-written), or (b) Touch() will update the file's timestamp and
375 // Trash() will read the correct up-to-date timestamp and choose not to
379 return MethodDisabledError
383 defer v.locker.Unlock()
385 p := v.blockPath(loc)
386 f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
391 if e := lockfile(f); e != nil {
396 // If the block has been PUT in the last blobSignatureTTL
397 // seconds, return success without removing the block. This
398 // protects data from garbage collection until it is no longer
399 // possible for clients to retrieve the unreferenced blocks
400 // anyway (because the permission signatures have expired).
401 if fi, err := os.Stat(p); err != nil {
404 if time.Since(fi.ModTime()) < blobSignatureTTL {
409 if trashLifetime == 0 {
412 return os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(trashLifetime).Unix()))
415 // Untrash moves a block from the trash back into the store.
416 // It looks for files named path/{loc}.trash.{deadline} in storage
417 // and renames the first such file to path/{loc}.
418 func (v *UnixVolume) Untrash(loc string) (err error) {
420 return MethodDisabledError
423 files, err := ioutil.ReadDir(v.blockDir(loc))
429 return os.ErrNotExist
433 prefix := fmt.Sprintf("%v.trash.", loc)
434 for _, f := range files {
435 if strings.HasPrefix(f.Name(), prefix) {
437 err = os.Rename(v.blockPath(f.Name()), v.blockPath(loc))
444 if foundTrash == false {
445 return os.ErrNotExist
451 // blockDir returns the fully qualified directory name for the directory
452 // where loc is (or would be) stored on this volume.
453 func (v *UnixVolume) blockDir(loc string) string {
454 return filepath.Join(v.root, loc[0:3])
457 // blockPath returns the fully qualified pathname for the path to loc
459 func (v *UnixVolume) blockPath(loc string) string {
460 return filepath.Join(v.blockDir(loc), loc)
463 // IsFull returns true if the free space on the volume is less than
466 func (v *UnixVolume) IsFull() (isFull bool) {
467 fullSymlink := v.root + "/full"
469 // Check if the volume has been marked as full in the last hour.
470 if link, err := os.Readlink(fullSymlink); err == nil {
471 if ts, err := strconv.Atoi(link); err == nil {
472 fulltime := time.Unix(int64(ts), 0)
473 if time.Since(fulltime).Hours() < 1.0 {
479 if avail, err := v.FreeDiskSpace(); err == nil {
480 isFull = avail < MinFreeKilobytes
482 log.Printf("%s: FreeDiskSpace: %s\n", v, err)
486 // If the volume is full, timestamp it.
488 now := fmt.Sprintf("%d", time.Now().Unix())
489 os.Symlink(now, fullSymlink)
494 // FreeDiskSpace returns the number of unused 1k blocks available on
497 func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
498 var fs syscall.Statfs_t
499 err = syscall.Statfs(v.root, &fs)
501 // Statfs output is not guaranteed to measure free
502 // space in terms of 1K blocks.
503 free = fs.Bavail * uint64(fs.Bsize) / 1024
508 func (v *UnixVolume) String() string {
509 return fmt.Sprintf("[UnixVolume %s]", v.root)
512 // Writable returns false if all future Put, Mtime, and Delete calls are expected to fail.
513 func (v *UnixVolume) Writable() bool {
517 func (v *UnixVolume) Replication() int {
// lockfile takes an exclusive kernel lock on f using flock(2). The
// call blocks until the lock can be acquired and returns any error
// reported by the system call.
func lockfile(f *os.File) error {
	fd := int(f.Fd())
	return syscall.Flock(fd, syscall.LOCK_EX)
}
// unlockfile releases any flock(2) lock held on f and returns any
// error reported by the system call.
func unlockfile(f *os.File) error {
	fd := int(f.Fd())
	return syscall.Flock(fd, syscall.LOCK_UN)
}
530 // Where appropriate, translate a more specific filesystem error to an
531 // error recognized by handlers, like os.ErrNotExist.
532 func (v *UnixVolume) translateError(err error) error {
535 // stat() returns a PathError if the parent directory
536 // (not just the file itself) is missing
537 return os.ErrNotExist
// trashLocRegexp matches a path whose final element has the form
// {hash}.trash.{digits}, where hash is 32 lowercase hexadecimal digits.
// Submatch 1 is the hash and submatch 2 is the digit string, which
// EmptyTrash parses as the trash deadline.
var trashLocRegexp = regexp.MustCompile(
	`/([0-9a-f]{32})\.trash\.(\d+)$`,
)
545 // EmptyTrash walks the volume's directory hierarchy looking for
546 // {hash}.trash.{deadline} files and deletes those whose deadline has passed.
547 func (v *UnixVolume) EmptyTrash() {
548 var bytesDeleted, bytesInTrash int64
549 var blocksDeleted, blocksInTrash int
551 err := filepath.Walk(v.root, func(path string, info os.FileInfo, err error) error {
553 log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err)
556 if info.Mode().IsDir() {
559 matches := trashLocRegexp.FindStringSubmatch(path)
560 if len(matches) != 3 {
563 deadline, err := strconv.ParseInt(matches[2], 10, 64)
565 log.Printf("EmptyTrash: %v: ParseInt(%v): %v", path, matches[2], err)
568 bytesInTrash += info.Size()
570 if deadline > time.Now().Unix() {
573 err = os.Remove(path)
575 log.Printf("EmptyTrash: Remove %v: %v", path, err)
578 bytesDeleted += info.Size()
584 log.Printf("EmptyTrash error for %v: %v", v.String(), err)
587 log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)