From fdf081b663b91c1d0af669e0224e67a47b8497a3 Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Mon, 15 Nov 2021 15:21:56 -0500 Subject: [PATCH] 18339: Call trash_sweep periodically from controller. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- lib/controller/auth_test.go | 2 +- lib/controller/cmd.go | 4 +- lib/controller/federation/conn.go | 4 + lib/controller/federation_test.go | 2 +- lib/controller/handler.go | 9 +- lib/controller/handler_test.go | 39 ++++++-- lib/controller/rpc/conn.go | 7 ++ lib/controller/server_test.go | 19 ++-- lib/controller/worker.go | 95 +++++++++++++++++++ sdk/go/arvados/api.go | 2 + sdk/go/arvados/client.go | 2 + .../api/app/controllers/sys_controller.rb | 3 +- services/api/config/routes.rb | 2 +- 13 files changed, 167 insertions(+), 23 deletions(-) create mode 100644 lib/controller/worker.go diff --git a/lib/controller/auth_test.go b/lib/controller/auth_test.go index 1752411467..5d477a7664 100644 --- a/lib/controller/auth_test.go +++ b/lib/controller/auth_test.go @@ -98,7 +98,7 @@ func (s *AuthSuite) SetUpTest(c *check.C) { cluster.Login.OpenIDConnect.AcceptAccessToken = true cluster.Login.OpenIDConnect.AcceptAccessTokenScope = "" - s.testHandler = &Handler{Cluster: cluster} + s.testHandler = &Handler{Cluster: cluster, BackgroundContext: ctxlog.Context(context.Background(), s.log)} s.testServer = newServerFromIntegrationTestEnv(c) s.testServer.Server.BaseContext = func(net.Listener) context.Context { return ctxlog.Context(context.Background(), s.log) diff --git a/lib/controller/cmd.go b/lib/controller/cmd.go index 7ab7f5305b..96972251a3 100644 --- a/lib/controller/cmd.go +++ b/lib/controller/cmd.go @@ -16,6 +16,6 @@ import ( // Command starts a controller service. See cmd/arvados-server/cmd.go var Command cmd.Handler = service.Command(arvados.ServiceNameController, newHandler) -func newHandler(_ context.Context, cluster *arvados.Cluster, _ string, _ *prometheus.Registry) service.Handler { - return &Handler{Cluster: cluster} +func newHandler(ctx context.Context, cluster *arvados.Cluster, _ string, _ *prometheus.Registry) service.Handler { + return &Handler{Cluster: cluster, BackgroundContext: ctx} } diff --git a/lib/controller/federation/conn.go b/lib/controller/federation/conn.go index d1bf473d76..d4155da10b 100644 --- a/lib/controller/federation/conn.go +++ b/lib/controller/federation/conn.go @@ -525,6 +525,10 @@ func (conn *Conn) SpecimenDelete(ctx context.Context, options arvados.DeleteOpti return conn.chooseBackend(options.UUID).SpecimenDelete(ctx, options) } +func (conn *Conn) SysTrashSweep(ctx context.Context, options struct{}) (struct{}, error) { + return conn.local.SysTrashSweep(ctx, options) +} + var userAttrsCachedFromLoginCluster = map[string]bool{ "created_at": true, "email": true, diff --git a/lib/controller/federation_test.go b/lib/controller/federation_test.go index 211c761980..eb398695bf 100644 --- a/lib/controller/federation_test.go +++ b/lib/controller/federation_test.go @@ -70,7 +70,7 @@ func (s *FederationSuite) SetUpTest(c *check.C) { cluster.Collections.BlobSigningTTL = arvados.Duration(time.Hour * 24 * 14) arvadostest.SetServiceURL(&cluster.Services.RailsAPI, "http://localhost:1/") arvadostest.SetServiceURL(&cluster.Services.Controller, "http://localhost:/") - s.testHandler = &Handler{Cluster: cluster} + s.testHandler = &Handler{Cluster: cluster, BackgroundContext: ctxlog.Context(context.Background(), s.log)} s.testServer = newServerFromIntegrationTestEnv(c) s.testServer.Server.BaseContext = func(net.Listener) context.Context { return ctxlog.Context(context.Background(), s.log) diff --git a/lib/controller/handler.go b/lib/controller/handler.go index b51d909110..965ba040ed 100644 --- a/lib/controller/handler.go +++ b/lib/controller/handler.go @@ -32,9 +32,11 @@ import ( ) type Handler struct { - Cluster *arvados.Cluster + Cluster *arvados.Cluster + BackgroundContext context.Context setupOnce sync.Once + federation *federation.Conn handlerStack http.Handler proxy *proxy secureClient *http.Client @@ -103,7 +105,8 @@ func (h *Handler) setup() { healthFuncs := make(map[string]health.Func) oidcAuthorizer := localdb.OIDCAccessTokenAuthorizer(h.Cluster, h.db) - rtr := router.New(federation.New(h.Cluster, &healthFuncs), router.Config{ + h.federation = federation.New(h.Cluster, &healthFuncs) + rtr := router.New(h.federation, router.Config{ MaxRequestSize: h.Cluster.API.MaxRequestSize, WrapCalls: api.ComposeWrappers(ctrlctx.WrapCallsInTransactions(h.db), oidcAuthorizer.WrapCalls), }) @@ -152,6 +155,8 @@ func (h *Handler) setup() { h.proxy = &proxy{ Name: "arvados-controller", } + + go h.trashSweepWorker() } var errDBConnection = errors.New("database connection error") diff --git a/lib/controller/handler_test.go b/lib/controller/handler_test.go index f854079f97..a456627c0d 100644 --- a/lib/controller/handler_test.go +++ b/lib/controller/handler_test.go @@ -35,7 +35,7 @@ var _ = check.Suite(&HandlerSuite{}) type HandlerSuite struct { cluster *arvados.Cluster - handler http.Handler + handler *Handler ctx context.Context cancel context.CancelFunc } @@ -51,7 +51,7 @@ func (s *HandlerSuite) SetUpTest(c *check.C) { s.cluster.TLS.Insecure = true arvadostest.SetServiceURL(&s.cluster.Services.RailsAPI, "https://"+os.Getenv("ARVADOS_TEST_API_HOST")) arvadostest.SetServiceURL(&s.cluster.Services.Controller, "http://localhost:/") - s.handler = newHandler(s.ctx, s.cluster, "", prometheus.NewRegistry()) + s.handler = newHandler(s.ctx, s.cluster, "", prometheus.NewRegistry()).(*Handler) } func (s *HandlerSuite) TearDownTest(c *check.C) { @@ -276,7 +276,7 @@ func (s *HandlerSuite) TestLogoutGoogle(c *check.C) { func (s *HandlerSuite) TestValidateV1APIToken(c *check.C) { req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil) - user, ok, err := s.handler.(*Handler).validateAPItoken(req, arvadostest.ActiveToken) + user, ok, err := s.handler.validateAPItoken(req, arvadostest.ActiveToken) c.Assert(err, check.IsNil) c.Check(ok, check.Equals, true) c.Check(user.Authorization.UUID, check.Equals, arvadostest.ActiveTokenUUID) @@ -287,7 +287,7 @@ func (s *HandlerSuite) TestValidateV1APIToken(c *check.C) { func (s *HandlerSuite) TestValidateV2APIToken(c *check.C) { req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil) - user, ok, err := s.handler.(*Handler).validateAPItoken(req, arvadostest.ActiveTokenV2) + user, ok, err := s.handler.validateAPItoken(req, arvadostest.ActiveTokenV2) c.Assert(err, check.IsNil) c.Check(ok, check.Equals, true) c.Check(user.Authorization.UUID, check.Equals, arvadostest.ActiveTokenUUID) @@ -319,11 +319,11 @@ func (s *HandlerSuite) TestValidateRemoteToken(c *check.C) { func (s *HandlerSuite) TestCreateAPIToken(c *check.C) { req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil) - auth, err := s.handler.(*Handler).createAPItoken(req, arvadostest.ActiveUserUUID, nil) + auth, err := s.handler.createAPItoken(req, arvadostest.ActiveUserUUID, nil) c.Assert(err, check.IsNil) c.Check(auth.Scopes, check.DeepEquals, []string{"all"}) - user, ok, err := s.handler.(*Handler).validateAPItoken(req, auth.TokenV2()) + user, ok, err := s.handler.validateAPItoken(req, auth.TokenV2()) c.Assert(err, check.IsNil) c.Check(ok, check.Equals, true) c.Check(user.Authorization.UUID, check.Equals, auth.UUID) @@ -430,3 +430,30 @@ func (s *HandlerSuite) TestRedactRailsAPIHostFromErrors(c *check.C) { c.Check(jresp.Errors[0], check.Matches, `.*//railsapi\.internal/arvados/v1/collections/.*: 404 Not Found.*`) c.Check(jresp.Errors[0], check.Not(check.Matches), `(?ms).*127.0.0.1.*`) } + +func (s *HandlerSuite) TestTrashSweep(c *check.C) { + s.cluster.SystemRootToken = arvadostest.SystemRootToken + s.cluster.Collections.TrashSweepInterval = arvados.Duration(time.Second / 10) + s.handler.CheckHealth() + ctx := auth.NewContext(s.ctx, &auth.Credentials{Tokens: []string{arvadostest.ActiveTokenV2}}) + coll, err := s.handler.federation.CollectionCreate(ctx, arvados.CreateOptions{Attrs: map[string]interface{}{"name": "test trash sweep"}, EnsureUniqueName: true}) + c.Assert(err, check.IsNil) + defer s.handler.federation.CollectionDelete(ctx, arvados.DeleteOptions{UUID: coll.UUID}) + db, err := s.handler.db(s.ctx) + c.Assert(err, check.IsNil) + _, err = db.ExecContext(s.ctx, `update collections set trash_at = $1, delete_at = $2 where uuid = $3`, time.Now().UTC().Add(time.Second/10), time.Now().UTC().Add(time.Hour), coll.UUID) + c.Assert(err, check.IsNil) + deadline := time.Now().Add(5 * time.Second) + for { + if time.Now().After(deadline) { + c.Log("timed out") + c.FailNow() + } + updated, err := s.handler.federation.CollectionGet(ctx, arvados.GetOptions{UUID: coll.UUID, IncludeTrash: true}) + c.Assert(err, check.IsNil) + if updated.IsTrashed { + break + } + time.Sleep(time.Second / 10) + } +} diff --git a/lib/controller/rpc/conn.go b/lib/controller/rpc/conn.go index 25f47bc3ba..736ef711e1 100644 --- a/lib/controller/rpc/conn.go +++ b/lib/controller/rpc/conn.go @@ -572,6 +572,13 @@ func (conn *Conn) SpecimenDelete(ctx context.Context, options arvados.DeleteOpti return resp, err } +func (conn *Conn) SysTrashSweep(ctx context.Context, options struct{}) (struct{}, error) { + ep := arvados.EndpointSysTrashSweep + var resp struct{} + err := conn.requestAndDecode(ctx, &resp, ep, nil, options) + return resp, err +} + func (conn *Conn) UserCreate(ctx context.Context, options arvados.CreateOptions) (arvados.User, error) { ep := arvados.EndpointUserCreate var resp arvados.User diff --git a/lib/controller/server_test.go b/lib/controller/server_test.go index b2b3365a20..4f3d4a5683 100644 --- a/lib/controller/server_test.go +++ b/lib/controller/server_test.go @@ -35,11 +35,14 @@ func integrationTestCluster() *arvados.Cluster { // provided by the integration-testing environment. func newServerFromIntegrationTestEnv(c *check.C) *httpserver.Server { log := ctxlog.TestLogger(c) - - handler := &Handler{Cluster: &arvados.Cluster{ - ClusterID: "zzzzz", - PostgreSQL: integrationTestCluster().PostgreSQL, - }} + ctx := ctxlog.Context(context.Background(), log) + handler := &Handler{ + Cluster: &arvados.Cluster{ + ClusterID: "zzzzz", + PostgreSQL: integrationTestCluster().PostgreSQL, + }, + BackgroundContext: ctx, + } handler.Cluster.TLS.Insecure = true handler.Cluster.Collections.BlobSigning = true handler.Cluster.Collections.BlobSigningKey = arvadostest.BlobSigningKey @@ -49,10 +52,8 @@ func newServerFromIntegrationTestEnv(c *check.C) *httpserver.Server { srv := &httpserver.Server{ Server: http.Server{ - BaseContext: func(net.Listener) context.Context { - return ctxlog.Context(context.Background(), log) - }, - Handler: httpserver.AddRequestIDs(httpserver.LogRequests(handler)), + BaseContext: func(net.Listener) context.Context { return ctx }, + Handler: httpserver.AddRequestIDs(httpserver.LogRequests(handler)), }, Addr: ":", } diff --git a/lib/controller/worker.go b/lib/controller/worker.go new file mode 100644 index 0000000000..02f3db3302 --- /dev/null +++ b/lib/controller/worker.go @@ -0,0 +1,95 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package controller + +import ( + "context" + "database/sql" + "time" + + "git.arvados.org/arvados.git/sdk/go/auth" + "git.arvados.org/arvados.git/sdk/go/ctxlog" + "github.com/jmoiron/sqlx" +) + +const ( + // lock keys should be added here with explicit values, to + // ensure they do not get accidentally renumbered when a key + // is added or removed. + lockKeyTrashSweep = 10001 +) + +// dbLocker uses pg_advisory_lock to maintain a cluster-wide lock for +// a long-running task like "do X every N seconds". +type dbLocker struct { + GetDB func(context.Context) (*sqlx.DB, error) + LockKey int + + conn *sql.Conn // != nil if advisory lock is acquired +} + +// Lock acquires the advisory lock the first time it is +// called. Subsequent calls confirm that the lock is still active +// (i.e., the session is still alive), and re-acquires if needed. +func (dbl *dbLocker) Lock(ctx context.Context) { + logger := ctxlog.FromContext(ctx) + for ; ; time.Sleep(5 * time.Second) { + if dbl.conn == nil { + db, err := dbl.GetDB(ctx) + if err != nil { + logger.WithError(err).Infof("error getting database pool") + continue + } + conn, err := db.Conn(ctx) + if err != nil { + logger.WithError(err).Info("error getting database connection") + continue + } + _, err = conn.ExecContext(ctx, `SELECT pg_advisory_lock($1)`, dbl.LockKey) + if err != nil { + logger.WithError(err).Info("error getting lock") + conn.Close() + continue + } + dbl.conn = conn + } + err := dbl.conn.PingContext(ctx) + if err != nil { + logger.WithError(err).Info("database connection ping failed") + dbl.conn.Close() + dbl.conn = nil + continue + } + return + } +} + +func (dbl *dbLocker) Unlock() { + if dbl.conn != nil { + dbl.conn.Close() + dbl.conn = nil + } +} + +func (h *Handler) trashSweepWorker() { + sleep := h.Cluster.Collections.TrashSweepInterval.Duration() + logger := ctxlog.FromContext(h.BackgroundContext).WithField("worker", "trash sweep") + ctx := ctxlog.Context(h.BackgroundContext, logger) + if sleep <= 0 { + logger.Debugf("Collections.TrashSweepInterval is %v, not running worker", sleep) + return + } + locker := &dbLocker{GetDB: h.db, LockKey: lockKeyTrashSweep} + locker.Lock(ctx) + defer locker.Unlock() + for time.Sleep(sleep); ctx.Err() == nil; time.Sleep(sleep) { + locker.Lock(ctx) + ctx := auth.NewContext(ctx, &auth.Credentials{Tokens: []string{h.Cluster.SystemRootToken}}) + _, err := h.federation.SysTrashSweep(ctx, struct{}{}) + if err != nil { + logger.WithError(err).Info("trash sweep failed") + } + } +} diff --git a/sdk/go/arvados/api.go b/sdk/go/arvados/api.go index 0fdc13d198..d4af0e7a8e 100644 --- a/sdk/go/arvados/api.go +++ b/sdk/go/arvados/api.go @@ -68,6 +68,7 @@ var ( EndpointLinkGet = APIEndpoint{"GET", "arvados/v1/links/{uuid}", ""} EndpointLinkList = APIEndpoint{"GET", "arvados/v1/links", ""} EndpointLinkDelete = APIEndpoint{"DELETE", "arvados/v1/links/{uuid}", ""} + EndpointSysTrashSweep = APIEndpoint{"POST", "sys/trash_sweep", ""} EndpointUserActivate = APIEndpoint{"POST", "arvados/v1/users/{uuid}/activate", ""} EndpointUserCreate = APIEndpoint{"POST", "arvados/v1/users", "user"} EndpointUserCurrent = APIEndpoint{"GET", "arvados/v1/users/current", ""} @@ -269,6 +270,7 @@ type API interface { SpecimenGet(ctx context.Context, options GetOptions) (Specimen, error) SpecimenList(ctx context.Context, options ListOptions) (SpecimenList, error) SpecimenDelete(ctx context.Context, options DeleteOptions) (Specimen, error) + SysTrashSweep(ctx context.Context, options struct{}) (struct{}, error) UserCreate(ctx context.Context, options CreateOptions) (User, error) UserUpdate(ctx context.Context, options UpdateOptions) (User, error) UserMerge(ctx context.Context, options UserMergeOptions) (User, error) diff --git a/sdk/go/arvados/client.go b/sdk/go/arvados/client.go index 13bb3bf80d..5ec828667f 100644 --- a/sdk/go/arvados/client.go +++ b/sdk/go/arvados/client.go @@ -217,6 +217,8 @@ func (c *Client) DoAndDecode(dst interface{}, req *http.Request) error { return err } switch { + case resp.StatusCode == http.StatusNoContent: + return nil case resp.StatusCode == http.StatusOK && dst == nil: return nil case resp.StatusCode == http.StatusOK: diff --git a/services/api/app/controllers/sys_controller.rb b/services/api/app/controllers/sys_controller.rb index ecc02e83dc..08a672cc0b 100644 --- a/services/api/app/controllers/sys_controller.rb +++ b/services/api/app/controllers/sys_controller.rb @@ -7,7 +7,7 @@ class SysController < ApplicationController skip_before_action :render_404_if_no_object before_action :admin_required - def sweep_trash + def trash_sweep act_as_system_user do # Sweep trashed collections Collection. @@ -31,6 +31,7 @@ class SysController < ApplicationController # Sweep expired tokens ActiveRecord::Base.connection.execute("DELETE from api_client_authorizations where expires_at <= statement_timestamp()") end + head :no_content end protected diff --git a/services/api/config/routes.rb b/services/api/config/routes.rb index a0c3cafe13..98f5788d65 100644 --- a/services/api/config/routes.rb +++ b/services/api/config/routes.rb @@ -92,7 +92,7 @@ Rails.application.routes.draw do end end - post '/sys/sweep_trash', to: 'sys#sweep_trash' + post '/sys/trash_sweep', to: 'sys#trash_sweep' if Rails.env == 'test' post '/database/reset', to: 'database#reset' -- 2.30.2