Merge branch '21535-multi-wf-delete'
[arvados.git] / lib / controller / trash.go
index 9a7b0814cee7477dc7b506921aa99be0ef34ed77..662ea26751f501178a13243a8abe92caf4476adb 100644 (file)
@@ -20,10 +20,16 @@ func (h *Handler) periodicWorker(workerName string, interval time.Duration, lock
                logger.Debugf("interval is %v, not running worker", interval)
                return
        }
-       locker.Lock(ctx, h.db)
+       if !locker.Lock(ctx, h.dbConnector.GetDB) {
+               // context canceled
+               return
+       }
        defer locker.Unlock()
-       for time.Sleep(interval); ctx.Err() == nil; time.Sleep(interval) {
-               locker.Check()
+       for ctxSleep(ctx, interval); ctx.Err() == nil; ctxSleep(ctx, interval) {
+               if !locker.Check() {
+                       // context canceled
+                       return
+               }
                err := run(ctx)
                if err != nil {
                        logger.WithError(err).Infof("%s failed", workerName)
@@ -40,8 +46,20 @@ func (h *Handler) trashSweepWorker() {
 }
 
 func (h *Handler) containerLogSweepWorker() {
-       h.periodicWorker("container log sweep", h.Cluster.Containers.Logging.SweepInterval.Duration(), dblock.ContainerLogSweep, func(ctx context.Context) error {
-               db, err := h.db(ctx)
+       // Since #21611 we don't expect any new log entries, so the
+       // periodic worker only runs once, then becomes a no-op.
+       //
+       // The old Containers.Logging.SweepInterval config is removed.
+       // We use TrashSweepInterval here instead, for testing
+       // reasons: it prevents the default integration-testing
+       // controller service (whose TrashSweepInterval is 0) from
+       // acquiring the dblock.
+       done := false
+       h.periodicWorker("container log sweep", h.Cluster.Collections.TrashSweepInterval.Duration(), dblock.ContainerLogSweep, func(ctx context.Context) error {
+               if done {
+                       return nil
+               }
+               db, err := h.dbConnector.GetDB(ctx)
                if err != nil {
                        return err
                }
@@ -50,9 +68,7 @@ DELETE FROM logs
  USING containers
  WHERE logs.object_uuid=containers.uuid
  AND logs.event_type in ('stdout', 'stderr', 'arv-mount', 'crunch-run', 'crunchstat', 'hoststat', 'node', 'container', 'keepstore')
- AND containers.log IS NOT NULL
- AND now() - containers.finished_at > $1::interval`,
-                       h.Cluster.Containers.Logging.MaxAge.String())
+ AND containers.log IS NOT NULL`)
                if err != nil {
                        return err
                }
@@ -62,7 +78,19 @@ DELETE FROM logs
                        logger.WithError(err).Warn("unexpected error from RowsAffected()")
                } else {
                        logger.WithField("rows", rows).Info("deleted rows from logs table")
+                       if rows == 0 {
+                               done = true
+                       }
                }
                return nil
        })
 }
+
+// Sleep for the given duration, but return early if ctx cancels
+// before that.
+func ctxSleep(ctx context.Context, d time.Duration) {
+       select {
+       case <-ctx.Done():
+       case <-time.After(d):
+       }
+}