18863: Move container log cleanup from rake task to controller.
author Tom Clegg <tom@curii.com>
Mon, 17 Oct 2022 18:34:32 +0000 (14:34 -0400)
committer Tom Clegg <tom@curii.com>
Mon, 17 Oct 2022 18:34:32 +0000 (14:34 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

lib/config/config.default.yml
lib/controller/dblock/dblock.go
lib/controller/handler.go
lib/controller/handler_test.go
lib/controller/trash.go
sdk/go/arvados/config.go
services/api/lib/tasks/delete_old_container_logs.rake

index 9b5547248e7c8ab84adc245cdd266deb07f8da2f..559b57c8c4d5d4bf3fa0444628d880485f75daa6 100644 (file)
@@ -1071,12 +1071,16 @@ Clusters:
       LocalKeepLogsToContainerLog: none
 
       Logging:
-        # When you run the db:delete_old_container_logs task, it will find
-        # containers that have been finished for at least this many seconds,
+        # Periodically (see SweepInterval) Arvados will check for
+        # containers that have been finished for at least this long,
         # and delete their stdout, stderr, arv-mount, crunch-run, and
         # crunchstat logs from the logs table.
         MaxAge: 720h
 
+        # How often to delete cached log entries for finished containers
+        # (see MaxAge). A zero or negative value disables this sweep.
+        SweepInterval: 12h
+
         # These two settings control how frequently log events are flushed to the
         # database.  Log lines are buffered until either crunch_log_bytes_per_event
         # has been reached or crunch_log_seconds_between_events has elapsed since
index 1a36822d5b7f91e81c5b0deb167a105a962b3dfb..a46201bb45af793062126689d773be61f9bbe232 100644 (file)
@@ -15,8 +15,9 @@ import (
 )
 
 var (
-       TrashSweep = &DBLocker{key: 10001}
-       retryDelay = 5 * time.Second
+       TrashSweep        = &DBLocker{key: 10001}
+       ContainerLogSweep = &DBLocker{key: 10002}
+       retryDelay        = 5 * time.Second
 )
 
 // DBLocker uses pg_advisory_lock to maintain a cluster-wide lock for
index e9c56db4d4b112b906dbaf36dd21b9a7a1300d98..e1392bef92652bfbf8a61155a062752608809495 100644 (file)
@@ -155,6 +155,7 @@ func (h *Handler) setup() {
        }
 
        go h.trashSweepWorker()
+       go h.containerLogSweepWorker()
 }
 
 var errDBConnection = errors.New("database connection error")
index 127e6c34c6238ca48487f5cbb72ca1107bfed7da..0ffe0255f5c59ca6ae40930bd994badf0672d05e 100644 (file)
@@ -496,6 +496,35 @@ func (s *HandlerSuite) TestTrashSweep(c *check.C) {
        }
 }
 
