LocalKeepLogsToContainerLog: none
Logging:
- # When you run the db:delete_old_container_logs task, it will find
- # containers that have been finished for at least this many seconds,
+ # Periodically (see SweepInterval) Arvados will check for
+ # containers that have been finished for at least this long,
# and delete their stdout, stderr, arv-mount, crunch-run, and
# crunchstat logs from the logs table.
MaxAge: 720h
+ # How often to delete cached log entries for finished
+ # containers (see MaxAge).
+ SweepInterval: 12h
+
# These two settings control how frequently log events are flushed to the
# database. Log lines are buffered until either crunch_log_bytes_per_event
# has been reached or crunch_log_seconds_between_events has elapsed since
)
var (
- TrashSweep = &DBLocker{key: 10001}
- retryDelay = 5 * time.Second
+ TrashSweep = &DBLocker{key: 10001}
+ ContainerLogSweep = &DBLocker{key: 10002}
+ retryDelay = 5 * time.Second
)
// DBLocker uses pg_advisory_lock to maintain a cluster-wide lock for
}
go h.trashSweepWorker()
+ go h.containerLogSweepWorker()
}
var errDBConnection = errors.New("database connection error")
}
}
+func (s *HandlerSuite) TestContainerLogSweep(c *check.C) {
+ s.cluster.SystemRootToken = arvadostest.SystemRootToken
+ s.cluster.Containers.Logging.SweepInterval = arvados.Duration(time.Second / 10)
+ s.handler.CheckHealth()
+ ctx := auth.NewContext(s.ctx, &auth.Credentials{Tokens: []string{arvadostest.ActiveTokenV2}})
+ logentry, err := s.handler.federation.LogCreate(ctx, arvados.CreateOptions{Attrs: map[string]interface{}{
+ "object_uuid": arvadostest.CompletedContainerUUID,
+ "event_type": "stderr",
+ "properties": map[string]interface{}{
+ "text": "test trash sweep\n",
+ },
+ }})
+ c.Assert(err, check.IsNil)
+ defer s.handler.federation.LogDelete(ctx, arvados.DeleteOptions{UUID: logentry.UUID})
+ deadline := time.Now().Add(5 * time.Second)
+ for {
+ if time.Now().After(deadline) {
+ c.Log("timed out")
+ c.FailNow()
+ }
+ logentries, err := s.handler.federation.LogList(ctx, arvados.ListOptions{Filters: []arvados.Filter{{"uuid", "=", logentry.UUID}}, Limit: -1})
+ c.Assert(err, check.IsNil)
+ if len(logentries.Items) == 0 {
+ break
+ }
+ time.Sleep(time.Second / 10)
+ }
+}
+
func (s *HandlerSuite) TestLogActivity(c *check.C) {
s.cluster.SystemRootToken = arvadostest.SystemRootToken
s.cluster.Users.ActivityLoggingPeriod = arvados.Duration(24 * time.Hour)
package controller
import (
+ "context"
"time"
"git.arvados.org/arvados.git/lib/controller/dblock"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
)
-func (h *Handler) trashSweepWorker() {
- sleep := h.Cluster.Collections.TrashSweepInterval.Duration()
- logger := ctxlog.FromContext(h.BackgroundContext).WithField("worker", "trash sweep")
+func (h *Handler) periodicWorker(workerName string, interval time.Duration, locker *dblock.DBLocker, run func(context.Context) error) {
+ logger := ctxlog.FromContext(h.BackgroundContext).WithField("worker", workerName)
ctx := ctxlog.Context(h.BackgroundContext, logger)
- if sleep <= 0 {
- logger.Debugf("Collections.TrashSweepInterval is %v, not running worker", sleep)
+ if interval <= 0 {
+ logger.Debugf("interval is %v, not running worker", interval)
return
}
- dblock.TrashSweep.Lock(ctx, h.db)
- defer dblock.TrashSweep.Unlock()
- for time.Sleep(sleep); ctx.Err() == nil; time.Sleep(sleep) {
- dblock.TrashSweep.Check()
- ctx := auth.NewContext(ctx, &auth.Credentials{Tokens: []string{h.Cluster.SystemRootToken}})
- _, err := h.federation.SysTrashSweep(ctx, struct{}{})
+ locker.Lock(ctx, h.db)
+ defer locker.Unlock()
+ for time.Sleep(interval); ctx.Err() == nil; time.Sleep(interval) {
+ locker.Check()
+ err := run(ctx)
if err != nil {
- logger.WithError(err).Info("trash sweep failed")
+ logger.WithError(err).Infof("%s failed", workerName)
}
}
}
+
+func (h *Handler) trashSweepWorker() {
+ h.periodicWorker("trash sweep", h.Cluster.Collections.TrashSweepInterval.Duration(), dblock.TrashSweep, func(ctx context.Context) error {
+ ctx = auth.NewContext(ctx, &auth.Credentials{Tokens: []string{h.Cluster.SystemRootToken}})
+ _, err := h.federation.SysTrashSweep(ctx, struct{}{})
+ return err
+ })
+}
+
+func (h *Handler) containerLogSweepWorker() {
+ h.periodicWorker("container log sweep", h.Cluster.Containers.Logging.SweepInterval.Duration(), dblock.ContainerLogSweep, func(ctx context.Context) error {
+ db, err := h.db(ctx)
+ if err != nil {
+ return err
+ }
+ res, err := db.ExecContext(ctx, `
+DELETE FROM logs
+ USING containers
+ WHERE logs.object_uuid=containers.uuid
+ AND logs.event_type in ('stdout', 'stderr', 'arv-mount', 'crunch-run', 'crunchstat')
+ AND containers.log IS NOT NULL
+ AND now() - containers.finished_at > $1::interval`,
+ h.Cluster.Containers.Logging.MaxAge.String())
+ if err != nil {
+ return err
+ }
+ logger := ctxlog.FromContext(ctx)
+ rows, err := res.RowsAffected()
+ if err != nil {
+ logger.WithError(err).Warn("unexpected error from RowsAffected()")
+ } else {
+ logger.WithField("rows", rows).Info("deleted rows from logs table")
+ }
+ return nil
+ })
+}
}
Logging struct {
MaxAge Duration
+ SweepInterval Duration
LogBytesPerEvent int
LogSecondsBetweenEvents Duration
LogThrottlePeriod Duration
# from the logs table.
namespace :db do
- desc "Remove old container log entries from the logs table"
+ desc "deprecated / no-op"
task delete_old_container_logs: :environment do
- delete_sql = "DELETE FROM logs WHERE id in (SELECT logs.id FROM logs JOIN containers ON logs.object_uuid = containers.uuid WHERE event_type IN ('stdout', 'stderr', 'arv-mount', 'crunch-run', 'crunchstat') AND containers.log IS NOT NULL AND now() - containers.finished_at > interval '#{Rails.configuration.Containers.Logging.MaxAge.to_i} seconds')"
-
- ActiveRecord::Base.connection.execute(delete_sql)
+ Rails.logger.info "this db:delete_old_container_logs rake task is no longer used"
end
end