X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/837949653b69e357cfa90fb0b8855a37e9c406d7..dcb4760843cc0ed4647e8eaa43abb5d2f049cd0c:/services/keepstore/volume_unix.go diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go index 681095d3a2..23d6753592 100644 --- a/services/keepstore/volume_unix.go +++ b/services/keepstore/volume_unix.go @@ -1,3 +1,7 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + package main import ( @@ -8,15 +12,15 @@ import ( "io" "io/ioutil" "os" + "os/exec" "path/filepath" "regexp" "strconv" "strings" "sync" + "sync/atomic" "syscall" "time" - - log "github.com/Sirupsen/logrus" ) type unixVolumeAdder struct { @@ -107,6 +111,7 @@ type UnixVolume struct { ReadOnly bool Serialize bool DirectoryReplication int + StorageClasses []string // something to lock during IO, typically a sync.Mutex (or nil // to skip locking) @@ -115,6 +120,82 @@ type UnixVolume struct { os osWithStats } +// DeviceID returns a globally unique ID for the volume's root +// directory, consisting of the filesystem's UUID and the path from +// filesystem root to storage directory, joined by "/". For example, +// the DeviceID for a local directory "/mnt/xvda1/keep" might be +// "fa0b6166-3b55-4994-bd3f-92f4e00a1bb0/keep". +func (v *UnixVolume) DeviceID() string { + giveup := func(f string, args ...interface{}) string { + log.Printf(f+"; using blank DeviceID for volume %s", append(args, v)...) + return "" + } + buf, err := exec.Command("findmnt", "--noheadings", "--target", v.Root).CombinedOutput() + if err != nil { + return giveup("findmnt: %s (%q)", err, buf) + } + findmnt := strings.Fields(string(buf)) + if len(findmnt) < 2 { + return giveup("could not parse findmnt output: %q", buf) + } + fsRoot, dev := findmnt[0], findmnt[1] + + absRoot, err := filepath.Abs(v.Root) + if err != nil { + return giveup("resolving relative path %q: %s", v.Root, err) + } + realRoot, err := filepath.EvalSymlinks(absRoot) + if err != nil { + return giveup("resolving symlinks in %q: %s", absRoot, err) + } + + // Find path from filesystem root to realRoot + var fsPath string + if strings.HasPrefix(realRoot, fsRoot+"/") { + fsPath = realRoot[len(fsRoot):] + } else if fsRoot == "/" { + fsPath = realRoot + } else if fsRoot == realRoot { + fsPath = "" + } else { + return giveup("findmnt reports mount point %q which is not a prefix of volume root %q", fsRoot, realRoot) + } + + if !strings.HasPrefix(dev, "/") { + return giveup("mount %q device %q is not a path", fsRoot, dev) + } + + fi, err := os.Stat(dev) + if err != nil { + return giveup("stat %q: %s\n", dev, err) + } + ino := fi.Sys().(*syscall.Stat_t).Ino + + // Find a symlink in /dev/disk/by-uuid/ whose target is (i.e., + // has the same inode as) the mounted device + udir := "/dev/disk/by-uuid" + d, err := os.Open(udir) + if err != nil { + return giveup("opening %q: %s", udir, err) + } + uuids, err := d.Readdirnames(0) + if err != nil { + return giveup("reading %q: %s", udir, err) + } + for _, uuid := range uuids { + link := filepath.Join(udir, uuid) + fi, err = os.Stat(link) + if err != nil { + log.Printf("error: stat %q: %s", link, err) + continue + } + if fi.Sys().(*syscall.Stat_t).Ino == ino { + return uuid + fsPath + } + } + return giveup("could not find entry in %q matching %q", udir, dev) +} + // Examples implements VolumeWithExamples. func (*UnixVolume) Examples() []Volume { return []Volume{ @@ -358,6 +439,7 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error { return err } defer rootdir.Close() + v.os.stats.Tick(&v.os.stats.ReaddirOps) for { names, err := rootdir.Readdirnames(1) if err == io.EOF { @@ -379,6 +461,7 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error { lastErr = err continue } + v.os.stats.Tick(&v.os.stats.ReaddirOps) for { fileInfo, err := blockdir.Readdir(1) if err == io.EOF { @@ -400,6 +483,11 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error { "+", fileInfo[0].Size(), " ", fileInfo[0].ModTime().UnixNano(), "\n") + if err != nil { + log.Print("Error writing : ", err) + lastErr = err + break + } } blockdir.Close() } @@ -461,6 +549,7 @@ func (v *UnixVolume) Untrash(loc string) (err error) { return MethodDisabledError } + v.os.stats.Tick(&v.os.stats.ReaddirOps) files, err := ioutil.ReadDir(v.blockDir(loc)) if err != nil { return err @@ -562,6 +651,11 @@ func (v *UnixVolume) Replication() int { return v.DirectoryReplication } +// GetStorageClasses implements Volume +func (v *UnixVolume) GetStorageClasses() []string { + return v.StorageClasses +} + // InternalStats returns I/O and filesystem ops counters. func (v *UnixVolume) InternalStats() interface{} { return &v.os.stats @@ -632,39 +726,61 @@ var unixTrashLocRegexp = regexp.MustCompile(`/([0-9a-f]{32})\.trash\.(\d+)$`) // and deletes those with deadline < now. func (v *UnixVolume) EmptyTrash() { var bytesDeleted, bytesInTrash int64 - var blocksDeleted, blocksInTrash int + var blocksDeleted, blocksInTrash int64 - err := filepath.Walk(v.Root, func(path string, info os.FileInfo, err error) error { - if err != nil { - log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err) - return nil - } + doFile := func(path string, info os.FileInfo) { if info.Mode().IsDir() { - return nil + return } matches := unixTrashLocRegexp.FindStringSubmatch(path) if len(matches) != 3 { - return nil + return } deadline, err := strconv.ParseInt(matches[2], 10, 64) if err != nil { log.Printf("EmptyTrash: %v: ParseInt(%v): %v", path, matches[2], err) - return nil + return } - bytesInTrash += info.Size() - blocksInTrash++ + atomic.AddInt64(&bytesInTrash, info.Size()) + atomic.AddInt64(&blocksInTrash, 1) if deadline > time.Now().Unix() { - return nil + return } err = v.os.Remove(path) if err != nil { log.Printf("EmptyTrash: Remove %v: %v", path, err) + return + } + atomic.AddInt64(&bytesDeleted, info.Size()) + atomic.AddInt64(&blocksDeleted, 1) + } + + type dirent struct { + path string + info os.FileInfo + } + var wg sync.WaitGroup + todo := make(chan dirent, theConfig.EmptyTrashWorkers) + for i := 0; i < 1 || i < theConfig.EmptyTrashWorkers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for e := range todo { + doFile(e.path, e.info) + } + }() + } + + err := filepath.Walk(v.Root, func(path string, info os.FileInfo, err error) error { + if err != nil { + log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err) return nil } - bytesDeleted += info.Size() - blocksDeleted++ + todo <- dirent{path, info} return nil }) + close(todo) + wg.Wait() if err != nil { log.Printf("EmptyTrash error for %v: %v", v.String(), err)