)
+// Register newUnixVolume as the constructor for the "Directory"
+// volume driver.
func init() {
- driver["Directory"] = newDirectoryVolume
+ driver["Directory"] = newUnixVolume
}
-func newDirectoryVolume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
- v := &UnixVolume{cluster: cluster, volume: volume, logger: logger, metrics: metrics}
- err := json.Unmarshal(volume.DriverParameters, &v)
+// newUnixVolume constructs a UnixVolume from params, decodes the
+// volume's DriverParameters (JSON) into it, and returns it together
+// with the result of v.check(). If DriverParameters cannot be
+// decoded, it returns a nil volume and the decoding error.
+func newUnixVolume(params newVolumeParams) (volume, error) {
+ v := &UnixVolume{
+ uuid: params.UUID,
+ cluster: params.Cluster,
+ volume: params.ConfigVolume,
+ logger: params.Logger,
+ metrics: params.MetricsVecs,
+ }
+ // Decode into *v (pass v, not &v): decoding a JSON null into a
+ // **UnixVolume would set v itself to nil, and the logger setup
+ // below would then panic.
+ err := json.Unmarshal(params.ConfigVolume.DriverParameters, v)
if err != nil {
return nil, err
}
- v.logger = v.logger.WithField("Volume", v.String())
+ // Tag subsequent log entries from this volume with its device ID.
+ v.logger = v.logger.WithField("Volume", v.DeviceID())
return v, v.check()
}
}
// Set up prometheus metrics
- lbls := prometheus.Labels{"device_id": v.GetDeviceID()}
+ lbls := prometheus.Labels{"device_id": v.DeviceID()}
v.os.stats.opsCounters, v.os.stats.errCounters, v.os.stats.ioBytes = v.metrics.getCounterVecsFor(lbls)
_, err := v.os.Stat(v.Root)
Root string // path to the volume's root directory
Serialize bool
+ uuid string
cluster *arvados.Cluster
volume arvados.Volume
logger logrus.FieldLogger
os osWithStats
}
-// GetDeviceID returns a globally unique ID for the volume's root
+// DeviceID returns a globally unique ID for the volume's root
// directory, consisting of the filesystem's UUID and the path from
// filesystem root to storage directory, joined by "/". For example,
// the device ID for a local directory "/mnt/xvda1/keep" might be
// "fa0b6166-3b55-4994-bd3f-92f4e00a1bb0/keep".
-func (v *UnixVolume) GetDeviceID() string {
+func (v *UnixVolume) DeviceID() string {
giveup := func(f string, args ...interface{}) string {
- v.logger.Infof(f+"; using blank DeviceID for volume %s", append(args, v)...)
- return ""
+ v.logger.Infof(f+"; using hostname:path for volume %s", append(args, v.uuid)...)
+ host, _ := os.Hostname()
+ return host + ":" + v.Root
}
buf, err := exec.Command("findmnt", "--noheadings", "--target", v.Root).CombinedOutput()
if err != nil {
return giveup("could not find entry in %q matching %q", udir, dev)
}
-// Touch sets the timestamp for the given locator to the current time
-func (v *UnixVolume) Touch(loc string) error {
- if v.volume.ReadOnly {
- return MethodDisabledError
- }
- p := v.blockPath(loc)
+// BlockTouch sets the timestamp of the block with the given hash to
+// the current time.
+func (v *UnixVolume) BlockTouch(hash string) error {
+ p := v.blockPath(hash)
f, err := v.os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644)
if err != nil {
return err
return err
}
defer f.Close()
- return fn(NewCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes))
+ return fn(newCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes))
}
// stat is os.Stat() with some extra sanity checks.
if stat.Size() < 0 {
err = os.ErrInvalid
} else if stat.Size() > BlockSize {
- err = TooLongError
+ err = errTooLarge
}
}
return stat, err
}
-// Get retrieves a block, copies it to the given slice, and returns
-// the number of bytes copied.
-func (v *UnixVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
- return getWithPipe(ctx, loc, buf, v)
-}
-
-// ReadBlock implements BlockReader.
-func (v *UnixVolume) ReadBlock(ctx context.Context, loc string, w io.Writer) error {
- path := v.blockPath(loc)
+// BlockRead copies the content of the block with the given hash to w
+// and returns the number of bytes copied. Errors from v.stat are
+// passed through v.translateError. If the number of bytes copied does
+// not match the file size reported by v.stat, the error is
+// io.ErrUnexpectedEOF.
+func (v *UnixVolume) BlockRead(ctx context.Context, hash string, w io.Writer) (int, error) {
+ path := v.blockPath(hash)
stat, err := v.stat(path)
if err != nil {
- return v.translateError(err)
+ return 0, v.translateError(err)
}
- return v.getFunc(ctx, path, func(rdr io.Reader) error {
- n, err := io.Copy(w, rdr)
+ // n is assigned inside the callback so the byte count is still
+ // available after getFunc returns.
+ var n int64
+ err = v.getFunc(ctx, path, func(rdr io.Reader) error {
+ n, err = io.Copy(w, rdr)
if err == nil && n != stat.Size() {
err = io.ErrUnexpectedEOF
}
return err
})
+ return int(n), err
}
-// Compare returns nil if Get(loc) would return the same content as
-// expect. It is functionally equivalent to Get() followed by
-// bytes.Compare(), but uses less memory.
-func (v *UnixVolume) Compare(ctx context.Context, loc string, expect []byte) error {
- path := v.blockPath(loc)
- if _, err := v.stat(path); err != nil {
- return v.translateError(err)
- }
- return v.getFunc(ctx, path, func(rdr io.Reader) error {
- return compareReaderWithBuf(ctx, rdr, expect, loc[:32])
- })
-}
-
-// Put stores a block of data identified by the locator string
-// "loc". It returns nil on success. If the volume is full, it
-// returns a FullError. If the write fails due to some other error,
-// that error is returned.
-func (v *UnixVolume) Put(ctx context.Context, loc string, block []byte) error {
- return putWithPipe(ctx, loc, block, v)
-}
-
-// WriteBlock implements BlockWriter.
-func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader) error {
- if v.volume.ReadOnly {
- return MethodDisabledError
- }
- if v.IsFull() {
- return FullError
+// BlockWrite stores a block on the volume. If it already exists, its
+// timestamp is updated.
+func (v *UnixVolume) BlockWrite(ctx context.Context, hash string, data []byte) error {
+ if v.isFull() {
+ return errFull
}
- bdir := v.blockDir(loc)
+ bdir := v.blockDir(hash)
if err := os.MkdirAll(bdir, 0755); err != nil {
return fmt.Errorf("error creating directory %s: %s", bdir, err)
}
- bpath := v.blockPath(loc)
- tmpfile, err := v.os.TempFile(bdir, "tmp"+loc)
+ bpath := v.blockPath(hash)
+ tmpfile, err := v.os.TempFile(bdir, "tmp"+hash)
if err != nil {
- return fmt.Errorf("TempFile(%s, tmp%s) failed: %s", bdir, loc, err)
+ return fmt.Errorf("TempFile(%s, tmp%s) failed: %s", bdir, hash, err)
}
defer v.os.Remove(tmpfile.Name())
defer tmpfile.Close()
return err
}
defer v.unlock()
- n, err := io.Copy(tmpfile, rdr)
+ n, err := tmpfile.Write(data)
v.os.stats.TickOutBytes(uint64(n))
if err != nil {
return fmt.Errorf("error writing %s: %s", bpath, err)
return nil
}
-// Status returns a VolumeStatus struct describing the volume's
-// current state, or nil if an error occurs.
-func (v *UnixVolume) Status() *VolumeStatus {
- fi, err := v.os.Stat(v.Root)
- if err != nil {
- v.logger.WithError(err).Error("stat failed")
- return nil
- }
- // uint64() cast here supports GOOS=darwin where Dev is
- // int32. If the device number is negative, the unsigned
- // devnum won't be the real device number any more, but that's
- // fine -- all we care about is getting the same number each
- // time.
- devnum := uint64(fi.Sys().(*syscall.Stat_t).Dev)
-
- var fs syscall.Statfs_t
- if err := syscall.Statfs(v.Root, &fs); err != nil {
- v.logger.WithError(err).Error("statfs failed")
- return nil
- }
- // These calculations match the way df calculates disk usage:
- // "free" space is measured by fs.Bavail, but "used" space
- // uses fs.Blocks - fs.Bfree.
- free := fs.Bavail * uint64(fs.Bsize)
- used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize)
- return &VolumeStatus{
- MountPoint: v.Root,
- DeviceNum: devnum,
- BytesFree: free,
- BytesUsed: used,
- }
-}
-
+// blockDirRe matches strings made up only of lowercase hex digits
+// (presumably the names of block subdirectories under Root).
var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`)
+// blockFileRe matches strings of exactly 32 lowercase hex digits
+// (presumably bare block hash filenames).
var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
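-// IndexTo writes (to the given Writer) a list of blocks found on this
-// volume which begin with the specified prefix. If the prefix is an
-// empty string, IndexTo writes a complete list of blocks.
-//
-// Each block is given in the format
-//
-// locator+size modification-time {newline}
-//
-// e.g.:
-//
-// e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
-// e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
-// e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
+// Index writes (to the given Writer) a list of blocks found on this
+// volume which begin with the specified prefix. If the prefix is an
+// empty string, Index writes a complete list of blocks. If ctx is
+// cancelled before the listing is complete, Index returns ctx.Err().
+//
+// Each block is given in the format
+//
+// locator+size modification-time {newline}
+//
+// e.g.:
+//
+// e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303
+// e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043
+// e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136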
-func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
+func (v *UnixVolume) Index(ctx context.Context, prefix string, w io.Writer) error {
rootdir, err := v.os.Open(v.Root)
if err != nil {
return err
return err
}
for _, subdir := range subdirs {
+ if ctx.Err() != nil {
+ return ctx.Err()
+ }
if !strings.HasPrefix(subdir, prefix) && !strings.HasPrefix(prefix, subdir) {
// prefix excludes all blocks stored in this dir
continue
v.os.stats.TickOps("readdir")
v.os.stats.Tick(&v.os.stats.ReaddirOps)
dirents, err = os.ReadDir(blockdirpath)
- if err == nil {
+ if ctx.Err() != nil {
+ return ctx.Err()
+ } else if err == nil {
break
} else if attempt < 5 && strings.Contains(err.Error(), "errno 523") {
// EBADCOOKIE (NFS stopped accepting
}
for _, dirent := range dirents {
+ if ctx.Err() != nil {
+ return ctx.Err()
+ }
fileInfo, err := dirent.Info()
if os.IsNotExist(err) {
// File disappeared between ReadDir() and now
return nil
}
-// Trash trashes the block data from the unix storage
-// If BlobTrashLifetime == 0, the block is deleted
-// Else, the block is renamed as path/{loc}.trash.{deadline},
-// where deadline = now + BlobTrashLifetime
-func (v *UnixVolume) Trash(loc string) error {
+// BlockTrash trashes the block data from the unix storage. If
+// BlobTrashLifetime == 0, the block is deleted; otherwise, the block
+// is renamed as path/{loc}.trash.{deadline}, where deadline = now +
+// BlobTrashLifetime.
+func (v *UnixVolume) BlockTrash(loc string) error {
// Touch() must be called before calling Write() on a block. Touch()
// also uses lockfile(). This avoids a race condition between Write()
// and Trash() because either (a) the file will be trashed and Touch()
// be re-written), or (b) Touch() will update the file's timestamp and
// Trash() will read the correct up-to-date timestamp and choose not to
// trash the file.
- if v.volume.ReadOnly && !v.volume.AllowTrashWhenReadOnly {
- return MethodDisabledError
- }
if err := v.lock(context.TODO()); err != nil {
return err
}
return v.os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(v.cluster.Collections.BlobTrashLifetime.Duration()).Unix()))
}
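-// Untrash moves block from trash back into store
-// Look for path/{loc}.trash.{deadline} in storage,
-// and rename the first such file as path/{loc}
+// BlockUntrash moves block from trash back into store.
+// Look for path/{hash}.trash.{deadline} in storage,
+// and rename the first such file as path/{hash}.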
-func (v *UnixVolume) Untrash(loc string) (err error) {
- if v.volume.ReadOnly {
- return MethodDisabledError
- }
-
+func (v *UnixVolume) BlockUntrash(hash string) error {
v.os.stats.TickOps("readdir")
v.os.stats.Tick(&v.os.stats.ReaddirOps)
- files, err := ioutil.ReadDir(v.blockDir(loc))
+ files, err := ioutil.ReadDir(v.blockDir(hash))
if err != nil {
return err
}
}
foundTrash := false
- prefix := fmt.Sprintf("%v.trash.", loc)
+ prefix := fmt.Sprintf("%v.trash.", hash)
for _, f := range files {
if strings.HasPrefix(f.Name(), prefix) {
foundTrash = true
- err = v.os.Rename(v.blockPath(f.Name()), v.blockPath(loc))
+ err = v.os.Rename(v.blockPath(f.Name()), v.blockPath(hash))
if err == nil {
break
}
return os.ErrNotExist
}
- return
+ return nil
}
// blockDir returns the fully qualified directory name for the directory
return filepath.Join(v.blockDir(loc), loc)
}
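-// IsFull returns true if the free space on the volume is less than
-// MinFreeKilobytes.
+// isFull returns true if the free space on the volume is less than
+// BlockSize bytes. It also checks the "full" symlink under Root to see
+// whether the volume has recently been marked full.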
-func (v *UnixVolume) IsFull() (isFull bool) {
+func (v *UnixVolume) isFull() (isFull bool) {
fullSymlink := v.Root + "/full"
// Check if the volume has been marked as full in the last hour.
}
if avail, err := v.FreeDiskSpace(); err == nil {
- isFull = avail < MinFreeKilobytes
+ isFull = avail < BlockSize
} else {
- v.logger.WithError(err).Errorf("%s: FreeDiskSpace failed", v)
+ v.logger.WithError(err).Errorf("%s: FreeDiskSpace failed", v.DeviceID())
isFull = false
}
if err == nil {
// Statfs output is not guaranteed to measure free
// space in terms of 1K blocks.
- free = fs.Bavail * uint64(fs.Bsize) / 1024
+ free = fs.Bavail * uint64(fs.Bsize)
}
return
}
-func (v *UnixVolume) String() string {
- return fmt.Sprintf("[UnixVolume %s]", v.Root)
-}
-
// InternalStats returns I/O and filesystem ops counters.
func (v *UnixVolume) InternalStats() interface{} {
return &v.os.stats