// Initialize the trashq and workers
h.trashq = NewWorkQueue()
for i := 0; i < 1 || i < h.Cluster.Collections.BlobTrashConcurrency; i++ {
- go RunTrashWorker(h.volmgr, h.Cluster, h.trashq)
+ go RunTrashWorker(h.volmgr, h.Logger, h.Cluster, h.trashq)
}
// Set up routes and metrics
testname string,
expectedStatus int,
response *httptest.ResponseRecorder) {
- if response.Code != expectedStatus {
- c.Errorf("%s: expected status %d, got %+v",
- testname, expectedStatus, response)
- }
+ c.Check(response.Code, check.Equals, expectedStatus, check.Commentf("%s", testname))
}
func ExpectBody(
"",
http.StatusOK,
response)
- expected := "Successfully untrashed on: [MockVolume],[MockVolume]"
- if response.Body.String() != expected {
- c.Errorf(
- "Untrash response mismatched: expected %s, got:\n%s",
- expected, response.Body.String())
- }
+ c.Check(response.Body.String(), check.Equals, "Successfully untrashed on: [MockVolume], [MockVolume]\n")
}
func (s *HandlerSuite) TestUntrashHandlerWithNoWritableVolumes(c *check.C) {
"encoding/json"
"fmt"
"io"
- "log"
"net/http"
"os"
"regexp"
}
var ds debugStats
runtime.ReadMemStats(&ds.MemStats)
- err := json.NewEncoder(resp).Encode(&ds)
+ data, err := json.Marshal(&ds)
if err != nil {
- http.Error(resp, err.Error(), 500)
+ http.Error(resp, err.Error(), http.StatusInternalServerError)
+ return
}
+ resp.Write(data)
}
// StatusHandler addresses /status.json requests.
func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
stLock.Lock()
rtr.readNodeStatus(&st)
- jstat, err := json.Marshal(&st)
+ data, err := json.Marshal(&st)
stLock.Unlock()
- if err == nil {
- resp.Write(jstat)
- } else {
- log.Printf("json.Marshal: %s", err)
- log.Printf("NodeStatus = %v", &st)
- http.Error(resp, err.Error(), 500)
+ if err != nil {
+ http.Error(resp, err.Error(), http.StatusInternalServerError)
+ return
}
+ resp.Write(data)
}
// populate the given NodeStatus struct with current values.
continue
} else {
result.Failed++
- log.Println("DeleteHandler:", err)
+ ctxlog.FromContext(req.Context()).WithError(err).Errorf("Trash(%s) failed on volume %s", hash, vol)
}
}
-
- var st int
-
if result.Deleted == 0 && result.Failed == 0 {
- st = http.StatusNotFound
- } else {
- st = http.StatusOK
+ resp.WriteHeader(http.StatusNotFound)
+ return
}
-
- resp.WriteHeader(st)
-
- if st == http.StatusOK {
- if body, err := json.Marshal(result); err == nil {
- resp.Write(body)
- } else {
- log.Printf("json.Marshal: %s (result = %v)", err, result)
- http.Error(resp, err.Error(), 500)
- }
+ body, err := json.Marshal(result)
+ if err != nil {
+ http.Error(resp, err.Error(), http.StatusInternalServerError)
+ return
}
+ resp.Write(body)
}
/* PullHandler processes "PUT /pull" requests for the data manager.
return
}
+ log := ctxlog.FromContext(req.Context())
hash := mux.Vars(req)["hash"]
if len(rtr.volmgr.AllWritable()) == 0 {
if os.IsNotExist(err) {
numNotFound++
} else if err != nil {
- log.Printf("Error untrashing %v on volume %v", hash, vol.String())
+ log.WithError(err).Errorf("Error untrashing %v on volume %s", hash, vol)
failedOn = append(failedOn, vol.String())
} else {
- log.Printf("Untrashed %v on volume %v", hash, vol.String())
+ log.Infof("Untrashed %v on volume %v", hash, vol.String())
untrashedOn = append(untrashedOn, vol.String())
}
}
if numNotFound == len(rtr.volmgr.AllWritable()) {
http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
- return
- }
-
- if len(failedOn) == len(rtr.volmgr.AllWritable()) {
+ } else if len(failedOn) == len(rtr.volmgr.AllWritable()) {
http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
} else {
- respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
+ respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ", ")
if len(failedOn) > 0 {
- respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
+ respBody += "; Failed to untrash on: " + strings.Join(failedOn, ", ")
+ http.Error(resp, respBody, http.StatusInternalServerError)
+ } else {
+ fmt.Fprintln(resp, respBody)
}
- resp.Write([]byte(respBody))
}
}
// DiskHashError.
//
func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
+ log := ctxlog.FromContext(ctx)
+
// Attempt to read the requested hash from a keep volume.
errorToCaller := NotFoundError
// volumes. If all volumes report IsNotExist,
// we return a NotFoundError.
if !os.IsNotExist(err) {
- log.Printf("%s: Get(%s): %s", vol, hash, err)
+ log.WithError(err).Errorf("Get(%s) failed on %s", hash, vol)
}
// If some volume returns a transient error, return it to the caller
// instead of "Not found" so it can retry.
continue
}
// Check the file checksum.
- //
filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
if filehash != hash {
// TODO: Try harder to tell a sysadmin about
// this.
- log.Printf("%s: checksum mismatch for request %s (actual %s)",
- vol, hash, filehash)
+ log.Errorf("checksum mismatch for block %s (actual %s) on %s", hash, filehash, vol)
errorToCaller = DiskHashError
continue
}
if errorToCaller == DiskHashError {
- log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
- vol, hash)
+ log.Warnf("after checksum mismatch for block %s on a different volume, a good copy was found on volume %s and returned", hash, vol)
}
return size, nil
}
// Choose a Keep volume to write to.
// If this volume fails, try all of the volumes in order.
if mnt := volmgr.NextWritable(); mnt != nil {
- if err := mnt.Put(ctx, hash, block); err == nil {
+ if err := mnt.Put(ctx, hash, block); err != nil {
+ log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
+ } else {
return mnt.Replication, nil // success!
}
- if ctx.Err() != nil {
- return 0, ErrClientDisconnect
- }
+ }
+ if ctx.Err() != nil {
+ return 0, ErrClientDisconnect
}
writables := volmgr.AllWritable()
if ctx.Err() != nil {
return 0, ErrClientDisconnect
}
- if err == nil {
+ switch err {
+ case nil:
return vol.Replication, nil // success!
- }
- if err != FullError {
+ case FullError:
+ continue
+ default:
// The volume is not full but the
// write did not succeed. Report the
// error and continue trying.
allFull = false
- log.Errorf("%s: Write(%s): %s", vol, hash, err)
+ log.WithError(err).Errorf("%s: Put(%s) failed", vol, hash)
}
}
// premature garbage collection. Otherwise, it returns a non-nil
// error.
func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte) (int, error) {
+ log := ctxlog.FromContext(ctx)
var bestErr error = NotFoundError
for _, mnt := range volmgr.AllWritable() {
err := mnt.Compare(ctx, hash, buf)
// to tell which one is wanted if we have
// both, so there's no point writing it even
// on a different volume.)
- log.Printf("%s: Compare(%s): %s", mnt.Volume, hash, err)
+ log.Errorf("collision in Compare(%s) on volume %s", hash, mnt.Volume)
return 0, err
} else if os.IsNotExist(err) {
// Block does not exist. This is the only
// Couldn't open file, data is corrupt on
// disk, etc.: log this abnormal condition,
// and try the next volume.
- log.Printf("%s: Compare(%s): %s", mnt.Volume, hash, err)
+ log.WithError(err).Warnf("error in Compare(%s) on volume %s", hash, mnt.Volume)
continue
}
if err := mnt.Touch(hash); err != nil {
- log.Printf("%s: Touch %s failed: %s", mnt.Volume, hash, err)
+ log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume)
bestErr = err
continue
}
return ""
}
-// IsExpired returns true if the given Unix timestamp (expressed as a
-// hexadecimal string) is in the past, or if timestampHex cannot be
-// parsed as a hexadecimal string.
-func IsExpired(timestampHex string) bool {
- ts, err := strconv.ParseInt(timestampHex, 16, 0)
- if err != nil {
- log.Printf("IsExpired: %s", err)
- return true
- }
- return time.Unix(ts, 0).Before(time.Now())
-}
-
// canDelete returns true if the user identified by apiToken is
// allowed to delete blocks.
func (rtr *router) canDelete(apiToken string) bool {
"fmt"
"io"
"io/ioutil"
- "log"
"net/http"
"os"
"regexp"
}
func newS3Volume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
- v := &S3Volume{cluster: cluster, volume: volume, logger: logger, metrics: metrics}
+ v := &S3Volume{cluster: cluster, volume: volume, metrics: metrics}
err := json.Unmarshal(volume.DriverParameters, &v)
if err != nil {
return nil, err
}
+ v.logger = logger.WithField("Volume", v.String())
return v, v.check()
}
rdr, err = v.bucket.GetReader(loc)
if err != nil {
- log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
+ v.logger.Warnf("reading %s after successful fixRace: %s", loc, err)
err = v.translateError(err)
}
return
go func() {
defer func() {
if ctx.Err() != nil {
- v.logger.Debugf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
+ v.logger.Debugf("abandoned PutReader goroutine finished with err: %s", err)
}
}()
defer close(ready)
}()
select {
case <-ctx.Done():
- v.logger.Debugf("%s: taking PutReader's input away: %s", v, ctx.Err())
+ v.logger.Debugf("taking PutReader's input away: %s", ctx.Err())
// Our pipe might be stuck in Write(), waiting for
// PutReader() to read. If so, un-stick it. This means
// PutReader will get corrupt data, but that's OK: the
go io.Copy(ioutil.Discard, bufr)
// CloseWithError() will return once pending I/O is done.
bufw.CloseWithError(ctx.Err())
- v.logger.Debugf("%s: abandoning PutReader goroutine", v)
+ v.logger.Debugf("abandoning PutReader goroutine")
return ctx.Err()
case <-ready:
// Unblock pipe in case PutReader did not consume it.
// The data object X exists, but recent/X is missing.
err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
if err != nil {
- log.Printf("error: creating %q: %s", "recent/"+loc, err)
+ v.logger.WithError(err).Errorf("error creating %q", "recent/"+loc)
return zeroTime, v.translateError(err)
}
- log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
+ v.logger.Infof("created %q to migrate existing block to new storage scheme", "recent/"+loc)
resp, err = v.bucket.Head("recent/"+loc, nil)
if err != nil {
- log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
+ v.logger.WithError(err).Errorf("HEAD failed after creating %q", "recent/"+loc)
return zeroTime, v.translateError(err)
}
} else if err != nil {
func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
// Use a merge sort to find matching sets of X and recent/X.
dataL := s3Lister{
+ Logger: v.logger,
Bucket: v.bucket.Bucket(),
Prefix: prefix,
PageSize: v.IndexPageSize,
Stats: &v.bucket.stats,
}
recentL := s3Lister{
+ Logger: v.logger,
Bucket: v.bucket.Bucket(),
Prefix: "recent/" + prefix,
PageSize: v.IndexPageSize,
trash, err := v.bucket.Head("trash/"+loc, nil)
if err != nil {
if !os.IsNotExist(v.translateError(err)) {
- log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
+ v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "trash/"+loc)
}
return false
}
trashTime, err := v.lastModified(trash)
if err != nil {
- log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
+ v.logger.WithError(err).Errorf("fixRace: error parsing time %q", trash.Header.Get("Last-Modified"))
return false
}
recent, err := v.bucket.Head("recent/"+loc, nil)
if err != nil {
- log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
+ v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "recent/"+loc)
return false
}
recentTime, err := v.lastModified(recent)
if err != nil {
- log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
+ v.logger.WithError(err).Errorf("fixRace: error parsing time %q", recent.Header.Get("Last-Modified"))
return false
}
return false
}
- log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
- log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
+ v.logger.Infof("fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
+ v.logger.Infof("fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
err = v.safeCopy(loc, "trash/"+loc)
if err != nil {
- log.Printf("error: fixRace: %s", err)
+ v.logger.WithError(err).Error("fixRace: copy failed")
return false
}
return true
trashT, err := time.Parse(time.RFC3339, trash.LastModified)
if err != nil {
- log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
+ v.logger.Warnf("EmptyTrash: %q: parse %q: %s", trash.Key, trash.LastModified, err)
return
}
recent, err := v.bucket.Head("recent/"+loc, nil)
if err != nil && os.IsNotExist(v.translateError(err)) {
- log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
+ v.logger.Warnf("EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", trash.Key, "recent/"+loc, err)
err = v.Untrash(loc)
if err != nil {
- log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
+ v.logger.WithError(err).Errorf("EmptyTrash: Untrash(%q) failed", loc)
}
return
} else if err != nil {
- log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
+ v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", "recent/"+loc)
return
}
recentT, err := v.lastModified(recent)
if err != nil {
- log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
+ v.logger.WithError(err).Warnf("EmptyTrash: %q: error parsing %q", "recent/"+loc, recent.Header.Get("Last-Modified"))
return
}
if trashT.Sub(recentT) < v.cluster.Collections.BlobSigningTTL.Duration() {
// Note this means (TrashSweepInterval
// < BlobSigningTTL - raceWindow) is
// necessary to avoid starvation.
- log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
+ v.logger.Infof("EmptyTrash: detected old race for %q, calling fixRace + Touch", loc)
v.fixRace(loc)
v.Touch(loc)
return
}
_, err := v.bucket.Head(loc, nil)
if os.IsNotExist(err) {
- log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
+ v.logger.Infof("EmptyTrash: detected recent race for %q, calling fixRace", loc)
v.fixRace(loc)
return
} else if err != nil {
- log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
+ v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
return
}
}
}
err = v.bucket.Del(trash.Key)
if err != nil {
- log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
+ v.logger.WithError(err).Errorf("EmptyTrash: error deleting %q", trash.Key)
return
}
atomic.AddInt64(&bytesDeleted, trash.Size)
_, err = v.bucket.Head(loc, nil)
if err == nil {
- log.Printf("warning: %s: EmptyTrash: HEAD %q succeeded immediately after deleting %q", v, loc, loc)
+ v.logger.Warnf("EmptyTrash: HEAD %q succeeded immediately after deleting %q", loc, loc)
return
}
if !os.IsNotExist(v.translateError(err)) {
- log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
+ v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
return
}
err = v.bucket.Del("recent/" + loc)
if err != nil {
- log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
+ v.logger.WithError(err).Warnf("EmptyTrash: error deleting %q", "recent/"+loc)
}
}
}
trashL := s3Lister{
+ Logger: v.logger,
Bucket: v.bucket.Bucket(),
Prefix: "trash/",
PageSize: v.IndexPageSize,
wg.Wait()
if err := trashL.Error(); err != nil {
- log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
+ v.logger.WithError(err).Error("EmptyTrash: lister failed")
}
- log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
+ v.logger.Infof("EmptyTrash stats: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
}
type s3Lister struct {
+ Logger logrus.FieldLogger
Bucket *s3.Bucket
Prefix string
PageSize int
lister.buf = make([]s3.Key, 0, len(resp.Contents))
for _, key := range resp.Contents {
if !strings.HasPrefix(key.Key, lister.Prefix) {
- log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
+ lister.Logger.Warnf("s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
continue
}
lister.buf = append(lister.buf, key)
"encoding/json"
"fmt"
"io"
- "log"
"net/http"
"net/http/httptest"
"os"
func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
err := v.bucket.Bucket().Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
if err != nil {
- log.Printf("PutRaw: %s: %+v", loc, err)
+ v.logger.Printf("PutRaw: %s: %+v", loc, err)
}
err = v.bucket.Bucket().Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
if err != nil {
- log.Printf("PutRaw: recent/%s: %+v", loc, err)
+ v.logger.Printf("PutRaw: recent/%s: %+v", loc, err)
}
}
import (
"errors"
- "log"
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "github.com/sirupsen/logrus"
)
// RunTrashWorker is used by Keepstore to initiate trash worker channel goroutine.
// Delete the block indicated by the trash request Locator
// Repeat
//
-func RunTrashWorker(volmgr *RRVolumeManager, cluster *arvados.Cluster, trashq *WorkQueue) {
+func RunTrashWorker(volmgr *RRVolumeManager, logger logrus.FieldLogger, cluster *arvados.Cluster, trashq *WorkQueue) {
for item := range trashq.NextItem {
trashRequest := item.(TrashRequest)
- TrashItem(volmgr, cluster, trashRequest)
+ TrashItem(volmgr, logger, cluster, trashRequest)
trashq.DoneItem <- struct{}{}
}
}
// TrashItem deletes the indicated block from every writable volume.
-func TrashItem(volmgr *RRVolumeManager, cluster *arvados.Cluster, trashRequest TrashRequest) {
+func TrashItem(volmgr *RRVolumeManager, logger logrus.FieldLogger, cluster *arvados.Cluster, trashRequest TrashRequest) {
reqMtime := time.Unix(0, trashRequest.BlockMtime)
if time.Since(reqMtime) < cluster.Collections.BlobSigningTTL.Duration() {
- log.Printf("WARNING: data manager asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.",
+ logger.Warnf("client asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.",
arvados.Duration(time.Since(reqMtime)),
trashRequest.Locator,
trashRequest.BlockMtime,
if uuid := trashRequest.MountUUID; uuid == "" {
volumes = volmgr.AllWritable()
} else if mnt := volmgr.Lookup(uuid, true); mnt == nil {
- log.Printf("warning: trash request for nonexistent mount: %v", trashRequest)
+ logger.Warnf("trash request for nonexistent mount: %v", trashRequest)
return
} else {
volumes = []*VolumeMount{mnt}
for _, volume := range volumes {
mtime, err := volume.Mtime(trashRequest.Locator)
if err != nil {
- log.Printf("%v Trash(%v): %v", volume, trashRequest.Locator, err)
+ logger.WithError(err).Errorf("%v Trash(%v)", volume, trashRequest.Locator)
continue
}
if trashRequest.BlockMtime != mtime.UnixNano() {
- log.Printf("%v Trash(%v): stored mtime %v does not match trash list value %v", volume, trashRequest.Locator, mtime.UnixNano(), trashRequest.BlockMtime)
+ logger.Infof("%v Trash(%v): stored mtime %v does not match trash list value %v; skipping", volume, trashRequest.Locator, mtime.UnixNano(), trashRequest.BlockMtime)
continue
}
}
if err != nil {
- log.Printf("%v Trash(%v): %v", volume, trashRequest.Locator, err)
+ logger.WithError(err).Errorf("%v Trash(%v)", volume, trashRequest.Locator)
} else {
- log.Printf("%v Trash(%v) OK", volume, trashRequest.Locator)
+ logger.Infof("%v Trash(%v) OK", volume, trashRequest.Locator)
}
}
}
"context"
"time"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
"github.com/prometheus/client_golang/prometheus"
check "gopkg.in/check.v1"
)
}
}
}
- go RunTrashWorker(s.handler.volmgr, s.cluster, trashq)
+ go RunTrashWorker(s.handler.volmgr, ctxlog.TestLogger(c), s.cluster, trashq)
// Install gate so all local operations block until we say go
gate := make(chan struct{})
"fmt"
"io"
"io/ioutil"
- "log"
"os"
"os/exec"
"path/filepath"
if err != nil {
return nil, err
}
+ v.logger = v.logger.WithField("Volume", v.String())
return v, v.check()
}
// "fa0b6166-3b55-4994-bd3f-92f4e00a1bb0/keep".
func (v *UnixVolume) GetDeviceID() string {
giveup := func(f string, args ...interface{}) string {
- log.Printf(f+"; using blank DeviceID for volume %s", append(args, v)...)
+ v.logger.Infof(f+"; using blank DeviceID for volume %s", append(args, v)...)
return ""
}
buf, err := exec.Command("findmnt", "--noheadings", "--target", v.Root).CombinedOutput()
link := filepath.Join(udir, uuid)
fi, err = os.Stat(link)
if err != nil {
- log.Printf("error: stat %q: %s", link, err)
+ v.logger.WithError(err).Errorf("stat(%q) failed", link)
continue
}
if fi.Sys().(*syscall.Stat_t).Ino == ino {
}
bdir := v.blockDir(loc)
if err := os.MkdirAll(bdir, 0755); err != nil {
- log.Printf("%s: could not create directory %s: %s",
- loc, bdir, err)
- return err
+ return fmt.Errorf("error creating directory %s: %s", bdir, err)
}
tmpfile, tmperr := v.os.TempFile(bdir, "tmp"+loc)
if tmperr != nil {
- log.Printf("ioutil.TempFile(%s, tmp%s): %s", bdir, loc, tmperr)
- return tmperr
+ return fmt.Errorf("TempFile(%s, tmp%s) failed: %s", bdir, loc, tmperr)
}
bpath := v.blockPath(loc)
n, err := io.Copy(tmpfile, rdr)
v.os.stats.TickOutBytes(uint64(n))
if err != nil {
- log.Printf("%s: writing to %s: %s", v, bpath, err)
+ err = fmt.Errorf("error writing %s: %s", bpath, err)
tmpfile.Close()
v.os.Remove(tmpfile.Name())
return err
}
if err := tmpfile.Close(); err != nil {
- log.Printf("closing %s: %s", tmpfile.Name(), err)
+ err = fmt.Errorf("error closing %s: %s", tmpfile.Name(), err)
v.os.Remove(tmpfile.Name())
return err
}
if err := v.os.Rename(tmpfile.Name(), bpath); err != nil {
- log.Printf("rename %s %s: %s", tmpfile.Name(), bpath, err)
- return v.os.Remove(tmpfile.Name())
+ err = fmt.Errorf("error renaming %s to %s: %s", tmpfile.Name(), bpath, err)
+ v.os.Remove(tmpfile.Name())
+ return err
}
return nil
}
func (v *UnixVolume) Status() *VolumeStatus {
fi, err := v.os.Stat(v.Root)
if err != nil {
- log.Printf("%s: os.Stat: %s", v, err)
+ v.logger.WithError(err).Error("stat failed")
return nil
}
devnum := fi.Sys().(*syscall.Stat_t).Dev
var fs syscall.Statfs_t
if err := syscall.Statfs(v.Root, &fs); err != nil {
- log.Printf("%s: statfs: %s", v, err)
+ v.logger.WithError(err).Error("statfs failed")
return nil
}
// These calculations match the way df calculates disk usage:
blockdirpath := filepath.Join(v.Root, names[0])
blockdir, err := v.os.Open(blockdirpath)
if err != nil {
- log.Print("Error reading ", blockdirpath, ": ", err)
- lastErr = err
+ v.logger.WithError(err).Errorf("error reading %q", blockdirpath)
+ lastErr = fmt.Errorf("error reading %q: %s", blockdirpath, err)
continue
}
v.os.stats.TickOps("readdir")
if err == io.EOF {
break
} else if err != nil {
- log.Print("Error reading ", blockdirpath, ": ", err)
- lastErr = err
+ v.logger.WithError(err).Errorf("error reading %q", blockdirpath)
+ lastErr = fmt.Errorf("error reading %q: %s", blockdirpath, err)
break
}
name := fileInfo[0].Name()
" ", fileInfo[0].ModTime().UnixNano(),
"\n")
if err != nil {
- log.Print("Error writing : ", err)
- lastErr = err
- break
+ blockdir.Close()
+ return fmt.Errorf("error writing: %s", err)
}
}
blockdir.Close()
if avail, err := v.FreeDiskSpace(); err == nil {
isFull = avail < MinFreeKilobytes
} else {
- log.Printf("%s: FreeDiskSpace: %s", v, err)
+ v.logger.WithError(err).Error("FreeDiskSpace failed")
isFull = false
}
}()
select {
case <-ctx.Done():
- log.Printf("%s: client hung up while waiting for Serialize lock (%s)", v, time.Since(t0))
+ v.logger.Infof("client hung up while waiting for Serialize lock (%s)", time.Since(t0))
go func() {
<-locked
v.locker.Unlock()
}
deadline, err := strconv.ParseInt(matches[2], 10, 64)
if err != nil {
- log.Printf("EmptyTrash: %v: ParseInt(%v): %v", path, matches[2], err)
+ v.logger.WithError(err).Errorf("EmptyTrash: %v: ParseInt(%q) failed", path, matches[2])
return
}
atomic.AddInt64(&bytesInTrash, info.Size())
}
err = v.os.Remove(path)
if err != nil {
- log.Printf("EmptyTrash: Remove %v: %v", path, err)
+ v.logger.WithError(err).Errorf("EmptyTrash: Remove(%q) failed", path)
return
}
atomic.AddInt64(&bytesDeleted, info.Size())
err := filepath.Walk(v.Root, func(path string, info os.FileInfo, err error) error {
if err != nil {
- log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err)
+ v.logger.WithError(err).Errorf("EmptyTrash: filepath.Walk(%q) failed", path)
return nil
}
todo <- dirent{path, info}
wg.Wait()
if err != nil {
- log.Printf("EmptyTrash error for %v: %v", v.String(), err)
+ v.logger.WithError(err).Error("EmptyTrash failed")
}
- log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
+ v.logger.Infof("EmptyTrash stats: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
}
type unixStats struct {
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
check "gopkg.in/check.v1"
Root: d,
locker: locker,
cluster: cluster,
+ logger: ctxlog.TestLogger(c),
volume: volume,
metrics: metrics,
},