21 type unixVolumeAdder struct {
25 func (vs *unixVolumeAdder) Set(value string) error {
26 if dirs := strings.Split(value, ","); len(dirs) > 1 {
27 log.Print("DEPRECATED: using comma-separated volume list.")
28 for _, dir := range dirs {
29 if err := vs.Set(dir); err != nil {
35 if len(value) == 0 || value[0] != '/' {
36 return errors.New("Invalid volume: must begin with '/'.")
38 if _, err := os.Stat(value); err != nil {
41 var locker sync.Locker
43 locker = &sync.Mutex{}
45 *vs.volumeSet = append(*vs.volumeSet, &UnixVolume{
48 readonly: flagReadonly,
55 &unixVolumeAdder{&volumes},
57 "Deprecated synonym for -volume.")
59 &unixVolumeAdder{&volumes},
61 "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
64 // Discover adds a UnixVolume for every directory named "keep" that is
65 // located at the top level of a device- or tmpfs-backed mount point
66 // other than "/". It returns the number of volumes added.
67 func (vs *unixVolumeAdder) Discover() int {
69 f, err := os.Open(ProcMounts)
71 log.Fatalf("opening %s: %s", ProcMounts, err)
73 scanner := bufio.NewScanner(f)
75 args := strings.Fields(scanner.Text())
76 if err := scanner.Err(); err != nil {
77 log.Fatalf("reading %s: %s", ProcMounts, err)
79 dev, mount := args[0], args[1]
83 if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
86 keepdir := mount + "/keep"
87 if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
90 // Set the -readonly flag (but only for this volume)
91 // if the filesystem is mounted readonly.
92 flagReadonlyWas := flagReadonly
93 for _, fsopt := range strings.Split(args[3], ",") {
102 if err := vs.Set(keepdir); err != nil {
103 log.Printf("adding %q: %s", keepdir, err)
107 flagReadonly = flagReadonlyWas
112 // A UnixVolume stores and retrieves blocks in a local directory.
113 type UnixVolume struct {
114 // path to the volume's root directory
116 // something to lock during IO, typically a sync.Mutex (or nil
122 // Touch sets the timestamp for the given locator to the current time
123 func (v *UnixVolume) Touch(loc string) error {
125 return MethodDisabledError
127 p := v.blockPath(loc)
128 f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
135 defer v.locker.Unlock()
137 if e := lockfile(f); e != nil {
141 now := time.Now().Unix()
142 utime := syscall.Utimbuf{now, now}
143 return syscall.Utime(p, &utime)
146 // Mtime returns the stored timestamp for the given locator.
147 func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
148 p := v.blockPath(loc)
149 fi, err := os.Stat(p)
151 return time.Time{}, err
153 return fi.ModTime(), nil
156 // Lock the locker (if one is in use), open the file for reading, and
157 // call the given function if and when the file is ready to read.
158 func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
161 defer v.locker.Unlock()
163 f, err := os.Open(path)
171 // stat is os.Stat() with some extra sanity checks.
172 func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
173 stat, err := os.Stat(path)
177 } else if stat.Size() > BlockSize {
184 // Get retrieves a block, copies it to the given slice, and returns
185 // the number of bytes copied.
186 func (v *UnixVolume) Get(loc string, buf []byte) (int, error) {
187 path := v.blockPath(loc)
188 stat, err := v.stat(path)
190 return 0, v.translateError(err)
192 if stat.Size() > int64(len(buf)) {
193 return 0, TooLongError
196 size := int(stat.Size())
197 err = v.getFunc(path, func(rdr io.Reader) error {
198 read, err = io.ReadFull(rdr, buf[:size])
204 // Compare returns nil if Get(loc) would return the same content as
205 // expect. It is functionally equivalent to Get() followed by
206 // bytes.Compare(), but uses less memory.
207 func (v *UnixVolume) Compare(loc string, expect []byte) error {
208 path := v.blockPath(loc)
209 if _, err := v.stat(path); err != nil {
210 return v.translateError(err)
212 return v.getFunc(path, func(rdr io.Reader) error {
213 return compareReaderWithBuf(rdr, expect, loc[:32])
217 // Put stores a block of data identified by the locator string
218 // "loc". It returns nil on success. If the volume is full, it
219 // returns a FullError. If the write fails due to some other error,
220 // that error is returned.
221 func (v *UnixVolume) Put(loc string, block []byte) error {
223 return MethodDisabledError
228 bdir := v.blockDir(loc)
229 if err := os.MkdirAll(bdir, 0755); err != nil {
230 log.Printf("%s: could not create directory %s: %s",
235 tmpfile, tmperr := ioutil.TempFile(bdir, "tmp"+loc)
237 log.Printf("ioutil.TempFile(%s, tmp%s): %s", bdir, loc, tmperr)
240 bpath := v.blockPath(loc)
244 defer v.locker.Unlock()
246 if _, err := tmpfile.Write(block); err != nil {
247 log.Printf("%s: writing to %s: %s\n", v, bpath, err)
249 os.Remove(tmpfile.Name())
252 if err := tmpfile.Close(); err != nil {
253 log.Printf("closing %s: %s\n", tmpfile.Name(), err)
254 os.Remove(tmpfile.Name())
257 if err := os.Rename(tmpfile.Name(), bpath); err != nil {
258 log.Printf("rename %s %s: %s\n", tmpfile.Name(), bpath, err)
259 os.Remove(tmpfile.Name())
265 // Status returns a VolumeStatus struct describing the volume's
266 // current state, or nil if an error occurs.
268 func (v *UnixVolume) Status() *VolumeStatus {
269 var fs syscall.Statfs_t
272 if fi, err := os.Stat(v.root); err == nil {
273 devnum = fi.Sys().(*syscall.Stat_t).Dev
275 log.Printf("%s: os.Stat: %s\n", v, err)
279 err := syscall.Statfs(v.root, &fs)
281 log.Printf("%s: statfs: %s\n", v, err)
284 // These calculations match the way df calculates disk usage:
285 // "free" space is measured by fs.Bavail, but "used" space
286 // uses fs.Blocks - fs.Bfree.
287 free := fs.Bavail * uint64(fs.Bsize)
288 used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
289 return &VolumeStatus{v.root, devnum, free, used}
292 var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
293 var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
295 // IndexTo writes (to the given Writer) a list of blocks found on this
296 // volume which begin with the specified prefix. If the prefix is an
297 // empty string, IndexTo writes a complete list of blocks.
299 // Each block is given in the format
301 // locator+size modification-time {newline}
305 // e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
306 // e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
307 // e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
309 func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
310 var lastErr error = nil
311 rootdir, err := os.Open(v.root)
315 defer rootdir.Close()
317 names, err := rootdir.Readdirnames(1)
320 } else if err != nil {
323 if !strings.HasPrefix(names[0], prefix) && !strings.HasPrefix(prefix, names[0]) {
324 // prefix excludes all blocks stored in this dir
327 if !blockDirRe.MatchString(names[0]) {
330 blockdirpath := filepath.Join(v.root, names[0])
331 blockdir, err := os.Open(blockdirpath)
333 log.Print("Error reading ", blockdirpath, ": ", err)
338 fileInfo, err := blockdir.Readdir(1)
341 } else if err != nil {
342 log.Print("Error reading ", blockdirpath, ": ", err)
346 name := fileInfo[0].Name()
347 if !strings.HasPrefix(name, prefix) {
350 if !blockFileRe.MatchString(name) {
353 _, err = fmt.Fprint(w,
355 "+", fileInfo[0].Size(),
356 " ", fileInfo[0].ModTime().Unix(),
363 // Trash trashes the block data from the unix storage
364 // If trashLifetime == 0, the block is deleted
365 // Else, the block is renamed as path/{loc}.trash.{deadline},
366 // where deadline = now + trashLifetime
367 func (v *UnixVolume) Trash(loc string) error {
368 // Touch() must be called before calling Write() on a block. Touch()
369 // also uses lockfile(). This avoids a race condition between Write()
370 // and Trash() because either (a) the file will be trashed and Touch()
371 // will signal to the caller that the file is not present (and needs to
372 // be re-written), or (b) Touch() will update the file's timestamp and
373 // Trash() will read the correct up-to-date timestamp and choose not to
377 return MethodDisabledError
381 defer v.locker.Unlock()
383 p := v.blockPath(loc)
384 f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
389 if e := lockfile(f); e != nil {
394 // If the block has been PUT in the last blobSignatureTTL
395 // seconds, return success without removing the block. This
396 // protects data from garbage collection until it is no longer
397 // possible for clients to retrieve the unreferenced blocks
398 // anyway (because the permission signatures have expired).
399 if fi, err := os.Stat(p); err != nil {
402 if time.Since(fi.ModTime()) < blobSignatureTTL {
407 if trashLifetime == 0 {
410 return os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(trashLifetime).Unix()))
413 // Untrash moves block from trash back into store
414 // Look for path/{loc}.trash.{deadline} in storage,
415 // and rename the first such file as path/{loc}
416 func (v *UnixVolume) Untrash(loc string) (err error) {
418 return MethodDisabledError
421 files, err := ioutil.ReadDir(v.blockDir(loc))
427 return os.ErrNotExist
431 prefix := fmt.Sprintf("%v.trash.", loc)
432 for _, f := range files {
433 if strings.HasPrefix(f.Name(), prefix) {
435 err = os.Rename(v.blockPath(f.Name()), v.blockPath(loc))
442 if foundTrash == false {
443 return os.ErrNotExist
449 // blockDir returns the fully qualified directory name for the directory
450 // where loc is (or would be) stored on this volume.
451 func (v *UnixVolume) blockDir(loc string) string {
452 return filepath.Join(v.root, loc[0:3])
455 // blockPath returns the fully qualified pathname for the path to loc
457 func (v *UnixVolume) blockPath(loc string) string {
458 return filepath.Join(v.blockDir(loc), loc)
461 // IsFull returns true if the free space on the volume is less than
464 func (v *UnixVolume) IsFull() (isFull bool) {
465 fullSymlink := v.root + "/full"
467 // Check if the volume has been marked as full in the last hour.
468 if link, err := os.Readlink(fullSymlink); err == nil {
469 if ts, err := strconv.Atoi(link); err == nil {
470 fulltime := time.Unix(int64(ts), 0)
471 if time.Since(fulltime).Hours() < 1.0 {
477 if avail, err := v.FreeDiskSpace(); err == nil {
478 isFull = avail < MinFreeKilobytes
480 log.Printf("%s: FreeDiskSpace: %s\n", v, err)
484 // If the volume is full, timestamp it.
486 now := fmt.Sprintf("%d", time.Now().Unix())
487 os.Symlink(now, fullSymlink)
492 // FreeDiskSpace returns the number of unused 1k blocks available on
495 func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
496 var fs syscall.Statfs_t
497 err = syscall.Statfs(v.root, &fs)
499 // Statfs output is not guaranteed to measure free
500 // space in terms of 1K blocks.
501 free = fs.Bavail * uint64(fs.Bsize) / 1024
506 func (v *UnixVolume) String() string {
507 return fmt.Sprintf("[UnixVolume %s]", v.root)
510 // Writable returns false if all future Put, Mtime, and Delete calls are expected to fail.
511 func (v *UnixVolume) Writable() bool {
515 func (v *UnixVolume) Replication() int {
519 // lockfile and unlockfile use flock(2) to manage kernel file locks.
520 func lockfile(f *os.File) error {
521 return syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
524 func unlockfile(f *os.File) error {
525 return syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
528 // Where appropriate, translate a more specific filesystem error to an
529 // error recognized by handlers, like os.ErrNotExist.
530 func (v *UnixVolume) translateError(err error) error {
533 // stat() returns a PathError if the parent directory
534 // (not just the file itself) is missing
535 return os.ErrNotExist
541 var unixTrashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`)
543 // EmptyTrash walks hierarchy looking for {hash}.trash.*
544 // and deletes those with deadline < now.
545 func (v *UnixVolume) EmptyTrash() {
546 var bytesDeleted, bytesInTrash int64
547 var blocksDeleted, blocksInTrash int
549 err := filepath.Walk(v.root, func(path string, info os.FileInfo, err error) error {
551 log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err)
554 if info.Mode().IsDir() {
557 matches := unixTrashLocRegexp.FindStringSubmatch(path)
558 if len(matches) != 3 {
561 deadline, err := strconv.ParseInt(matches[2], 10, 64)
563 log.Printf("EmptyTrash: %v: ParseInt(%v): %v", path, matches[2], err)
566 bytesInTrash += info.Size()
568 if deadline > time.Now().Unix() {
571 err = os.Remove(path)
573 log.Printf("EmptyTrash: Remove %v: %v", path, err)
576 bytesDeleted += info.Size()
582 log.Printf("EmptyTrash error for %v: %v", v.String(), err)
585 log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)