From 62d28600cbfc31f8e72c61e4519ff198cb66a02a Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Tue, 29 Oct 2019 16:51:13 -0400 Subject: [PATCH] 15521: Convert remaining log uses to logrus. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- services/keepstore/command.go | 2 +- services/keepstore/handler_test.go | 12 +-- services/keepstore/handlers.go | 112 ++++++++++-------------- services/keepstore/s3_volume.go | 66 +++++++------- services/keepstore/s3_volume_test.go | 5 +- services/keepstore/trash_worker.go | 20 ++--- services/keepstore/trash_worker_test.go | 3 +- services/keepstore/unix_volume.go | 53 ++++++----- services/keepstore/unix_volume_test.go | 2 + 9 files changed, 126 insertions(+), 149 deletions(-) diff --git a/services/keepstore/command.go b/services/keepstore/command.go index c589e639f5..006d244639 100644 --- a/services/keepstore/command.go +++ b/services/keepstore/command.go @@ -190,7 +190,7 @@ func (h *handler) setup(ctx context.Context, cluster *arvados.Cluster, token str // Initialize the trashq and workers h.trashq = NewWorkQueue() for i := 0; i < 1 || i < h.Cluster.Collections.BlobTrashConcurrency; i++ { - go RunTrashWorker(h.volmgr, h.Cluster, h.trashq) + go RunTrashWorker(h.volmgr, h.Logger, h.Cluster, h.trashq) } // Set up routes and metrics diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go index 9d69b9fa47..54b4871fab 100644 --- a/services/keepstore/handler_test.go +++ b/services/keepstore/handler_test.go @@ -892,10 +892,7 @@ func ExpectStatusCode( testname string, expectedStatus int, response *httptest.ResponseRecorder) { - if response.Code != expectedStatus { - c.Errorf("%s: expected status %d, got %+v", - testname, expectedStatus, response) - } + c.Check(response.Code, check.Equals, expectedStatus, check.Commentf("%s", testname)) } func ExpectBody( @@ -1147,12 +1144,7 @@ func (s *HandlerSuite) TestUntrashHandler(c *check.C) { "", http.StatusOK, response) - expected := "Successfully untrashed on: 
[MockVolume],[MockVolume]" - if response.Body.String() != expected { - c.Errorf( - "Untrash response mismatched: expected %s, got:\n%s", - expected, response.Body.String()) - } + c.Check(response.Body.String(), check.Equals, "Successfully untrashed on: [MockVolume], [MockVolume]\n") } func (s *HandlerSuite) TestUntrashHandlerWithNoWritableVolumes(c *check.C) { diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go index 30ea695f0c..0fcc121441 100644 --- a/services/keepstore/handlers.go +++ b/services/keepstore/handlers.go @@ -11,7 +11,6 @@ import ( "encoding/json" "fmt" "io" - "log" "net/http" "os" "regexp" @@ -342,25 +341,25 @@ func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) { } var ds debugStats runtime.ReadMemStats(&ds.MemStats) - err := json.NewEncoder(resp).Encode(&ds) + data, err := json.Marshal(&ds) if err != nil { - http.Error(resp, err.Error(), 500) + http.Error(resp, err.Error(), http.StatusInternalServerError) + return } + resp.Write(data) } // StatusHandler addresses /status.json requests. func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) { stLock.Lock() rtr.readNodeStatus(&st) - jstat, err := json.Marshal(&st) + data, err := json.Marshal(&st) stLock.Unlock() - if err == nil { - resp.Write(jstat) - } else { - log.Printf("json.Marshal: %s", err) - log.Printf("NodeStatus = %v", &st) - http.Error(resp, err.Error(), 500) + if err != nil { + http.Error(resp, err.Error(), http.StatusInternalServerError) + return } + resp.Write(data) } // populate the given NodeStatus struct with current values. 
@@ -457,28 +456,19 @@ func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) { continue } else { result.Failed++ - log.Println("DeleteHandler:", err) + ctxlog.FromContext(req.Context()).WithError(err).Errorf("Trash(%s) failed on volume %s", hash, vol) } } - - var st int - if result.Deleted == 0 && result.Failed == 0 { - st = http.StatusNotFound - } else { - st = http.StatusOK + resp.WriteHeader(http.StatusNotFound) + return } - - resp.WriteHeader(st) - - if st == http.StatusOK { - if body, err := json.Marshal(result); err == nil { - resp.Write(body) - } else { - log.Printf("json.Marshal: %s (result = %v)", err, result) - http.Error(resp, err.Error(), 500) - } + body, err := json.Marshal(result) + if err != nil { + http.Error(resp, err.Error(), http.StatusInternalServerError) + return } + resp.Write(body) } /* PullHandler processes "PUT /pull" requests for the data manager. @@ -600,6 +590,7 @@ func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) { return } + log := ctxlog.FromContext(req.Context()) hash := mux.Vars(req)["hash"] if len(rtr.volmgr.AllWritable()) == 0 { @@ -615,27 +606,26 @@ func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) { if os.IsNotExist(err) { numNotFound++ } else if err != nil { - log.Printf("Error untrashing %v on volume %v", hash, vol.String()) + log.WithError(err).Errorf("Error untrashing %v on volume %s", hash, vol) failedOn = append(failedOn, vol.String()) } else { - log.Printf("Untrashed %v on volume %v", hash, vol.String()) + log.Infof("Untrashed %v on volume %v", hash, vol.String()) untrashedOn = append(untrashedOn, vol.String()) } } if numNotFound == len(rtr.volmgr.AllWritable()) { http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound) - return - } - - if len(failedOn) == len(rtr.volmgr.AllWritable()) { + } else if len(failedOn) == len(rtr.volmgr.AllWritable()) { http.Error(resp, "Failed to untrash on all writable volumes", 
http.StatusInternalServerError) } else { - respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",") + respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ", ") if len(failedOn) > 0 { - respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",") + respBody += "; Failed to untrash on: " + strings.Join(failedOn, ", ") + http.Error(resp, respBody, http.StatusInternalServerError) + } else { + fmt.Fprintln(resp, respBody) } - resp.Write([]byte(respBody)) } } @@ -659,6 +649,8 @@ func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) { // DiskHashError. // func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) { + log := ctxlog.FromContext(ctx) + // Attempt to read the requested hash from a keep volume. errorToCaller := NotFoundError @@ -676,7 +668,7 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b // volumes. If all volumes report IsNotExist, // we return a NotFoundError. if !os.IsNotExist(err) { - log.Printf("%s: Get(%s): %s", vol, hash, err) + log.WithError(err).Errorf("Get(%s) failed on %s", hash, vol) } // If some volume returns a transient error, return it to the caller // instead of "Not found" so it can retry. @@ -686,19 +678,16 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b continue } // Check the file checksum. - // filehash := fmt.Sprintf("%x", md5.Sum(buf[:size])) if filehash != hash { // TODO: Try harder to tell a sysadmin about // this. 
- log.Printf("%s: checksum mismatch for request %s (actual %s)", - vol, hash, filehash) + log.Errorf("checksum mismatch for block %s (actual %s) on %s", hash, filehash, vol) errorToCaller = DiskHashError continue } if errorToCaller == DiskHashError { - log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned", - vol, hash) + log.Warnf("after checksum mismatch for block %s on a different volume, a good copy was found on volume %s and returned", hash, vol) return size, nil } @@ -754,12 +743,14 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s // Choose a Keep volume to write to. // If this volume fails, try all of the volumes in order. if mnt := volmgr.NextWritable(); mnt != nil { - if err := mnt.Put(ctx, hash, block); err == nil { + if err := mnt.Put(ctx, hash, block); err != nil { + log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash) + } else { return mnt.Replication, nil // success! } - if ctx.Err() != nil { - return 0, ErrClientDisconnect - } + } + if ctx.Err() != nil { + return 0, ErrClientDisconnect } writables := volmgr.AllWritable() @@ -774,15 +765,17 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s if ctx.Err() != nil { return 0, ErrClientDisconnect } - if err == nil { + switch err { + case nil: return vol.Replication, nil // success! - } - if err != FullError { + case FullError: + continue + default: // The volume is not full but the // write did not succeed. Report the // error and continue trying. allFull = false - log.Errorf("%s: Write(%s): %s", vol, hash, err) + log.WithError(err).Errorf("%s: Put(%s) failed", vol, hash) } } @@ -800,6 +793,7 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s // premature garbage collection. Otherwise, it returns a non-nil // error. 
func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte) (int, error) { + log := ctxlog.FromContext(ctx) var bestErr error = NotFoundError for _, mnt := range volmgr.AllWritable() { err := mnt.Compare(ctx, hash, buf) @@ -811,7 +805,7 @@ func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, // to tell which one is wanted if we have // both, so there's no point writing it even // on a different volume.) - log.Printf("%s: Compare(%s): %s", mnt.Volume, hash, err) + log.Errorf("collision in Compare(%s) on volume %s", hash, mnt.Volume) return 0, err } else if os.IsNotExist(err) { // Block does not exist. This is the only @@ -821,11 +815,11 @@ func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, // Couldn't open file, data is corrupt on // disk, etc.: log this abnormal condition, // and try the next volume. - log.Printf("%s: Compare(%s): %s", mnt.Volume, hash, err) + log.WithError(err).Warnf("error in Compare(%s) on volume %s", hash, mnt.Volume) continue } if err := mnt.Touch(hash); err != nil { - log.Printf("%s: Touch %s failed: %s", mnt.Volume, hash, err) + log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume) bestErr = err continue } @@ -859,18 +853,6 @@ func GetAPIToken(req *http.Request) string { return "" } -// IsExpired returns true if the given Unix timestamp (expressed as a -// hexadecimal string) is in the past, or if timestampHex cannot be -// parsed as a hexadecimal string. -func IsExpired(timestampHex string) bool { - ts, err := strconv.ParseInt(timestampHex, 16, 0) - if err != nil { - log.Printf("IsExpired: %s", err) - return true - } - return time.Unix(ts, 0).Before(time.Now()) -} - // canDelete returns true if the user identified by apiToken is // allowed to delete blocks. 
func (rtr *router) canDelete(apiToken string) bool { diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go index 220377af28..08cc591fc5 100644 --- a/services/keepstore/s3_volume.go +++ b/services/keepstore/s3_volume.go @@ -16,7 +16,6 @@ import ( "fmt" "io" "io/ioutil" - "log" "net/http" "os" "regexp" @@ -37,11 +36,12 @@ func init() { } func newS3Volume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) { - v := &S3Volume{cluster: cluster, volume: volume, logger: logger, metrics: metrics} + v := &S3Volume{cluster: cluster, volume: volume, metrics: metrics} err := json.Unmarshal(volume.DriverParameters, &v) if err != nil { return nil, err } + v.logger = logger.WithField("Volume", v.String()) return v, v.check() } @@ -340,7 +340,7 @@ func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) { rdr, err = v.bucket.GetReader(loc) if err != nil { - log.Printf("warning: reading %s after successful fixRace: %s", loc, err) + v.logger.Warnf("reading %s after successful fixRace: %s", loc, err) err = v.translateError(err) } return @@ -465,7 +465,7 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error { go func() { defer func() { if ctx.Err() != nil { - v.logger.Debugf("%s: abandoned PutReader goroutine finished with err: %s", v, err) + v.logger.Debugf("abandoned PutReader goroutine finished with err: %s", err) } }() defer close(ready) @@ -477,7 +477,7 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error { }() select { case <-ctx.Done(): - v.logger.Debugf("%s: taking PutReader's input away: %s", v, ctx.Err()) + v.logger.Debugf("taking PutReader's input away: %s", ctx.Err()) // Our pipe might be stuck in Write(), waiting for // PutReader() to read. If so, un-stick it. 
This means // PutReader will get corrupt data, but that's OK: the @@ -485,7 +485,7 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error { go io.Copy(ioutil.Discard, bufr) // CloseWithError() will return once pending I/O is done. bufw.CloseWithError(ctx.Err()) - v.logger.Debugf("%s: abandoning PutReader goroutine", v) + v.logger.Debugf("abandoning PutReader goroutine") return ctx.Err() case <-ready: // Unblock pipe in case PutReader did not consume it. @@ -523,13 +523,13 @@ func (v *S3Volume) Mtime(loc string) (time.Time, error) { // The data object X exists, but recent/X is missing. err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{}) if err != nil { - log.Printf("error: creating %q: %s", "recent/"+loc, err) + v.logger.WithError(err).Errorf("error creating %q", "recent/"+loc) return zeroTime, v.translateError(err) } - log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc) + v.logger.Infof("created %q to migrate existing block to new storage scheme", "recent/"+loc) resp, err = v.bucket.Head("recent/"+loc, nil) if err != nil { - log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err) + v.logger.WithError(err).Errorf("HEAD failed after creating %q", "recent/"+loc) return zeroTime, v.translateError(err) } } else if err != nil { @@ -544,12 +544,14 @@ func (v *S3Volume) Mtime(loc string) (time.Time, error) { func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error { // Use a merge sort to find matching sets of X and recent/X. 
dataL := s3Lister{ + Logger: v.logger, Bucket: v.bucket.Bucket(), Prefix: prefix, PageSize: v.IndexPageSize, Stats: &v.bucket.stats, } recentL := s3Lister{ + Logger: v.logger, Bucket: v.bucket.Bucket(), Prefix: "recent/" + prefix, PageSize: v.IndexPageSize, @@ -744,24 +746,24 @@ func (v *S3Volume) fixRace(loc string) bool { trash, err := v.bucket.Head("trash/"+loc, nil) if err != nil { if !os.IsNotExist(v.translateError(err)) { - log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err) + v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "trash/"+loc) } return false } trashTime, err := v.lastModified(trash) if err != nil { - log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err) + v.logger.WithError(err).Errorf("fixRace: error parsing time %q", trash.Header.Get("Last-Modified")) return false } recent, err := v.bucket.Head("recent/"+loc, nil) if err != nil { - log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err) + v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "recent/"+loc) return false } recentTime, err := v.lastModified(recent) if err != nil { - log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err) + v.logger.WithError(err).Errorf("fixRace: error parsing time %q", recent.Header.Get("Last-Modified")) return false } @@ -772,11 +774,11 @@ func (v *S3Volume) fixRace(loc string) bool { return false } - log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL) - log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc) + v.logger.Infof("fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL) + v.logger.Infof("fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc) 
err = v.safeCopy(loc, "trash/"+loc) if err != nil { - log.Printf("error: fixRace: %s", err) + v.logger.WithError(err).Error("fixRace: copy failed") return false } return true @@ -819,24 +821,24 @@ func (v *S3Volume) EmptyTrash() { trashT, err := time.Parse(time.RFC3339, trash.LastModified) if err != nil { - log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err) + v.logger.Warnf("EmptyTrash: %q: parse %q: %s", trash.Key, trash.LastModified, err) return } recent, err := v.bucket.Head("recent/"+loc, nil) if err != nil && os.IsNotExist(v.translateError(err)) { - log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err) + v.logger.Warnf("EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", trash.Key, "recent/"+loc, err) err = v.Untrash(loc) if err != nil { - log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err) + v.logger.WithError(err).Errorf("EmptyTrash: Untrash(%q) failed", loc) } return } else if err != nil { - log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err) + v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", "recent/"+loc) return } recentT, err := v.lastModified(recent) if err != nil { - log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err) + v.logger.WithError(err).Warnf("EmptyTrash: %q: error parsing %q", "recent/"+loc, recent.Header.Get("Last-Modified")) return } if trashT.Sub(recentT) < v.cluster.Collections.BlobSigningTTL.Duration() { @@ -849,18 +851,18 @@ func (v *S3Volume) EmptyTrash() { // Note this means (TrashSweepInterval // < BlobSigningTTL - raceWindow) is // necessary to avoid starvation. 
- log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc) + v.logger.Infof("EmptyTrash: detected old race for %q, calling fixRace + Touch", loc) v.fixRace(loc) v.Touch(loc) return } _, err := v.bucket.Head(loc, nil) if os.IsNotExist(err) { - log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc) + v.logger.Infof("EmptyTrash: detected recent race for %q, calling fixRace", loc) v.fixRace(loc) return } else if err != nil { - log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err) + v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc) return } } @@ -869,7 +871,7 @@ func (v *S3Volume) EmptyTrash() { } err = v.bucket.Del(trash.Key) if err != nil { - log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err) + v.logger.WithError(err).Errorf("EmptyTrash: error deleting %q", trash.Key) return } atomic.AddInt64(&bytesDeleted, trash.Size) @@ -877,16 +879,16 @@ func (v *S3Volume) EmptyTrash() { _, err = v.bucket.Head(loc, nil) if err == nil { - log.Printf("warning: %s: EmptyTrash: HEAD %q succeeded immediately after deleting %q", v, loc, loc) + v.logger.Warnf("EmptyTrash: HEAD %q succeeded immediately after deleting %q", loc, loc) return } if !os.IsNotExist(v.translateError(err)) { - log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err) + v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc) return } err = v.bucket.Del("recent/" + loc) if err != nil { - log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err) + v.logger.WithError(err).Warnf("EmptyTrash: error deleting %q", "recent/"+loc) } } @@ -903,6 +905,7 @@ func (v *S3Volume) EmptyTrash() { } trashL := s3Lister{ + Logger: v.logger, Bucket: v.bucket.Bucket(), Prefix: "trash/", PageSize: v.IndexPageSize, @@ -915,12 +918,13 @@ func (v *S3Volume) EmptyTrash() { wg.Wait() if err := trashL.Error(); err != nil { - log.Printf("error: %s: EmptyTrash: lister: %s", v, err) + 
v.logger.WithError(err).Error("EmptyTrash: lister failed") } - log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted) + v.logger.Infof("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted) } type s3Lister struct { + Logger logrus.FieldLogger Bucket *s3.Bucket Prefix string PageSize int @@ -967,7 +971,7 @@ func (lister *s3Lister) getPage() { lister.buf = make([]s3.Key, 0, len(resp.Contents)) for _, key := range resp.Contents { if !strings.HasPrefix(key.Key, lister.Prefix) { - log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key) + lister.Logger.Warnf("s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key) continue } lister.buf = append(lister.buf, key) diff --git a/services/keepstore/s3_volume_test.go b/services/keepstore/s3_volume_test.go index 49ea24aa03..dbd6a45ed9 100644 --- a/services/keepstore/s3_volume_test.go +++ b/services/keepstore/s3_volume_test.go @@ -11,7 +11,6 @@ import ( "encoding/json" "fmt" "io" - "log" "net/http" "net/http/httptest" "os" @@ -499,11 +498,11 @@ func (s *StubbedS3Suite) newTestableVolume(c *check.C, cluster *arvados.Cluster, func (v *TestableS3Volume) PutRaw(loc string, block []byte) { err := v.bucket.Bucket().Put(loc, block, "application/octet-stream", s3ACL, s3.Options{}) if err != nil { - log.Printf("PutRaw: %s: %+v", loc, err) + v.logger.Printf("PutRaw: %s: %+v", loc, err) } err = v.bucket.Bucket().Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{}) if err != nil { - log.Printf("PutRaw: recent/%s: %+v", loc, err) + v.logger.Printf("PutRaw: recent/%s: %+v", loc, err) } } diff --git a/services/keepstore/trash_worker.go 
b/services/keepstore/trash_worker.go index ba1455ac65..3b1bd04230 100644 --- a/services/keepstore/trash_worker.go +++ b/services/keepstore/trash_worker.go @@ -6,10 +6,10 @@ package main import ( "errors" - "log" "time" "git.curoverse.com/arvados.git/sdk/go/arvados" + "github.com/sirupsen/logrus" ) // RunTrashWorker is used by Keepstore to initiate trash worker channel goroutine. @@ -18,19 +18,19 @@ import ( // Delete the block indicated by the trash request Locator // Repeat // -func RunTrashWorker(volmgr *RRVolumeManager, cluster *arvados.Cluster, trashq *WorkQueue) { +func RunTrashWorker(volmgr *RRVolumeManager, logger logrus.FieldLogger, cluster *arvados.Cluster, trashq *WorkQueue) { for item := range trashq.NextItem { trashRequest := item.(TrashRequest) - TrashItem(volmgr, cluster, trashRequest) + TrashItem(volmgr, logger, cluster, trashRequest) trashq.DoneItem <- struct{}{} } } // TrashItem deletes the indicated block from every writable volume. -func TrashItem(volmgr *RRVolumeManager, cluster *arvados.Cluster, trashRequest TrashRequest) { +func TrashItem(volmgr *RRVolumeManager, logger logrus.FieldLogger, cluster *arvados.Cluster, trashRequest TrashRequest) { reqMtime := time.Unix(0, trashRequest.BlockMtime) if time.Since(reqMtime) < cluster.Collections.BlobSigningTTL.Duration() { - log.Printf("WARNING: data manager asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.", + logger.Warnf("client asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! 
Skipping.", arvados.Duration(time.Since(reqMtime)), trashRequest.Locator, trashRequest.BlockMtime, @@ -43,7 +43,7 @@ func TrashItem(volmgr *RRVolumeManager, cluster *arvados.Cluster, trashRequest T if uuid := trashRequest.MountUUID; uuid == "" { volumes = volmgr.AllWritable() } else if mnt := volmgr.Lookup(uuid, true); mnt == nil { - log.Printf("warning: trash request for nonexistent mount: %v", trashRequest) + logger.Warnf("trash request for nonexistent mount: %v", trashRequest) return } else { volumes = []*VolumeMount{mnt} @@ -52,11 +52,11 @@ func TrashItem(volmgr *RRVolumeManager, cluster *arvados.Cluster, trashRequest T for _, volume := range volumes { mtime, err := volume.Mtime(trashRequest.Locator) if err != nil { - log.Printf("%v Trash(%v): %v", volume, trashRequest.Locator, err) + logger.WithError(err).Errorf("%v Trash(%v)", volume, trashRequest.Locator) continue } if trashRequest.BlockMtime != mtime.UnixNano() { - log.Printf("%v Trash(%v): stored mtime %v does not match trash list value %v", volume, trashRequest.Locator, mtime.UnixNano(), trashRequest.BlockMtime) + logger.Infof("%v Trash(%v): stored mtime %v does not match trash list value %v; skipping", volume, trashRequest.Locator, mtime.UnixNano(), trashRequest.BlockMtime) continue } @@ -67,9 +67,9 @@ func TrashItem(volmgr *RRVolumeManager, cluster *arvados.Cluster, trashRequest T } if err != nil { - log.Printf("%v Trash(%v): %v", volume, trashRequest.Locator, err) + logger.WithError(err).Errorf("%v Trash(%v)", volume, trashRequest.Locator) } else { - log.Printf("%v Trash(%v) OK", volume, trashRequest.Locator) + logger.Infof("%v Trash(%v) OK", volume, trashRequest.Locator) } } } diff --git a/services/keepstore/trash_worker_test.go b/services/keepstore/trash_worker_test.go index bd3743090a..c2052077fe 100644 --- a/services/keepstore/trash_worker_test.go +++ b/services/keepstore/trash_worker_test.go @@ -9,6 +9,7 @@ import ( "context" "time" + "git.curoverse.com/arvados.git/sdk/go/ctxlog" 
"github.com/prometheus/client_golang/prometheus" check "gopkg.in/check.v1" ) @@ -291,7 +292,7 @@ func (s *HandlerSuite) performTrashWorkerTest(c *check.C, testData TrashWorkerTe } } } - go RunTrashWorker(s.handler.volmgr, s.cluster, trashq) + go RunTrashWorker(s.handler.volmgr, ctxlog.TestLogger(c), s.cluster, trashq) // Install gate so all local operations block until we say go gate := make(chan struct{}) diff --git a/services/keepstore/unix_volume.go b/services/keepstore/unix_volume.go index 6504f9c16b..f41bd30d3d 100644 --- a/services/keepstore/unix_volume.go +++ b/services/keepstore/unix_volume.go @@ -11,7 +11,6 @@ import ( "fmt" "io" "io/ioutil" - "log" "os" "os/exec" "path/filepath" @@ -38,6 +37,7 @@ func newDirectoryVolume(cluster *arvados.Cluster, volume arvados.Volume, logger if err != nil { return nil, err } + v.logger = v.logger.WithField("Volume", v.String()) return v, v.check() } @@ -84,7 +84,7 @@ type UnixVolume struct { // "fa0b6166-3b55-4994-bd3f-92f4e00a1bb0/keep". func (v *UnixVolume) GetDeviceID() string { giveup := func(f string, args ...interface{}) string { - log.Printf(f+"; using blank DeviceID for volume %s", append(args, v)...) + v.logger.Infof(f+"; using blank DeviceID for volume %s", append(args, v)...) 
return "" } buf, err := exec.Command("findmnt", "--noheadings", "--target", v.Root).CombinedOutput() @@ -143,7 +143,7 @@ func (v *UnixVolume) GetDeviceID() string { link := filepath.Join(udir, uuid) fi, err = os.Stat(link) if err != nil { - log.Printf("error: stat %q: %s", link, err) + v.logger.WithError(err).Errorf("stat(%q) failed", link) continue } if fi.Sys().(*syscall.Stat_t).Ino == ino { @@ -271,15 +271,12 @@ func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader) } bdir := v.blockDir(loc) if err := os.MkdirAll(bdir, 0755); err != nil { - log.Printf("%s: could not create directory %s: %s", - loc, bdir, err) - return err + return fmt.Errorf("error creating directory %s: %s", bdir, err) } tmpfile, tmperr := v.os.TempFile(bdir, "tmp"+loc) if tmperr != nil { - log.Printf("ioutil.TempFile(%s, tmp%s): %s", bdir, loc, tmperr) - return tmperr + return fmt.Errorf("TempFile(%s, tmp%s) failed: %s", bdir, loc, tmperr) } bpath := v.blockPath(loc) @@ -291,19 +288,20 @@ func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader) n, err := io.Copy(tmpfile, rdr) v.os.stats.TickOutBytes(uint64(n)) if err != nil { - log.Printf("%s: writing to %s: %s", v, bpath, err) + err = fmt.Errorf("error writing %s: %s", bpath, err) tmpfile.Close() v.os.Remove(tmpfile.Name()) return err } if err := tmpfile.Close(); err != nil { - log.Printf("closing %s: %s", tmpfile.Name(), err) + err = fmt.Errorf("error closing %s: %s", tmpfile.Name(), err) v.os.Remove(tmpfile.Name()) return err } if err := v.os.Rename(tmpfile.Name(), bpath); err != nil { - log.Printf("rename %s %s: %s", tmpfile.Name(), bpath, err) - return v.os.Remove(tmpfile.Name()) + err = fmt.Errorf("error renaming %s to %s: %s", tmpfile.Name(), bpath, err) + v.os.Remove(tmpfile.Name()) + return err } return nil } @@ -314,14 +312,14 @@ func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader) func (v *UnixVolume) Status() *VolumeStatus { fi, err := v.os.Stat(v.Root) if 
err != nil { - log.Printf("%s: os.Stat: %s", v, err) + v.logger.WithError(err).Error("stat failed") return nil } devnum := fi.Sys().(*syscall.Stat_t).Dev var fs syscall.Statfs_t if err := syscall.Statfs(v.Root, &fs); err != nil { - log.Printf("%s: statfs: %s", v, err) + v.logger.WithError(err).Error("statfs failed") return nil } // These calculations match the way df calculates disk usage: @@ -380,8 +378,8 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error { blockdirpath := filepath.Join(v.Root, names[0]) blockdir, err := v.os.Open(blockdirpath) if err != nil { - log.Print("Error reading ", blockdirpath, ": ", err) - lastErr = err + v.logger.WithError(err).Errorf("error reading %q", blockdirpath) + lastErr = fmt.Errorf("error reading %q: %s", blockdirpath, err) continue } v.os.stats.TickOps("readdir") @@ -391,8 +389,8 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error { if err == io.EOF { break } else if err != nil { - log.Print("Error reading ", blockdirpath, ": ", err) - lastErr = err + v.logger.WithError(err).Errorf("error reading %q", blockdirpath) + lastErr = fmt.Errorf("error reading %q: %s", blockdirpath, err) break } name := fileInfo[0].Name() @@ -408,9 +406,8 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error { " ", fileInfo[0].ModTime().UnixNano(), "\n") if err != nil { - log.Print("Error writing : ", err) - lastErr = err - break + blockdir.Close() + return fmt.Errorf("error writing: %s", err) } } blockdir.Close() @@ -534,7 +531,7 @@ func (v *UnixVolume) IsFull() (isFull bool) { if avail, err := v.FreeDiskSpace(); err == nil { isFull = avail < MinFreeKilobytes } else { - log.Printf("%s: FreeDiskSpace: %s", v, err) + v.logger.WithError(err).Errorf("%s: FreeDiskSpace failed", v) isFull = false } @@ -584,7 +581,7 @@ func (v *UnixVolume) lock(ctx context.Context) error { }() select { case <-ctx.Done(): - log.Printf("%s: client hung up while waiting for Serialize lock (%s)", v, time.Since(t0)) + 
v.logger.Infof("client hung up while waiting for Serialize lock (%s)", time.Since(t0)) go func() { <-locked v.locker.Unlock() @@ -653,7 +650,7 @@ func (v *UnixVolume) EmptyTrash() { } deadline, err := strconv.ParseInt(matches[2], 10, 64) if err != nil { - log.Printf("EmptyTrash: %v: ParseInt(%v): %v", path, matches[2], err) + v.logger.WithError(err).Errorf("EmptyTrash: %v: ParseInt(%q) failed", path, matches[2]) return } atomic.AddInt64(&bytesInTrash, info.Size()) @@ -663,7 +660,7 @@ func (v *UnixVolume) EmptyTrash() { } err = v.os.Remove(path) if err != nil { - log.Printf("EmptyTrash: Remove %v: %v", path, err) + v.logger.WithError(err).Errorf("EmptyTrash: Remove(%q) failed", path) return } atomic.AddInt64(&bytesDeleted, info.Size()) @@ -688,7 +685,7 @@ func (v *UnixVolume) EmptyTrash() { err := filepath.Walk(v.Root, func(path string, info os.FileInfo, err error) error { if err != nil { - log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err) + v.logger.WithError(err).Errorf("EmptyTrash: filepath.Walk(%q) failed", path) return nil } todo <- dirent{path, info} @@ -698,10 +695,10 @@ func (v *UnixVolume) EmptyTrash() { wg.Wait() if err != nil { - log.Printf("EmptyTrash error for %v: %v", v.String(), err) + v.logger.WithError(err).Error("EmptyTrash failed") } - log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted) + v.logger.Infof("EmptyTrash stats: Deleted %v bytes in %v blocks. 
Remaining in trash: %v bytes in %v blocks.", bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted) } type unixStats struct { diff --git a/services/keepstore/unix_volume_test.go b/services/keepstore/unix_volume_test.go index 1ffc46513c..664956f7bc 100644 --- a/services/keepstore/unix_volume_test.go +++ b/services/keepstore/unix_volume_test.go @@ -19,6 +19,7 @@ import ( "time" "git.curoverse.com/arvados.git/sdk/go/arvados" + "git.curoverse.com/arvados.git/sdk/go/ctxlog" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" check "gopkg.in/check.v1" @@ -90,6 +91,7 @@ func (s *UnixVolumeSuite) newTestableUnixVolume(c *check.C, cluster *arvados.Clu Root: d, locker: locker, cluster: cluster, + logger: ctxlog.TestLogger(c), volume: volume, metrics: metrics, }, -- 2.30.2