21 type unixVolumeAdder struct {
// Set and Discover append UnixVolume entries to the slice that the
// volumeSet member points at.
// Set adds the directory named by value to the volume set as a UnixVolume.
// A value containing commas is split and every element is added through a
// recursive Set call; that form is deprecated and logged as such. A single
// value must begin with '/' and is checked with os.Stat.
25 func (vs *unixVolumeAdder) Set(value string) error {
// More than one comma-separated element: warn, then add each one in turn.
26 if dirs := strings.Split(value, ","); len(dirs) > 1 {
27 log.Print("DEPRECATED: using comma-separated volume list.")
28 for _, dir := range dirs {
29 if err := vs.Set(dir); err != nil {
// Reject empty and relative paths.
35 if len(value) == 0 || value[0] != '/' {
36 return errors.New("Invalid volume: must begin with '/'.")
38 if _, err := os.Stat(value); err != nil {
41 var locker sync.Locker
// NOTE(review): presumably this assignment is conditional (the UnixVolume
// locker field is documented as possibly nil) — confirm against the guard.
43 locker = &sync.Mutex{}
45 *vs.volumeSet = append(*vs.volumeSet, &UnixVolume{
// The new volume takes the current value of the global flagReadonly.
48 readonly: flagReadonly,
// Two registrations share the same target (&volumes): the first is described
// as a deprecated synonym for -volume, the second carries the -volume help.
// NOTE(review): presumably both are flag.Var calls — confirm.
55 &unixVolumeAdder{&volumes},
57 "Deprecated synonym for -volume.")
59 &unixVolumeAdder{&volumes},
61 "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
64 // Discover adds a UnixVolume for every directory named "keep" that is
65 // located at the top level of a device- or tmpfs-backed mount point
66 // other than "/". It returns the number of volumes added.
// The mount table is read from ProcMounts; failing to open or scan it is
// fatal (log.Fatalf).
67 func (vs *unixVolumeAdder) Discover() int {
69 f, err := os.Open(ProcMounts)
71 log.Fatalf("opening %s: %s", ProcMounts, err)
73 scanner := bufio.NewScanner(f)
// Each line is split on whitespace: field 0 is the device, field 1 the
// mount point and field 3 the comma-separated mount options.
// NOTE(review): confirm a line with fewer than four fields cannot reach the
// args[1] / args[3] indexing below.
75 args := strings.Fields(scanner.Text())
76 if err := scanner.Err(); err != nil {
77 log.Fatalf("reading %s: %s", ProcMounts, err)
79 dev, mount := args[0], args[1]
// Only tmpfs and /dev/-backed mounts are candidates.
83 if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
86 keepdir := mount + "/keep"
// The candidate must exist and be a directory.
87 if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
90 // Set the -readonly flag (but only for this volume)
91 // if the filesystem is mounted readonly.
// Set reads the global flagReadonly, so its previous value is saved here
// and restored once the volume has been added.
92 flagReadonlyWas := flagReadonly
93 for _, fsopt := range strings.Split(args[3], ",") {
// A failure to add one directory is logged rather than treated as fatal.
102 if err := vs.Set(keepdir); err != nil {
103 log.Printf("adding %q: %s", keepdir, err)
107 flagReadonly = flagReadonlyWas
112 // A UnixVolume stores and retrieves blocks in a local directory.
// Its methods use the root directory (v.root) and the IO locker (v.locker);
// Set also initializes a readonly member from the -readonly flag.
113 type UnixVolume struct {
114 // path to the volume's root directory
116 // something to lock during IO, typically a sync.Mutex (or nil
122 // Touch sets the timestamp for the given locator to the current time.
// NOTE(review): presumably MethodDisabledError is returned for read-only
// volumes — confirm against the guard.
123 func (v *UnixVolume) Touch(loc string) error {
125 return MethodDisabledError
127 p := v.blockPath(loc)
// The block file is opened read-write so it can be flock()ed (see lockfile)
// before its timestamps are changed.
128 f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
135 defer v.locker.Unlock()
137 if e := lockfile(f); e != nil {
// Both timespecs passed to UtimesNano (access time, then modification time)
// are set to the same "now" value.
141 ts := syscall.NsecToTimespec(time.Now().UnixNano())
142 return syscall.UtimesNano(p, []syscall.Timespec{ts, ts})
145 // Mtime returns the stored timestamp for the given locator.
// The timestamp is the modification time os.Stat reports for the block's
// file. On the error path the zero time.Time is returned with the error.
146 func (v *UnixVolume) Mtime(loc string) (time.Time, error) {
147 p := v.blockPath(loc)
148 fi, err := os.Stat(p)
150 return time.Time{}, err
152 return fi.ModTime(), nil
155 // Lock the locker (if one is in use), open the file for reading, and
156 // call the given function if and when the file is ready to read.
157 func (v *UnixVolume) getFunc(path string, fn func(io.Reader) error) error {
// The deferred Unlock releases the locker when getFunc returns.
160 defer v.locker.Unlock()
162 f, err := os.Open(path)
170 // stat is os.Stat() with some extra sanity checks.
171 func (v *UnixVolume) stat(path string) (os.FileInfo, error) {
172 stat, err := os.Stat(path)
// One of the sanity checks compares the file size against BlockSize.
176 } else if stat.Size() > BlockSize {
183 // Get retrieves a block, copies it to the given slice, and returns
184 // the number of bytes copied.
// Errors from stat are passed through translateError. A block larger than
// buf yields TooLongError.
185 func (v *UnixVolume) Get(loc string, buf []byte) (int, error) {
186 path := v.blockPath(loc)
187 stat, err := v.stat(path)
189 return 0, v.translateError(err)
191 if stat.Size() > int64(len(buf)) {
192 return 0, TooLongError
// Read exactly the size reported by stat into the front of buf;
// io.ReadFull reports an error if fewer bytes are available.
195 size := int(stat.Size())
196 err = v.getFunc(path, func(rdr io.Reader) error {
197 read, err = io.ReadFull(rdr, buf[:size])
203 // Compare returns nil if Get(loc) would return the same content as
204 // expect. It is functionally equivalent to Get() followed by
205 // bytes.Compare(), but uses less memory.
206 func (v *UnixVolume) Compare(loc string, expect []byte) error {
207 path := v.blockPath(loc)
// A stat failure is reported through translateError before any read.
208 if _, err := v.stat(path); err != nil {
209 return v.translateError(err)
211 return v.getFunc(path, func(rdr io.Reader) error {
// loc[:32] is the first 32 characters of the locator, the same length
// blockFileRe requires of a block file name.
// NOTE(review): presumably this is the block's hash — confirm against
// compareReaderWithBuf.
212 return compareReaderWithBuf(rdr, expect, loc[:32])
216 // Put stores a block of data identified by the locator string
217 // "loc". It returns nil on success. If the volume is full, it
218 // returns a FullError. If the write fails due to some other error,
219 // that error is returned.
// The data is written to a temporary file created in the block's directory
// and then renamed to the block's final path. Put can also return
// MethodDisabledError.
220 func (v *UnixVolume) Put(loc string, block []byte) error {
222 return MethodDisabledError
// Make sure the block's directory exists before creating the temp file.
227 bdir := v.blockDir(loc)
228 if err := os.MkdirAll(bdir, 0755); err != nil {
229 log.Printf("%s: could not create directory %s: %s",
234 tmpfile, tmperr := ioutil.TempFile(bdir, "tmp"+loc)
236 log.Printf("ioutil.TempFile(%s, tmp%s): %s", bdir, loc, tmperr)
239 bpath := v.blockPath(loc)
243 defer v.locker.Unlock()
// Each failing step below (write, close, rename) logs the error and removes
// the temp file. NOTE(review): the os.Remove results are discarded.
245 if _, err := tmpfile.Write(block); err != nil {
246 log.Printf("%s: writing to %s: %s\n", v, bpath, err)
248 os.Remove(tmpfile.Name())
251 if err := tmpfile.Close(); err != nil {
252 log.Printf("closing %s: %s\n", tmpfile.Name(), err)
253 os.Remove(tmpfile.Name())
256 if err := os.Rename(tmpfile.Name(), bpath); err != nil {
257 log.Printf("rename %s %s: %s\n", tmpfile.Name(), bpath, err)
258 os.Remove(tmpfile.Name())
264 // Status returns a VolumeStatus struct describing the volume's
265 // current state, or nil if an error occurs.
267 func (v *UnixVolume) Status() *VolumeStatus {
268 var fs syscall.Statfs_t
271 if fi, err := os.Stat(v.root); err == nil {
// The device number is read from the platform stat structure; the unchecked
// type assertion panics if Sys() is not a *syscall.Stat_t.
272 devnum = fi.Sys().(*syscall.Stat_t).Dev
274 log.Printf("%s: os.Stat: %s\n", v, err)
278 err := syscall.Statfs(v.root, &fs)
280 log.Printf("%s: statfs: %s\n", v, err)
283 // These calculations match the way df calculates disk usage:
284 // "free" space is measured by fs.Bavail, but "used" space
285 // uses fs.Blocks - fs.Bfree.
// Both values are byte counts: a block count multiplied by the block size.
286 free := fs.Bavail * uint64(fs.Bsize)
287 used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
288 return &VolumeStatus{v.root, devnum, free, used}
// blockDirRe matches a name made up solely of lowercase hex digits;
// blockFileRe matches exactly 32 lowercase hex digits. IndexTo uses them to
// filter directory names and file names respectively.
291 var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
292 var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
294 // IndexTo writes (to the given Writer) a list of blocks found on this
295 // volume which begin with the specified prefix. If the prefix is an
296 // empty string, IndexTo writes a complete list of blocks.
298 // Each block is given in the format
300 // locator+size modification-time {newline}
// where modification-time is the file's ModTime().UnixNano(), i.e.
// nanoseconds since the Unix epoch.
304 // e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303000000000
305 // e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043000000000
306 // e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136000000000
308 func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
310 rootdir, err := os.Open(v.root)
314 defer rootdir.Close()
// Entries are fetched one at a time (Readdirnames(1) here, Readdir(1) for
// the block directories) instead of loading a whole listing at once.
316 names, err := rootdir.Readdirnames(1)
319 } else if err != nil {
// A directory can only hold matching blocks when its name and the prefix
// agree over their common length, i.e. one is a prefix of the other.
322 if !strings.HasPrefix(names[0], prefix) && !strings.HasPrefix(prefix, names[0]) {
323 // prefix excludes all blocks stored in this dir
326 if !blockDirRe.MatchString(names[0]) {
329 blockdirpath := filepath.Join(v.root, names[0])
330 blockdir, err := os.Open(blockdirpath)
332 log.Print("Error reading ", blockdirpath, ": ", err)
337 fileInfo, err := blockdir.Readdir(1)
340 } else if err != nil {
341 log.Print("Error reading ", blockdirpath, ": ", err)
// Only files that start with prefix and look like a 32-hex-digit block name
// are listed.
345 name := fileInfo[0].Name()
346 if !strings.HasPrefix(name, prefix) {
349 if !blockFileRe.MatchString(name) {
352 _, err = fmt.Fprint(w,
354 "+", fileInfo[0].Size(),
355 " ", fileInfo[0].ModTime().UnixNano(),
362 // Trash trashes the block data from the unix storage.
363 // If trashLifetime == 0, the block is deleted.
364 // Otherwise, the block is renamed as path/{loc}.trash.{deadline},
365 // where deadline = now + trashLifetime, written as Unix seconds.
366 func (v *UnixVolume) Trash(loc string) error {
367 // Touch() must be called before calling Write() on a block. Touch()
368 // also uses lockfile(). This avoids a race condition between Write()
369 // and Trash() because either (a) the file will be trashed and Touch()
370 // will signal to the caller that the file is not present (and needs to
371 // be re-written), or (b) Touch() will update the file's timestamp and
372 // Trash() will read the correct up-to-date timestamp and choose not to
376 return MethodDisabledError
380 defer v.locker.Unlock()
382 p := v.blockPath(loc)
// The block file is opened and flock()ed (see lockfile) before its
// timestamp is inspected, mirroring what Touch does.
383 f, err := os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
388 if e := lockfile(f); e != nil {
393 // If the block has been PUT in the last blobSignatureTTL
394 // seconds, return success without removing the block. This
395 // protects data from garbage collection until it is no longer
396 // possible for clients to retrieve the unreferenced blocks
397 // anyway (because the permission signatures have expired).
398 if fi, err := os.Stat(p); err != nil {
400 } else if time.Since(fi.ModTime()) < blobSignatureTTL {
404 if trashLifetime == 0 {
// The trash name is the block path plus ".trash." and the deadline.
407 return os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(trashLifetime).Unix()))
410 // Untrash moves a block from the trash back into the store.
411 // It looks for path/{loc}.trash.{deadline} in storage,
412 // and renames the first such file as path/{loc}.
// os.ErrNotExist is returned when no trash file for loc is found.
413 func (v *UnixVolume) Untrash(loc string) (err error) {
415 return MethodDisabledError
// ioutil.ReadDir returns the directory entries sorted by filename, so
// candidates are visited in name order.
418 files, err := ioutil.ReadDir(v.blockDir(loc))
424 return os.ErrNotExist
428 prefix := fmt.Sprintf("%v.trash.", loc)
429 for _, f := range files {
430 if strings.HasPrefix(f.Name(), prefix) {
// blockPath works for the trash name too: it begins with loc, so it shares
// loc's first three characters and resolves to the same block directory.
432 err = os.Rename(v.blockPath(f.Name()), v.blockPath(loc))
439 if foundTrash == false {
440 return os.ErrNotExist
446 // blockDir returns the fully qualified directory name for the directory
447 // where loc is (or would be) stored on this volume.
// The directory is named after the first three characters of loc.
// NOTE(review): loc[0:3] panics when loc is shorter than three bytes —
// confirm callers always pass a full locator.
448 func (v *UnixVolume) blockDir(loc string) string {
449 return filepath.Join(v.root, loc[0:3])
452 // blockPath returns the fully qualified pathname for the path to loc
454 func (v *UnixVolume) blockPath(loc string) string {
// The result is blockDir(loc) joined with loc itself.
455 return filepath.Join(v.blockDir(loc), loc)
458 // IsFull returns true if the free space on the volume is less than
461 func (v *UnixVolume) IsFull() (isFull bool) {
// The "full" marker is a symlink in the volume root whose target is the
// Unix time, in seconds, at which the volume was found to be full.
462 fullSymlink := v.root + "/full"
464 // Check if the volume has been marked as full in the last hour.
465 if link, err := os.Readlink(fullSymlink); err == nil {
466 if ts, err := strconv.Atoi(link); err == nil {
467 fulltime := time.Unix(int64(ts), 0)
468 if time.Since(fulltime).Hours() < 1.0 {
// FreeDiskSpace reports 1K blocks, compared here with MinFreeKilobytes.
474 if avail, err := v.FreeDiskSpace(); err == nil {
475 isFull = avail < MinFreeKilobytes
477 log.Printf("%s: FreeDiskSpace: %s\n", v, err)
481 // If the volume is full, timestamp it.
483 now := fmt.Sprintf("%d", time.Now().Unix())
// NOTE(review): the os.Symlink error is discarded; if the marker path
// already exists the call fails and the old timestamp stays in place.
484 os.Symlink(now, fullSymlink)
489 // FreeDiskSpace returns the number of unused 1k blocks available on
492 func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) {
493 var fs syscall.Statfs_t
494 err = syscall.Statfs(v.root, &fs)
496 // Statfs output is not guaranteed to measure free
497 // space in terms of 1K blocks.
// Convert available blocks to bytes first, then to 1K units.
498 free = fs.Bavail * uint64(fs.Bsize) / 1024
// String returns a short description of the volume that includes its root
// directory; it is what the %s verbs in this file's log calls print for v.
503 func (v *UnixVolume) String() string {
504 return fmt.Sprintf("[UnixVolume %s]", v.root)
507 // Writable returns false if all future Put, Mtime, and Delete calls
508 // are expected to fail.
// NOTE(review): among the methods here, Touch, Put, Trash and Untrash are the
// ones with a MethodDisabledError return; Mtime has none and no Delete method
// appears alongside them, so the list above looks stale — confirm.
509 func (v *UnixVolume) Writable() bool {
513 // Replication returns the number of replicas promised by the
514 // underlying device (currently assumed to be 1).
// NOTE(review): confirm the returned value matches the 1 stated above.
515 func (v *UnixVolume) Replication() int {
519 // lockfile and unlockfile use flock(2) to manage kernel file locks.
// lockfile requests an exclusive lock (LOCK_EX) on f. LOCK_NB is not set,
// so per flock(2) the call blocks until the lock can be taken.
520 func lockfile(f *os.File) error {
521 return syscall.Flock(int(f.Fd()), syscall.LOCK_EX)
524 func unlockfile(f *os.File) error {
525 return syscall.Flock(int(f.Fd()), syscall.LOCK_UN)
528 // Where appropriate, translate a more specific filesystem error to an
529 // error recognized by handlers, like os.ErrNotExist.
530 func (v *UnixVolume) translateError(err error) error {
// NOTE(review): presumably this branch is selected by a *os.PathError type
// test — confirm against the switch/guard.
533 // stat() returns a PathError if the parent directory
534 // (not just the file itself) is missing
535 return os.ErrNotExist
// unixTrashLocRegexp matches a path whose final component has the form
// {32 hex digits}.trash.{digits}. Submatch 1 is the hash and submatch 2 is
// the deadline that EmptyTrash parses as a base-10 integer.
541 var unixTrashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`)
543 // EmptyTrash walks hierarchy looking for {hash}.trash.*
544 // and deletes those with deadline < now.
// NOTE(review): the guard in the walk is `deadline > now`, so a file whose
// deadline equals the current second is presumably deleted as well
// (deadline <= now) — confirm.
545 func (v *UnixVolume) EmptyTrash() {
546 var bytesDeleted, bytesInTrash int64
547 var blocksDeleted, blocksInTrash int
549 err := filepath.Walk(v.root, func(path string, info os.FileInfo, err error) error {
551 log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err)
554 if info.Mode().IsDir() {
// A full match yields three elements: the whole match, the hash and the
// deadline.
557 matches := unixTrashLocRegexp.FindStringSubmatch(path)
558 if len(matches) != 3 {
561 deadline, err := strconv.ParseInt(matches[2], 10, 64)
563 log.Printf("EmptyTrash: %v: ParseInt(%v): %v", path, matches[2], err)
// bytesInTrash counts every trash file seen, including ones deleted below;
// the final log line subtracts the deleted totals to report what remains.
566 bytesInTrash += info.Size()
568 if deadline > time.Now().Unix() {
571 err = os.Remove(path)
573 log.Printf("EmptyTrash: Remove %v: %v", path, err)
576 bytesDeleted += info.Size()
582 log.Printf("EmptyTrash error for %v: %v", v.String(), err)
585 log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)