15521: Convert remaining log uses to logrus.
authorTom Clegg <tclegg@veritasgenetics.com>
Tue, 29 Oct 2019 20:51:13 +0000 (16:51 -0400)
committerTom Clegg <tclegg@veritasgenetics.com>
Tue, 29 Oct 2019 20:51:13 +0000 (16:51 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

services/keepstore/command.go
services/keepstore/handler_test.go
services/keepstore/handlers.go
services/keepstore/s3_volume.go
services/keepstore/s3_volume_test.go
services/keepstore/trash_worker.go
services/keepstore/trash_worker_test.go
services/keepstore/unix_volume.go
services/keepstore/unix_volume_test.go

index c589e639f557782ad4a44ffec4a40eaf6b399c94..006d2446396d1734cf5e7d9de735a1d339cb3798 100644 (file)
@@ -190,7 +190,7 @@ func (h *handler) setup(ctx context.Context, cluster *arvados.Cluster, token str
        // Initialize the trashq and workers
        h.trashq = NewWorkQueue()
        for i := 0; i < 1 || i < h.Cluster.Collections.BlobTrashConcurrency; i++ {
-               go RunTrashWorker(h.volmgr, h.Cluster, h.trashq)
+               go RunTrashWorker(h.volmgr, h.Logger, h.Cluster, h.trashq)
        }
 
        // Set up routes and metrics
index 9d69b9fa47ef1b173b70e1f16617ba6dd9531351..54b4871fab89a59a2b95ba893912080ec13ff070 100644 (file)
@@ -892,10 +892,7 @@ func ExpectStatusCode(
        testname string,
        expectedStatus int,
        response *httptest.ResponseRecorder) {
-       if response.Code != expectedStatus {
-               c.Errorf("%s: expected status %d, got %+v",
-                       testname, expectedStatus, response)
-       }
+       c.Check(response.Code, check.Equals, expectedStatus, check.Commentf("%s", testname))
 }
 
 func ExpectBody(
@@ -1147,12 +1144,7 @@ func (s *HandlerSuite) TestUntrashHandler(c *check.C) {
                "",
                http.StatusOK,
                response)
-       expected := "Successfully untrashed on: [MockVolume],[MockVolume]"
-       if response.Body.String() != expected {
-               c.Errorf(
-                       "Untrash response mismatched: expected %s, got:\n%s",
-                       expected, response.Body.String())
-       }
+       c.Check(response.Body.String(), check.Equals, "Successfully untrashed on: [MockVolume], [MockVolume]\n")
 }
 
 func (s *HandlerSuite) TestUntrashHandlerWithNoWritableVolumes(c *check.C) {
index 30ea695f0c96e125598fc8042ae1501f7a2fe70b..0fcc12144136ddb72ecfab6911dc1542ab0dcdf8 100644 (file)
@@ -11,7 +11,6 @@ import (
        "encoding/json"
        "fmt"
        "io"
-       "log"
        "net/http"
        "os"
        "regexp"
@@ -342,25 +341,25 @@ func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
        }
        var ds debugStats
        runtime.ReadMemStats(&ds.MemStats)
-       err := json.NewEncoder(resp).Encode(&ds)
+       data, err := json.Marshal(&ds)
        if err != nil {
-               http.Error(resp, err.Error(), 500)
+               http.Error(resp, err.Error(), http.StatusInternalServerError)
+               return
        }
+       resp.Write(data)
 }
 
 // StatusHandler addresses /status.json requests.
 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
        stLock.Lock()
        rtr.readNodeStatus(&st)
-       jstat, err := json.Marshal(&st)
+       data, err := json.Marshal(&st)
        stLock.Unlock()
-       if err == nil {
-               resp.Write(jstat)
-       } else {
-               log.Printf("json.Marshal: %s", err)
-               log.Printf("NodeStatus = %v", &st)
-               http.Error(resp, err.Error(), 500)
+       if err != nil {
+               http.Error(resp, err.Error(), http.StatusInternalServerError)
+               return
        }
+       resp.Write(data)
 }
 
 // populate the given NodeStatus struct with current values.
@@ -457,28 +456,19 @@ func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) {
                        continue
                } else {
                        result.Failed++
-                       log.Println("DeleteHandler:", err)
+                       ctxlog.FromContext(req.Context()).WithError(err).Errorf("Trash(%s) failed on volume %s", hash, vol)
                }
        }
