X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/da58ec28659f5167f9658be5714731acee57dfb1..2c6557f613fcf6cdcebb08c321a5d061aeb780c6:/services/keepstore/unix_volume.go diff --git a/services/keepstore/unix_volume.go b/services/keepstore/unix_volume.go index dd62cf1319..98edfae14d 100644 --- a/services/keepstore/unix_volume.go +++ b/services/keepstore/unix_volume.go @@ -28,16 +28,22 @@ import ( ) func init() { - driver["Directory"] = newDirectoryVolume + driver["Directory"] = newUnixVolume } -func newDirectoryVolume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) { - v := &UnixVolume{cluster: cluster, volume: volume, logger: logger, metrics: metrics} - err := json.Unmarshal(volume.DriverParameters, &v) +func newUnixVolume(params newVolumeParams) (volume, error) { + v := &UnixVolume{ + uuid: params.UUID, + cluster: params.Cluster, + volume: params.ConfigVolume, + logger: params.Logger, + metrics: params.MetricsVecs, + } + err := json.Unmarshal(params.ConfigVolume.DriverParameters, &v) if err != nil { return nil, err } - v.logger = v.logger.WithField("Volume", v.String()) + v.logger = v.logger.WithField("Volume", v.DeviceID()) return v, v.check() } @@ -53,7 +59,7 @@ func (v *UnixVolume) check() error { } // Set up prometheus metrics - lbls := prometheus.Labels{"device_id": v.GetDeviceID()} + lbls := prometheus.Labels{"device_id": v.DeviceID()} v.os.stats.opsCounters, v.os.stats.errCounters, v.os.stats.ioBytes = v.metrics.getCounterVecsFor(lbls) _, err := v.os.Stat(v.Root) @@ -65,6 +71,7 @@ type UnixVolume struct { Root string // path to the volume's root directory Serialize bool + uuid string cluster *arvados.Cluster volume arvados.Volume logger logrus.FieldLogger @@ -77,15 +84,16 @@ type UnixVolume struct { os osWithStats } -// GetDeviceID returns a globally unique ID for the volume's root +// DeviceID returns a globally unique ID for the volume's root // directory, consisting of the filesystem's UUID and the path from // filesystem root to storage directory, joined by "/". For example, // the device ID for a local directory "/mnt/xvda1/keep" might be // "fa0b6166-3b55-4994-bd3f-92f4e00a1bb0/keep". -func (v *UnixVolume) GetDeviceID() string { +func (v *UnixVolume) DeviceID() string { giveup := func(f string, args ...interface{}) string { - v.logger.Infof(f+"; using blank DeviceID for volume %s", append(args, v)...) - return "" + v.logger.Infof(f+"; using hostname:path for volume %s", append(args, v.uuid)...) + host, _ := os.Hostname() + return host + ":" + v.Root } buf, err := exec.Command("findmnt", "--noheadings", "--target", v.Root).CombinedOutput() if err != nil { @@ -154,12 +162,9 @@ func (v *UnixVolume) GetDeviceID() string { return giveup("could not find entry in %q matching %q", udir, dev) } -// Touch sets the timestamp for the given locator to the current time -func (v *UnixVolume) Touch(loc string) error { - if v.volume.ReadOnly { - return MethodDisabledError - } - p := v.blockPath(loc) +// BlockTouch sets the timestamp for the given locator to the current time +func (v *UnixVolume) BlockTouch(hash string) error { + p := v.blockPath(hash) f, err := v.os.OpenFile(p, os.O_RDWR|os.O_APPEND, 0644) if err != nil { return err @@ -203,7 +208,7 @@ func (v *UnixVolume) getFunc(ctx context.Context, path string, fn func(io.Reader return err } defer f.Close() - return fn(NewCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes)) + return fn(newCountingReader(ioutil.NopCloser(f), v.os.stats.TickInBytes)) } // stat is os.Stat() with some extra sanity checks. @@ -213,72 +218,45 @@ func (v *UnixVolume) stat(path string) (os.FileInfo, error) { if stat.Size() < 0 { err = os.ErrInvalid } else if stat.Size() > BlockSize { - err = TooLongError + err = errTooLarge } } return stat, err } -// Get retrieves a block, copies it to the given slice, and returns -// the number of bytes copied. -func (v *UnixVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) { - return getWithPipe(ctx, loc, buf, v) -} - -// ReadBlock implements BlockReader. -func (v *UnixVolume) ReadBlock(ctx context.Context, loc string, w io.Writer) error { - path := v.blockPath(loc) +// BlockRead reads a block from the volume. +func (v *UnixVolume) BlockRead(ctx context.Context, hash string, w io.Writer) (int, error) { + path := v.blockPath(hash) stat, err := v.stat(path) if err != nil { - return v.translateError(err) + return 0, v.translateError(err) } - return v.getFunc(ctx, path, func(rdr io.Reader) error { - n, err := io.Copy(w, rdr) + var n int64 + err = v.getFunc(ctx, path, func(rdr io.Reader) error { + n, err = io.Copy(w, rdr) if err == nil && n != stat.Size() { err = io.ErrUnexpectedEOF } return err }) + return int(n), err } -// Compare returns nil if Get(loc) would return the same content as -// expect. It is functionally equivalent to Get() followed by -// bytes.Compare(), but uses less memory. -func (v *UnixVolume) Compare(ctx context.Context, loc string, expect []byte) error { - path := v.blockPath(loc) - if _, err := v.stat(path); err != nil { - return v.translateError(err) - } - return v.getFunc(ctx, path, func(rdr io.Reader) error { - return compareReaderWithBuf(ctx, rdr, expect, loc[:32]) - }) -} - -// Put stores a block of data identified by the locator string -// "loc". It returns nil on success. If the volume is full, it -// returns a FullError. If the write fails due to some other error, -// that error is returned. -func (v *UnixVolume) Put(ctx context.Context, loc string, block []byte) error { - return putWithPipe(ctx, loc, block, v) -} - -// WriteBlock implements BlockWriter. -func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader) error { - if v.volume.ReadOnly { - return MethodDisabledError +// BlockWrite stores a block on the volume. If it already exists, its +// timestamp is updated. +func (v *UnixVolume) BlockWrite(ctx context.Context, hash string, data []byte) error { + if v.isFull() { + return errFull } - if v.IsFull() { - return FullError - } - bdir := v.blockDir(loc) + bdir := v.blockDir(hash) if err := os.MkdirAll(bdir, 0755); err != nil { return fmt.Errorf("error creating directory %s: %s", bdir, err) } - bpath := v.blockPath(loc) - tmpfile, err := v.os.TempFile(bdir, "tmp"+loc) + bpath := v.blockPath(hash) + tmpfile, err := v.os.TempFile(bdir, "tmp"+hash) if err != nil { - return fmt.Errorf("TempFile(%s, tmp%s) failed: %s", bdir, loc, err) + return fmt.Errorf("TempFile(%s, tmp%s) failed: %s", bdir, hash, err) } defer v.os.Remove(tmpfile.Name()) defer tmpfile.Close() @@ -287,7 +265,7 @@ func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader) return err } defer v.unlock() - n, err := io.Copy(tmpfile, rdr) + n, err := tmpfile.Write(data) v.os.stats.TickOutBytes(uint64(n)) if err != nil { return fmt.Errorf("error writing %s: %s", bpath, err) @@ -312,58 +290,10 @@ func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader) return nil } -// Status returns a VolumeStatus struct describing the volume's -// current state, or nil if an error occurs. -// -func (v *UnixVolume) Status() *VolumeStatus { - fi, err := v.os.Stat(v.Root) - if err != nil { - v.logger.WithError(err).Error("stat failed") - return nil - } - // uint64() cast here supports GOOS=darwin where Dev is - // int32. If the device number is negative, the unsigned - // devnum won't be the real device number any more, but that's - // fine -- all we care about is getting the same number each - // time. - devnum := uint64(fi.Sys().(*syscall.Stat_t).Dev) - - var fs syscall.Statfs_t - if err := syscall.Statfs(v.Root, &fs); err != nil { - v.logger.WithError(err).Error("statfs failed") - return nil - } - // These calculations match the way df calculates disk usage: - // "free" space is measured by fs.Bavail, but "used" space - // uses fs.Blocks - fs.Bfree. - free := fs.Bavail * uint64(fs.Bsize) - used := (fs.Blocks - fs.Bfree) * uint64(fs.Bsize) - return &VolumeStatus{ - MountPoint: v.Root, - DeviceNum: devnum, - BytesFree: free, - BytesUsed: used, - } -} - var blockDirRe = regexp.MustCompile(`^[0-9a-f]+$`) var blockFileRe = regexp.MustCompile(`^[0-9a-f]{32}$`) -// IndexTo writes (to the given Writer) a list of blocks found on this -// volume which begin with the specified prefix. If the prefix is an -// empty string, IndexTo writes a complete list of blocks. -// -// Each block is given in the format -// -// locator+size modification-time {newline} -// -// e.g.: -// -// e4df392f86be161ca6ed3773a962b8f3+67108864 1388894303 -// e4d41e6fd68460e0e3fc18cc746959d2+67108864 1377796043 -// e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136 -// -func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error { +func (v *UnixVolume) Index(ctx context.Context, prefix string, w io.Writer) error { rootdir, err := v.os.Open(v.Root) if err != nil { return err @@ -376,6 +306,9 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error { return err } for _, subdir := range subdirs { + if ctx.Err() != nil { + return ctx.Err() + } if !strings.HasPrefix(subdir, prefix) && !strings.HasPrefix(prefix, subdir) { // prefix excludes all blocks stored in this dir continue @@ -390,7 +323,9 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error { v.os.stats.TickOps("readdir") v.os.stats.Tick(&v.os.stats.ReaddirOps) dirents, err = os.ReadDir(blockdirpath) - if err == nil { + if ctx.Err() != nil { + return ctx.Err() + } else if err == nil { break } else if attempt < 5 && strings.Contains(err.Error(), "errno 523") { // EBADCOOKIE (NFS stopped accepting @@ -404,6 +339,9 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error { } for _, dirent := range dirents { + if ctx.Err() != nil { + return ctx.Err() + } fileInfo, err := dirent.Info() if os.IsNotExist(err) { // File disappeared between ReadDir() and now @@ -432,11 +370,11 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error { return nil } -// Trash trashes the block data from the unix storage -// If BlobTrashLifetime == 0, the block is deleted -// Else, the block is renamed as path/{loc}.trash.{deadline}, -// where deadline = now + BlobTrashLifetime -func (v *UnixVolume) Trash(loc string) error { +// BlockTrash trashes the block data from the unix storage. If +// BlobTrashLifetime == 0, the block is deleted; otherwise, the block +// is renamed as path/{loc}.trash.{deadline}, where deadline = now + +// BlobTrashLifetime. +func (v *UnixVolume) BlockTrash(loc string) error { // Touch() must be called before calling Write() on a block. Touch() // also uses lockfile(). This avoids a race condition between Write() // and Trash() because either (a) the file will be trashed and Touch() @@ -444,10 +382,6 @@ func (v *UnixVolume) Trash(loc string) error { // be re-written), or (b) Touch() will update the file's timestamp and // Trash() will read the correct up-to-date timestamp and choose not to // trash the file. - - if v.volume.ReadOnly || !v.cluster.Collections.BlobTrash { - return MethodDisabledError - } if err := v.lock(context.TODO()); err != nil { return err } @@ -480,17 +414,13 @@ func (v *UnixVolume) Trash(loc string) error { return v.os.Rename(p, fmt.Sprintf("%v.trash.%d", p, time.Now().Add(v.cluster.Collections.BlobTrashLifetime.Duration()).Unix())) } -// Untrash moves block from trash back into store +// BlockUntrash moves block from trash back into store // Look for path/{loc}.trash.{deadline} in storage, // and rename the first such file as path/{loc} -func (v *UnixVolume) Untrash(loc string) (err error) { - if v.volume.ReadOnly { - return MethodDisabledError - } - +func (v *UnixVolume) BlockUntrash(hash string) error { v.os.stats.TickOps("readdir") v.os.stats.Tick(&v.os.stats.ReaddirOps) - files, err := ioutil.ReadDir(v.blockDir(loc)) + files, err := ioutil.ReadDir(v.blockDir(hash)) if err != nil { return err } @@ -500,11 +430,11 @@ func (v *UnixVolume) Untrash(loc string) (err error) { } foundTrash := false - prefix := fmt.Sprintf("%v.trash.", loc) + prefix := fmt.Sprintf("%v.trash.", hash) for _, f := range files { if strings.HasPrefix(f.Name(), prefix) { foundTrash = true - err = v.os.Rename(v.blockPath(f.Name()), v.blockPath(loc)) + err = v.os.Rename(v.blockPath(f.Name()), v.blockPath(hash)) if err == nil { break } @@ -515,7 +445,7 @@ func (v *UnixVolume) Untrash(loc string) (err error) { return os.ErrNotExist } - return + return nil } // blockDir returns the fully qualified directory name for the directory @@ -530,10 +460,9 @@ func (v *UnixVolume) blockPath(loc string) string { return filepath.Join(v.blockDir(loc), loc) } -// IsFull returns true if the free space on the volume is less than +// isFull returns true if the free space on the volume is less than // MinFreeKilobytes. -// -func (v *UnixVolume) IsFull() (isFull bool) { +func (v *UnixVolume) isFull() (isFull bool) { fullSymlink := v.Root + "/full" // Check if the volume has been marked as full in the last hour. @@ -547,9 +476,9 @@ func (v *UnixVolume) IsFull() (isFull bool) { } if avail, err := v.FreeDiskSpace(); err == nil { - isFull = avail < MinFreeKilobytes + isFull = avail < BlockSize } else { - v.logger.WithError(err).Errorf("%s: FreeDiskSpace failed", v) + v.logger.WithError(err).Errorf("%s: FreeDiskSpace failed", v.DeviceID()) isFull = false } @@ -563,22 +492,17 @@ func (v *UnixVolume) IsFull() (isFull bool) { // FreeDiskSpace returns the number of unused 1k blocks available on // the volume. -// func (v *UnixVolume) FreeDiskSpace() (free uint64, err error) { var fs syscall.Statfs_t err = syscall.Statfs(v.Root, &fs) if err == nil { // Statfs output is not guaranteed to measure free // space in terms of 1K blocks. - free = fs.Bavail * uint64(fs.Bsize) / 1024 + free = fs.Bavail * uint64(fs.Bsize) } return } -func (v *UnixVolume) String() string { - return fmt.Sprintf("[UnixVolume %s]", v.Root) -} - // InternalStats returns I/O and filesystem ops counters. func (v *UnixVolume) InternalStats() interface{} { return &v.os.stats @@ -651,10 +575,6 @@ var unixTrashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`) // EmptyTrash walks hierarchy looking for {hash}.trash.* // and deletes those with deadline < now. func (v *UnixVolume) EmptyTrash() { - if v.cluster.Collections.BlobDeleteConcurrency < 1 { - return - } - var bytesDeleted, bytesInTrash int64 var blocksDeleted, blocksInTrash int64