+func (s *HandlerSuite) TestContainerLogSweep(c *check.C) {
+       s.cluster.SystemRootToken = arvadostest.SystemRootToken
+       s.cluster.Containers.Logging.SweepInterval = arvados.Duration(time.Second / 10)
+       s.handler.CheckHealth()
+       ctx := auth.NewContext(s.ctx, &auth.Credentials{Tokens: []string{arvadostest.ActiveTokenV2}})
+       logentry, err := s.handler.federation.LogCreate(ctx, arvados.CreateOptions{Attrs: map[string]interface{}{
+               "object_uuid": arvadostest.CompletedContainerUUID,
+               "event_type":  "stderr",
+               "properties": map[string]interface{}{
+                       "text": "test trash sweep\n",
+               },
+       }})
+       c.Assert(err, check.IsNil)
+       defer s.handler.federation.LogDelete(ctx, arvados.DeleteOptions{UUID: logentry.UUID})
+       deadline := time.Now().Add(5 * time.Second)
+       for {
+               if time.Now().After(deadline) {
+                       c.Log("timed out")
+                       c.FailNow()
+               }
+               logentries, err := s.handler.federation.LogList(ctx, arvados.ListOptions{Filters: []arvados.Filter{{"uuid", "=", logentry.UUID}}, Limit: -1})
+               c.Assert(err, check.IsNil)
+               if len(logentries.Items) == 0 {
+                       break
+               }
+               time.Sleep(time.Second / 10)
+       }
+}
+
 func (s *HandlerSuite) TestLogActivity(c *check.C) {
        s.cluster.SystemRootToken = arvadostest.SystemRootToken
        s.cluster.Users.ActivityLoggingPeriod = arvados.Duration(24 * time.Hour)
index 551b2f92bbde209b984656728388ca48a2b9c294..37c7be485924dda45b3be6bdc48ee0a09a85cb6f 100644 (file)
@@ -5,6 +5,7 @@
 package controller
 
 import (
+       "context"
        "time"
 
        "git.arvados.org/arvados.git/lib/controller/dblock"
@@ -12,22 +13,56 @@ import (
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
 )
 
-func (h *Handler) trashSweepWorker() {
-       sleep := h.Cluster.Collections.TrashSweepInterval.Duration()
-       logger := ctxlog.FromContext(h.BackgroundContext).WithField("worker", "trash sweep")
+func (h *Handler) periodicWorker(workerName string, interval time.Duration, locker *dblock.DBLocker, run func(context.Context) error) {
+       logger := ctxlog.FromContext(h.BackgroundContext).WithField("worker", workerName)
        ctx := ctxlog.Context(h.BackgroundContext, logger)
-       if sleep <= 0 {
-               logger.Debugf("Collections.TrashSweepInterval is %v, not running worker", sleep)
+       if interval <= 0 {
+               logger.Debugf("interval is %v, not running worker", interval)
                return
        }
-       dblock.TrashSweep.Lock(ctx, h.db)
-       defer dblock.TrashSweep.Unlock()
-       for time.Sleep(sleep); ctx.Err() == nil; time.Sleep(sleep) {
-               dblock.TrashSweep.Check()
-               ctx := auth.NewContext(ctx, &auth.Credentials{Tokens: []string{h.Cluster.SystemRootToken}})
-               _, err := h.federation.SysTrashSweep(ctx, struct{}{})
+       locker.Lock(ctx, h.db)
+       defer locker.Unlock()
+       for time.Sleep(interval); ctx.Err() == nil; time.Sleep(interval) {
+               locker.Check()
+               err := run(ctx)
                if err != nil {
-                       logger.WithError(err).Info("trash sweep failed")
+                       logger.WithError(err).Infof("%s failed", workerName)
                }
        }
 }
+
+func (h *Handler) trashSweepWorker() {
+       h.periodicWorker("trash sweep", h.Cluster.Collections.TrashSweepInterval.Duration(), dblock.TrashSweep, func(ctx context.Context) error {
+               ctx = auth.NewContext(ctx, &auth.Credentials{Tokens: []string{h.Cluster.SystemRootToken}})
+               _, err := h.federation.SysTrashSweep(ctx, struct{}{})
+               return err
+       })
+}
+
+func (h *Handler) containerLogSweepWorker() {
+       h.periodicWorker("container log sweep", h.Cluster.Containers.Logging.SweepInterval.Duration(), dblock.ContainerLogSweep, func(ctx context.Context) error {
+               db, err := h.db(ctx)
+               if err != nil {
+                       return err
+               }
+               res, err := db.ExecContext(ctx, `
+DELETE FROM logs
+ USING containers
+ WHERE logs.object_uuid=containers.uuid
+ AND logs.event_type in ('stdout', 'stderr', 'arv-mount', 'crunch-run', 'crunchstat')
+ AND containers.log IS NOT NULL
+ AND now() - containers.finished_at > $1::interval`,
+                       h.Cluster.Containers.Logging.MaxAge.String())
+               if err != nil {
+                       return err
+               }
+               logger := ctxlog.FromContext(ctx)
+               rows, err := res.RowsAffected()
+               if err != nil {
+                       logger.WithError(err).Warn("unexpected error from RowsAffected()")
+               } else {
+                       logger.WithField("rows", rows).Info("deleted rows from logs table")
+               }
+               return nil
+       })
+}
index 3c1ef4826f91b4007f03c0cb96324f9ce51fbe3a..8705f286934c2e22a62f7977b8849347374d7a2d 100644 (file)
@@ -467,6 +467,7 @@ type ContainersConfig struct {
        }
        Logging struct {
                MaxAge                       Duration
+               SweepInterval                Duration
                LogBytesPerEvent             int
                LogSecondsBetweenEvents      Duration
                LogThrottlePeriod            Duration
index 7a0ab3826ab1c08beee1361ff81b654b4ccff86d..db1b3667cc0a94a95eebd51b46224daab3981336 100644 (file)
@@ -8,11 +8,9 @@
 # from the logs table.
 
 namespace :db do
-  desc "Remove old container log entries from the logs table"
+  desc "deprecated / no-op"
 
   task delete_old_container_logs: :environment do
-    delete_sql = "DELETE FROM logs WHERE id in (SELECT logs.id FROM logs JOIN containers ON logs.object_uuid = containers.uuid WHERE event_type IN ('stdout', 'stderr', 'arv-mount', 'crunch-run', 'crunchstat') AND containers.log IS NOT NULL AND now() - containers.finished_at > interval '#{Rails.configuration.Containers.Logging.MaxAge.to_i} seconds')"
-
-    ActiveRecord::Base.connection.execute(delete_sql)
+    Rails.logger.info "this db:delete_old_container_logs rake task is no longer used"
   end
 end