-
-       var st int
-
        if result.Deleted == 0 && result.Failed == 0 {
-               st = http.StatusNotFound
-       } else {
-               st = http.StatusOK
+               resp.WriteHeader(http.StatusNotFound)
+               return
        }
-
-       resp.WriteHeader(st)
-
-       if st == http.StatusOK {
-               if body, err := json.Marshal(result); err == nil {
-                       resp.Write(body)
-               } else {
-                       log.Printf("json.Marshal: %s (result = %v)", err, result)
-                       http.Error(resp, err.Error(), 500)
-               }
+       body, err := json.Marshal(result)
+       if err != nil {
+               http.Error(resp, err.Error(), http.StatusInternalServerError)
+               return
        }
+       resp.Write(body)
 }
 
 /* PullHandler processes "PUT /pull" requests for the data manager.
@@ -600,6 +590,7 @@ func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
                return
        }
 
+       log := ctxlog.FromContext(req.Context())
        hash := mux.Vars(req)["hash"]
 
        if len(rtr.volmgr.AllWritable()) == 0 {
@@ -615,27 +606,26 @@ func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
                if os.IsNotExist(err) {
                        numNotFound++
                } else if err != nil {
-                       log.Printf("Error untrashing %v on volume %v", hash, vol.String())
+                       log.WithError(err).Errorf("Error untrashing %v on volume %s", hash, vol)
                        failedOn = append(failedOn, vol.String())
                } else {
-                       log.Printf("Untrashed %v on volume %v", hash, vol.String())
+                       log.Infof("Untrashed %v on volume %v", hash, vol.String())
                        untrashedOn = append(untrashedOn, vol.String())
                }
        }
 
        if numNotFound == len(rtr.volmgr.AllWritable()) {
                http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
-               return
-       }
-
-       if len(failedOn) == len(rtr.volmgr.AllWritable()) {
+       } else if len(failedOn) == len(rtr.volmgr.AllWritable()) {
                http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
        } else {
-               respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
+               respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ", ")
                if len(failedOn) > 0 {
-                       respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
+                       respBody += "; Failed to untrash on: " + strings.Join(failedOn, ", ")
+                       http.Error(resp, respBody, http.StatusInternalServerError)
+               } else {
+                       fmt.Fprintln(resp, respBody)
                }
-               resp.Write([]byte(respBody))
        }
 }
 
@@ -659,6 +649,8 @@ func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
 // DiskHashError.
 //
 func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
+       log := ctxlog.FromContext(ctx)
+
        // Attempt to read the requested hash from a keep volume.
        errorToCaller := NotFoundError
 
@@ -676,7 +668,7 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b
                        // volumes. If all volumes report IsNotExist,
                        // we return a NotFoundError.
                        if !os.IsNotExist(err) {
-                               log.Printf("%s: Get(%s): %s", vol, hash, err)
+                               log.WithError(err).Errorf("Get(%s) failed on %s", hash, vol)
                        }
                        // If some volume returns a transient error, return it to the caller
                        // instead of "Not found" so it can retry.
@@ -686,19 +678,16 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b
                        continue
                }
                // Check the file checksum.
-               //
                filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
                if filehash != hash {
                        // TODO: Try harder to tell a sysadmin about
                        // this.
-                       log.Printf("%s: checksum mismatch for request %s (actual %s)",
-                               vol, hash, filehash)
+                       log.Error("checksum mismatch for block %s (actual %s) on %s", hash, filehash, vol)
                        errorToCaller = DiskHashError
                        continue
                }
                if errorToCaller == DiskHashError {
-                       log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
-                               vol, hash)
+                       log.Warn("after checksum mismatch for block %s on a different volume, a good copy was found on volume %s and returned", hash, vol)
                }
                return size, nil
        }
@@ -754,12 +743,14 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s
        // Choose a Keep volume to write to.
        // If this volume fails, try all of the volumes in order.
        if mnt := volmgr.NextWritable(); mnt != nil {
-               if err := mnt.Put(ctx, hash, block); err == nil {
+               if err := mnt.Put(ctx, hash, block); err != nil {
+                       log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
+               } else {
                        return mnt.Replication, nil // success!
                }
-               if ctx.Err() != nil {
-                       return 0, ErrClientDisconnect
-               }
+       }
+       if ctx.Err() != nil {
+               return 0, ErrClientDisconnect
        }
 
        writables := volmgr.AllWritable()
@@ -774,15 +765,17 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s
                if ctx.Err() != nil {
                        return 0, ErrClientDisconnect
                }
-               if err == nil {
+               switch err {
+               case nil:
                        return vol.Replication, nil // success!
-               }
-               if err != FullError {
+               case FullError:
+                       continue
+               default:
                        // The volume is not full but the
                        // write did not succeed.  Report the
                        // error and continue trying.
                        allFull = false
-                       log.Errorf("%s: Write(%s): %s", vol, hash, err)
+                       log.WithError(err).Errorf("%s: Put(%s) failed", vol, hash)
                }
        }
 
@@ -800,6 +793,7 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s
 // premature garbage collection. Otherwise, it returns a non-nil
 // error.
 func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte) (int, error) {
+       log := ctxlog.FromContext(ctx)
        var bestErr error = NotFoundError
        for _, mnt := range volmgr.AllWritable() {
                err := mnt.Compare(ctx, hash, buf)
@@ -811,7 +805,7 @@ func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string,
                        // to tell which one is wanted if we have
                        // both, so there's no point writing it even
                        // on a different volume.)
-                       log.Printf("%s: Compare(%s): %s", mnt.Volume, hash, err)
+                       log.Error("collision in Compare(%s) on volume %s", hash, mnt.Volume)
                        return 0, err
                } else if os.IsNotExist(err) {
                        // Block does not exist. This is the only
@@ -821,11 +815,11 @@ func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string,
                        // Couldn't open file, data is corrupt on
                        // disk, etc.: log this abnormal condition,
                        // and try the next volume.
-                       log.Printf("%s: Compare(%s): %s", mnt.Volume, hash, err)
+                       log.WithError(err).Warnf("error in Compare(%s) on volume %s", hash, mnt.Volume)
                        continue
                }
                if err := mnt.Touch(hash); err != nil {
-                       log.Printf("%s: Touch %s failed: %s", mnt.Volume, hash, err)
+                       log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume)
                        bestErr = err
                        continue
                }
@@ -859,18 +853,6 @@ func GetAPIToken(req *http.Request) string {
        return ""
 }
 
-// IsExpired returns true if the given Unix timestamp (expressed as a
-// hexadecimal string) is in the past, or if timestampHex cannot be
-// parsed as a hexadecimal string.
-func IsExpired(timestampHex string) bool {
-       ts, err := strconv.ParseInt(timestampHex, 16, 0)
-       if err != nil {
-               log.Printf("IsExpired: %s", err)
-               return true
-       }
-       return time.Unix(ts, 0).Before(time.Now())
-}
-
 // canDelete returns true if the user identified by apiToken is
 // allowed to delete blocks.
 func (rtr *router) canDelete(apiToken string) bool {
index 220377af280f2d64c682624ee69a67cfd6f3b636..08cc591fc52f8a54c34cb426c03a2835f23c129e 100644 (file)
@@ -16,7 +16,6 @@ import (
        "fmt"
        "io"
        "io/ioutil"
-       "log"
        "net/http"
        "os"
        "regexp"
@@ -37,11 +36,12 @@ func init() {
 }
 
 func newS3Volume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
-       v := &S3Volume{cluster: cluster, volume: volume, logger: logger, metrics: metrics}
+       v := &S3Volume{cluster: cluster, volume: volume, metrics: metrics}
        err := json.Unmarshal(volume.DriverParameters, &v)
        if err != nil {
                return nil, err
        }
+       v.logger = logger.WithField("Volume", v.String())
        return v, v.check()
 }
 
@@ -340,7 +340,7 @@ func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
 
        rdr, err = v.bucket.GetReader(loc)
        if err != nil {
-               log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
+               v.logger.Warnf("reading %s after successful fixRace: %s", loc, err)
                err = v.translateError(err)
        }
        return
@@ -465,7 +465,7 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
        go func() {
                defer func() {
                        if ctx.Err() != nil {
-                               v.logger.Debugf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
+                               v.logger.Debugf("abandoned PutReader goroutine finished with err: %s", err)
                        }
                }()
                defer close(ready)
@@ -477,7 +477,7 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
        }()
        select {
        case <-ctx.Done():
-               v.logger.Debugf("%s: taking PutReader's input away: %s", v, ctx.Err())
+               v.logger.Debugf("taking PutReader's input away: %s", ctx.Err())
                // Our pipe might be stuck in Write(), waiting for
                // PutReader() to read. If so, un-stick it. This means
                // PutReader will get corrupt data, but that's OK: the
@@ -485,7 +485,7 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
                go io.Copy(ioutil.Discard, bufr)
                // CloseWithError() will return once pending I/O is done.
                bufw.CloseWithError(ctx.Err())
-               v.logger.Debugf("%s: abandoning PutReader goroutine", v)
+               v.logger.Debugf("abandoning PutReader goroutine")
                return ctx.Err()
        case <-ready:
                // Unblock pipe in case PutReader did not consume it.
@@ -523,13 +523,13 @@ func (v *S3Volume) Mtime(loc string) (time.Time, error) {
                // The data object X exists, but recent/X is missing.
                err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
                if err != nil {
-                       log.Printf("error: creating %q: %s", "recent/"+loc, err)
+                       v.logger.WithError(err).Errorf("error creating %q", "recent/"+loc)
                        return zeroTime, v.translateError(err)
                }
-               log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
+               v.logger.Infof("created %q to migrate existing block to new storage scheme", "recent/"+loc)
                resp, err = v.bucket.Head("recent/"+loc, nil)
                if err != nil {
-                       log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
+                       v.logger.WithError(err).Errorf("HEAD failed after creating %q", "recent/"+loc)
                        return zeroTime, v.translateError(err)
                }
        } else if err != nil {
@@ -544,12 +544,14 @@ func (v *S3Volume) Mtime(loc string) (time.Time, error) {
 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
        // Use a merge sort to find matching sets of X and recent/X.
        dataL := s3Lister{
+               Logger:   v.logger,
                Bucket:   v.bucket.Bucket(),
                Prefix:   prefix,
                PageSize: v.IndexPageSize,
                Stats:    &v.bucket.stats,
        }
        recentL := s3Lister{
+               Logger:   v.logger,
                Bucket:   v.bucket.Bucket(),
                Prefix:   "recent/" + prefix,
                PageSize: v.IndexPageSize,
@@ -744,24 +746,24 @@ func (v *S3Volume) fixRace(loc string) bool {
        trash, err := v.bucket.Head("trash/"+loc, nil)
        if err != nil {
                if !os.IsNotExist(v.translateError(err)) {
-                       log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
+                       v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "trash/"+loc)
                }
                return false
        }
        trashTime, err := v.lastModified(trash)
        if err != nil {
-               log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
+               v.logger.WithError(err).Errorf("fixRace: error parsing time %q", trash.Header.Get("Last-Modified"))
                return false
        }
 
        recent, err := v.bucket.Head("recent/"+loc, nil)
        if err != nil {
-               log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
+               v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "recent/"+loc)
                return false
        }
        recentTime, err := v.lastModified(recent)
        if err != nil {
-               log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
+               v.logger.WithError(err).Errorf("fixRace: error parsing time %q", recent.Header.Get("Last-Modified"))
                return false
        }
 
@@ -772,11 +774,11 @@ func (v *S3Volume) fixRace(loc string) bool {
                return false
        }
 
-       log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
-       log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
+       v.logger.Infof("fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
+       v.logger.Infof("fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
        err = v.safeCopy(loc, "trash/"+loc)
        if err != nil {
-               log.Printf("error: fixRace: %s", err)
+               v.logger.WithError(err).Error("fixRace: copy failed")
                return false
        }
        return true
@@ -819,24 +821,24 @@ func (v *S3Volume) EmptyTrash() {
 
                trashT, err := time.Parse(time.RFC3339, trash.LastModified)
                if err != nil {
-                       log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
+                       v.logger.Warnf("EmptyTrash: %q: parse %q: %s", trash.Key, trash.LastModified, err)
                        return
                }
                recent, err := v.bucket.Head("recent/"+loc, nil)
                if err != nil && os.IsNotExist(v.translateError(err)) {
-                       log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
+                       v.logger.Warnf("EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", trash.Key, "recent/"+loc, err)
                        err = v.Untrash(loc)
                        if err != nil {
-                               log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
+                               v.logger.WithError(err).Errorf("EmptyTrash: Untrash(%q) failed", loc)
                        }
                        return
                } else if err != nil {
-                       log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
+                       v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", "recent/"+loc)
                        return
                }
                recentT, err := v.lastModified(recent)
                if err != nil {
-                       log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
+                       v.logger.WithError(err).Warnf("EmptyTrash: %q: error parsing %q", "recent/"+loc, recent.Header.Get("Last-Modified"))
                        return
                }
                if trashT.Sub(recentT) < v.cluster.Collections.BlobSigningTTL.Duration() {
@@ -849,18 +851,18 @@ func (v *S3Volume) EmptyTrash() {
                                // Note this means (TrashSweepInterval
                                // < BlobSigningTTL - raceWindow) is
                                // necessary to avoid starvation.
-                               log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
+                               v.logger.Infof("EmptyTrash: detected old race for %q, calling fixRace + Touch", loc)
                                v.fixRace(loc)
                                v.Touch(loc)
                                return
                        }
                        _, err := v.bucket.Head(loc, nil)
                        if os.IsNotExist(err) {
-                               log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
+                               v.logger.Infof("EmptyTrash: detected recent race for %q, calling fixRace", loc)
                                v.fixRace(loc)
                                return
                        } else if err != nil {
-                               log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
+                               v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
                                return
                        }
                }
@@ -869,7 +871,7 @@ func (v *S3Volume) EmptyTrash() {
                }
                err = v.bucket.Del(trash.Key)
                if err != nil {
-                       log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
+                       v.logger.WithError(err).Errorf("EmptyTrash: error deleting %q", trash.Key)
                        return
                }
                atomic.AddInt64(&bytesDeleted, trash.Size)
@@ -877,16 +879,16 @@ func (v *S3Volume) EmptyTrash() {
 
                _, err = v.bucket.Head(loc, nil)
                if err == nil {
-                       log.Printf("warning: %s: EmptyTrash: HEAD %q succeeded immediately after deleting %q", v, loc, loc)
+                       v.logger.Warnf("EmptyTrash: HEAD %q succeeded immediately after deleting %q", loc, loc)
                        return
                }
                if !os.IsNotExist(v.translateError(err)) {
-                       log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
+                       v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
                        return
                }
                err = v.bucket.Del("recent/" + loc)
                if err != nil {
-                       log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
+                       v.logger.WithError(err).Warnf("EmptyTrash: error deleting %q", "recent/"+loc)
                }
        }
 
@@ -903,6 +905,7 @@ func (v *S3Volume) EmptyTrash() {
        }
 
        trashL := s3Lister{
+               Logger:   v.logger,
                Bucket:   v.bucket.Bucket(),
                Prefix:   "trash/",
                PageSize: v.IndexPageSize,
@@ -915,12 +918,13 @@ func (v *S3Volume) EmptyTrash() {
        wg.Wait()
 
        if err := trashL.Error(); err != nil {
-               log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
+               v.logger.WithError(err).Error("EmptyTrash: lister failed")
        }
-       log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
+       v.logger.Infof("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
 }
 
 type s3Lister struct {
+       Logger     logrus.FieldLogger
        Bucket     *s3.Bucket
        Prefix     string
        PageSize   int
@@ -967,7 +971,7 @@ func (lister *s3Lister) getPage() {
        lister.buf = make([]s3.Key, 0, len(resp.Contents))
        for _, key := range resp.Contents {
                if !strings.HasPrefix(key.Key, lister.Prefix) {
-                       log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
+                       lister.Logger.Warnf("s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
                        continue
                }
                lister.buf = append(lister.buf, key)
index 49ea24aa03b8cee1903a2de01010b061085b4528..dbd6a45ed9629c14346ecacca7adb4cff001e809 100644 (file)
@@ -11,7 +11,6 @@ import (
        "encoding/json"
        "fmt"
        "io"
-       "log"
        "net/http"
        "net/http/httptest"
        "os"
@@ -499,11 +498,11 @@ func (s *StubbedS3Suite) newTestableVolume(c *check.C, cluster *arvados.Cluster,
 func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
        err := v.bucket.Bucket().Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
        if err != nil {
-               log.Printf("PutRaw: %s: %+v", loc, err)
+               v.logger.Printf("PutRaw: %s: %+v", loc, err)
        }
        err = v.bucket.Bucket().Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
        if err != nil {
-               log.Printf("PutRaw: recent/%s: %+v", loc, err)
+               v.logger.Printf("PutRaw: recent/%s: %+v", loc, err)
        }
 }
 
index ba1455ac657bca2f05dd808a23936af83a425e20..3b1bd042305646e766a2d5a128b79e65816e0eab 100644 (file)
@@ -6,10 +6,10 @@ package main
 
 import (
        "errors"
-       "log"
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "github.com/sirupsen/logrus"
 )
 
 // RunTrashWorker is used by Keepstore to initiate trash worker channel goroutine.
@@ -18,19 +18,19 @@ import (
 //      Delete the block indicated by the trash request Locator
 //             Repeat
 //
-func RunTrashWorker(volmgr *RRVolumeManager, cluster *arvados.Cluster, trashq *WorkQueue) {
+func RunTrashWorker(volmgr *RRVolumeManager, logger logrus.FieldLogger, cluster *arvados.Cluster, trashq *WorkQueue) {
        for item := range trashq.NextItem {
                trashRequest := item.(TrashRequest)
-               TrashItem(volmgr, cluster, trashRequest)
+               TrashItem(volmgr, logger, cluster, trashRequest)
                trashq.DoneItem <- struct{}{}
        }
 }
 
 // TrashItem deletes the indicated block from every writable volume.
-func TrashItem(volmgr *RRVolumeManager, cluster *arvados.Cluster, trashRequest TrashRequest) {
+func TrashItem(volmgr *RRVolumeManager, logger logrus.FieldLogger, cluster *arvados.Cluster, trashRequest TrashRequest) {
        reqMtime := time.Unix(0, trashRequest.BlockMtime)
        if time.Since(reqMtime) < cluster.Collections.BlobSigningTTL.Duration() {
-               log.Printf("WARNING: data manager asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.",
+               logger.Warnf("client asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.",
                        arvados.Duration(time.Since(reqMtime)),
                        trashRequest.Locator,
                        trashRequest.BlockMtime,
@@ -43,7 +43,7 @@ func TrashItem(volmgr *RRVolumeManager, cluster *arvados.Cluster, trashRequest T
        if uuid := trashRequest.MountUUID; uuid == "" {
                volumes = volmgr.AllWritable()
        } else if mnt := volmgr.Lookup(uuid, true); mnt == nil {
-               log.Printf("warning: trash request for nonexistent mount: %v", trashRequest)
+               logger.Warnf("trash request for nonexistent mount: %v", trashRequest)
                return
        } else {
                volumes = []*VolumeMount{mnt}
@@ -52,11 +52,11 @@ func TrashItem(volmgr *RRVolumeManager, cluster *arvados.Cluster, trashRequest T
        for _, volume := range volumes {
                mtime, err := volume.Mtime(trashRequest.Locator)
                if err != nil {
-                       log.Printf("%v Trash(%v): %v", volume, trashRequest.Locator, err)
+                       logger.WithError(err).Errorf("%v Trash(%v)", volume, trashRequest.Locator)
                        continue
                }
                if trashRequest.BlockMtime != mtime.UnixNano() {
-                       log.Printf("%v Trash(%v): stored mtime %v does not match trash list value %v", volume, trashRequest.Locator, mtime.UnixNano(), trashRequest.BlockMtime)
+                       logger.Infof("%v Trash(%v): stored mtime %v does not match trash list value %v; skipping", volume, trashRequest.Locator, mtime.UnixNano(), trashRequest.BlockMtime)
                        continue
                }
 
@@ -67,9 +67,9 @@ func TrashItem(volmgr *RRVolumeManager, cluster *arvados.Cluster, trashRequest T
                }
 
                if err != nil {
-                       log.Printf("%v Trash(%v): %v", volume, trashRequest.Locator, err)
+                       logger.WithError(err).Errorf("%v Trash(%v)", volume, trashRequest.Locator)
                } else {
-                       log.Printf("%v Trash(%v) OK", volume, trashRequest.Locator)
+                       logger.Infof("%v Trash(%v) OK", volume, trashRequest.Locator)
                }
        }
 }
index bd3743090ab90f1c4dfd6434136cd1776a94ea22..c2052077fedeacf82754ce426297185ae917bd00 100644 (file)
@@ -9,6 +9,7 @@ import (
        "context"
        "time"
 
+       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
        "github.com/prometheus/client_golang/prometheus"
        check "gopkg.in/check.v1"
 )
@@ -291,7 +292,7 @@ func (s *HandlerSuite) performTrashWorkerTest(c *check.C, testData TrashWorkerTe
                        }
                }
        }
-       go RunTrashWorker(s.handler.volmgr, s.cluster, trashq)
+       go RunTrashWorker(s.handler.volmgr, ctxlog.TestLogger(c), s.cluster, trashq)
 
        // Install gate so all local operations block until we say go
        gate := make(chan struct{})
index 6504f9c16b166cf7d5222f59988939beff878802..f41bd30d3d10045d715786bcb6c84c68356c3afd 100644 (file)
@@ -11,7 +11,6 @@ import (
        "fmt"
        "io"
        "io/ioutil"
-       "log"
        "os"
        "os/exec"
        "path/filepath"
@@ -38,6 +37,7 @@ func newDirectoryVolume(cluster *arvados.Cluster, volume arvados.Volume, logger
        if err != nil {
                return nil, err
        }
+       v.logger = v.logger.WithField("Volume", v.String())
        return v, v.check()
 }
 
@@ -84,7 +84,7 @@ type UnixVolume struct {
 // "fa0b6166-3b55-4994-bd3f-92f4e00a1bb0/keep".
 func (v *UnixVolume) GetDeviceID() string {
        giveup := func(f string, args ...interface{}) string {
-               log.Printf(f+"; using blank DeviceID for volume %s", append(args, v)...)
+               v.logger.Infof(f+"; using blank DeviceID for volume %s", append(args, v)...)
                return ""
        }
        buf, err := exec.Command("findmnt", "--noheadings", "--target", v.Root).CombinedOutput()
@@ -143,7 +143,7 @@ func (v *UnixVolume) GetDeviceID() string {
                link := filepath.Join(udir, uuid)
                fi, err = os.Stat(link)
                if err != nil {
-                       log.Printf("error: stat %q: %s", link, err)
+                       v.logger.WithError(err).Errorf("stat(%q) failed", link)
                        continue
                }
                if fi.Sys().(*syscall.Stat_t).Ino == ino {
@@ -271,15 +271,12 @@ func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader)
        }
        bdir := v.blockDir(loc)
        if err := os.MkdirAll(bdir, 0755); err != nil {
-               log.Printf("%s: could not create directory %s: %s",
-                       loc, bdir, err)
-               return err
+               return fmt.Errorf("error creating directory %s: %s", bdir, err)
        }
 
        tmpfile, tmperr := v.os.TempFile(bdir, "tmp"+loc)
        if tmperr != nil {
-               log.Printf("ioutil.TempFile(%s, tmp%s): %s", bdir, loc, tmperr)
-               return tmperr
+               return fmt.Errorf("TempFile(%s, tmp%s) failed: %s", bdir, loc, tmperr)
        }
 
        bpath := v.blockPath(loc)
@@ -291,19 +288,20 @@ func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader)
        n, err := io.Copy(tmpfile, rdr)
        v.os.stats.TickOutBytes(uint64(n))
        if err != nil {
-               log.Printf("%s: writing to %s: %s", v, bpath, err)
+               err = fmt.Errorf("error writing %s: %s", bpath, err)
                tmpfile.Close()
                v.os.Remove(tmpfile.Name())
                return err
        }
        if err := tmpfile.Close(); err != nil {
-               log.Printf("closing %s: %s", tmpfile.Name(), err)
+               err = fmt.Errorf("error closing %s: %s", tmpfile.Name(), err)
                v.os.Remove(tmpfile.Name())
                return err
        }
        if err := v.os.Rename(tmpfile.Name(), bpath); err != nil {
-               log.Printf("rename %s %s: %s", tmpfile.Name(), bpath, err)
-               return v.os.Remove(tmpfile.Name())
+               err = fmt.Errorf("error renaming %s to %s: %s", tmpfile.Name(), bpath, err)
+               v.os.Remove(tmpfile.Name())
+               return err
        }
        return nil
 }
@@ -314,14 +312,14 @@ func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader)
 func (v *UnixVolume) Status() *VolumeStatus {
        fi, err := v.os.Stat(v.Root)
        if err != nil {
-               log.Printf("%s: os.Stat: %s", v, err)
+               v.logger.WithError(err).Error("stat failed")
                return nil
        }
        devnum := fi.Sys().(*syscall.Stat_t).Dev
 
        var fs syscall.Statfs_t
        if err := syscall.Statfs(v.Root, &fs); err != nil {
-               log.Printf("%s: statfs: %s", v, err)
+               v.logger.WithError(err).Error("statfs failed")
                return nil
        }
        // These calculations match the way df calculates disk usage:
@@ -380,8 +378,8 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
                blockdirpath := filepath.Join(v.Root, names[0])
                blockdir, err := v.os.Open(blockdirpath)
                if err != nil {
-                       log.Print("Error reading ", blockdirpath, ": ", err)
-                       lastErr = err
+                       v.logger.WithError(err).Errorf("error reading %q", blockdirpath)
+                       lastErr = fmt.Errorf("error reading %q: %s", blockdirpath, err)
                        continue
                }
                v.os.stats.TickOps("readdir")
@@ -391,8 +389,8 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
                        if err == io.EOF {
                                break
                        } else if err != nil {
-                               log.Print("Error reading ", blockdirpath, ": ", err)
-                               lastErr = err
+                               v.logger.WithError(err).Errorf("error reading %q", blockdirpath)
+                               lastErr = fmt.Errorf("error reading %q: %s", blockdirpath, err)
                                break
                        }
                        name := fileInfo[0].Name()
@@ -408,9 +406,8 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
                                " ", fileInfo[0].ModTime().UnixNano(),
                                "\n")
                        if err != nil {
-                               log.Print("Error writing : ", err)
-                               lastErr = err
-                               break
+                               blockdir.Close()
+                               return fmt.Errorf("error writing: %s", err)
                        }
                }
                blockdir.Close()
@@ -534,7 +531,7 @@ func (v *UnixVolume) IsFull() (isFull bool) {
        if avail, err := v.FreeDiskSpace(); err == nil {
                isFull = avail < MinFreeKilobytes
        } else {
-               log.Printf("%s: FreeDiskSpace: %s", v, err)
+               v.logger.WithError(err).Errorf("%s: FreeDiskSpace failed", v)
                isFull = false
        }
 
@@ -584,7 +581,7 @@ func (v *UnixVolume) lock(ctx context.Context) error {
        }()
        select {
        case <-ctx.Done():
-               log.Printf("%s: client hung up while waiting for Serialize lock (%s)", v, time.Since(t0))
+               v.logger.Infof("client hung up while waiting for Serialize lock (%s)", time.Since(t0))
                go func() {
                        <-locked
                        v.locker.Unlock()
@@ -653,7 +650,7 @@ func (v *UnixVolume) EmptyTrash() {
                }
                deadline, err := strconv.ParseInt(matches[2], 10, 64)
                if err != nil {
-                       log.Printf("EmptyTrash: %v: ParseInt(%v): %v", path, matches[2], err)
+                       v.logger.WithError(err).Errorf("EmptyTrash: %v: ParseInt(%q) failed", path, matches[2])
                        return
                }
                atomic.AddInt64(&bytesInTrash, info.Size())
@@ -663,7 +660,7 @@ func (v *UnixVolume) EmptyTrash() {
                }
                err = v.os.Remove(path)
                if err != nil {
-                       log.Printf("EmptyTrash: Remove %v: %v", path, err)
+                       v.logger.WithError(err).Errorf("EmptyTrash: Remove(%q) failed", path)
                        return
                }
                atomic.AddInt64(&bytesDeleted, info.Size())
@@ -688,7 +685,7 @@ func (v *UnixVolume) EmptyTrash() {
 
        err := filepath.Walk(v.Root, func(path string, info os.FileInfo, err error) error {
                if err != nil {
-                       log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err)
+                       v.logger.WithError(err).Errorf("EmptyTrash: filepath.Walk(%q) failed", path)
                        return nil
                }
                todo <- dirent{path, info}
@@ -698,10 +695,10 @@ func (v *UnixVolume) EmptyTrash() {
        wg.Wait()
 
        if err != nil {
-               log.Printf("EmptyTrash error for %v: %v", v.String(), err)
+               v.logger.WithError(err).Error("EmptyTrash failed")
        }
 
-       log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
+       v.logger.Infof("EmptyTrash stats: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
 }
 
 type unixStats struct {
index 1ffc46513cb571b94e4953ba231ba62983c99398..664956f7bcaa91c44f306481f17546ab7caab3e1 100644 (file)
@@ -19,6 +19,7 @@ import (
        "time"
 
        "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/ctxlog"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/sirupsen/logrus"
        check "gopkg.in/check.v1"
@@ -90,6 +91,7 @@ func (s *UnixVolumeSuite) newTestableUnixVolume(c *check.C, cluster *arvados.Clu
                        Root:    d,
                        locker:  locker,
                        cluster: cluster,
+                       logger:  ctxlog.TestLogger(c),
                        volume:  volume,
                        metrics: metrics,
                },