18339: Call trash_sweep periodically from controller.
authorTom Clegg <tom@curii.com>
Mon, 15 Nov 2021 20:21:56 +0000 (15:21 -0500)
committerTom Clegg <tom@curii.com>
Mon, 15 Nov 2021 20:21:56 +0000 (15:21 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

13 files changed:
lib/controller/auth_test.go
lib/controller/cmd.go
lib/controller/federation/conn.go
lib/controller/federation_test.go
lib/controller/handler.go
lib/controller/handler_test.go
lib/controller/rpc/conn.go
lib/controller/server_test.go
lib/controller/worker.go [new file with mode: 0644]
sdk/go/arvados/api.go
sdk/go/arvados/client.go
services/api/app/controllers/sys_controller.rb
services/api/config/routes.rb

index 17524114671e840ecdac05e2457d9ebbb96c5635..5d477a7664b7266ec28e7696bd604171b6f7c70c 100644 (file)
@@ -98,7 +98,7 @@ func (s *AuthSuite) SetUpTest(c *check.C) {
        cluster.Login.OpenIDConnect.AcceptAccessToken = true
        cluster.Login.OpenIDConnect.AcceptAccessTokenScope = ""
 
-       s.testHandler = &Handler{Cluster: cluster}
+       s.testHandler = &Handler{Cluster: cluster, BackgroundContext: ctxlog.Context(context.Background(), s.log)}
        s.testServer = newServerFromIntegrationTestEnv(c)
        s.testServer.Server.BaseContext = func(net.Listener) context.Context {
                return ctxlog.Context(context.Background(), s.log)
index 7ab7f5305b4fe83113d1a47f499f7d3eb8298804..96972251a3d18af5758e37cd7961ed586504a10a 100644 (file)
@@ -16,6 +16,6 @@ import (
 // Command starts a controller service. See cmd/arvados-server/cmd.go
 var Command cmd.Handler = service.Command(arvados.ServiceNameController, newHandler)
 
-func newHandler(_ context.Context, cluster *arvados.Cluster, _ string, _ *prometheus.Registry) service.Handler {
-       return &Handler{Cluster: cluster}
+func newHandler(ctx context.Context, cluster *arvados.Cluster, _ string, _ *prometheus.Registry) service.Handler {
+       return &Handler{Cluster: cluster, BackgroundContext: ctx}
 }
index d1bf473d76856abd59bfb35f069e4f47f498e680..d4155da10beca3fb57f4438ca0a371f15addae1c 100644 (file)
@@ -525,6 +525,10 @@ func (conn *Conn) SpecimenDelete(ctx context.Context, options arvados.DeleteOpti
        return conn.chooseBackend(options.UUID).SpecimenDelete(ctx, options)
 }
 
+func (conn *Conn) SysTrashSweep(ctx context.Context, options struct{}) (struct{}, error) {
+       return conn.local.SysTrashSweep(ctx, options)
+}
+
 var userAttrsCachedFromLoginCluster = map[string]bool{
        "created_at":  true,
        "email":       true,
index 211c7619809ed6a8855248915facef843da55081..eb398695bf0b1e369cdfdc9ca871a77128413b08 100644 (file)
@@ -70,7 +70,7 @@ func (s *FederationSuite) SetUpTest(c *check.C) {
        cluster.Collections.BlobSigningTTL = arvados.Duration(time.Hour * 24 * 14)
        arvadostest.SetServiceURL(&cluster.Services.RailsAPI, "http://localhost:1/")
        arvadostest.SetServiceURL(&cluster.Services.Controller, "http://localhost:/")
-       s.testHandler = &Handler{Cluster: cluster}
+       s.testHandler = &Handler{Cluster: cluster, BackgroundContext: ctxlog.Context(context.Background(), s.log)}
        s.testServer = newServerFromIntegrationTestEnv(c)
        s.testServer.Server.BaseContext = func(net.Listener) context.Context {
                return ctxlog.Context(context.Background(), s.log)
index b51d909110827bf7d8470120a87f5e29db008a15..965ba040edc8fb5fad02e153d817d5cbb8087152 100644 (file)
@@ -32,9 +32,11 @@ import (
 )
 
 type Handler struct {
-       Cluster *arvados.Cluster
+       Cluster           *arvados.Cluster
+       BackgroundContext context.Context
 
        setupOnce      sync.Once
+       federation     *federation.Conn
        handlerStack   http.Handler
        proxy          *proxy
        secureClient   *http.Client
@@ -103,7 +105,8 @@ func (h *Handler) setup() {
        healthFuncs := make(map[string]health.Func)
 
        oidcAuthorizer := localdb.OIDCAccessTokenAuthorizer(h.Cluster, h.db)
-       rtr := router.New(federation.New(h.Cluster, &healthFuncs), router.Config{
+       h.federation = federation.New(h.Cluster, &healthFuncs)
+       rtr := router.New(h.federation, router.Config{
                MaxRequestSize: h.Cluster.API.MaxRequestSize,
                WrapCalls:      api.ComposeWrappers(ctrlctx.WrapCallsInTransactions(h.db), oidcAuthorizer.WrapCalls),
        })
@@ -152,6 +155,8 @@ func (h *Handler) setup() {
        h.proxy = &proxy{
                Name: "arvados-controller",
        }
+
+       go h.trashSweepWorker()
 }
 
 var errDBConnection = errors.New("database connection error")
index f854079f97d87376c9d6e3813b10b2872701d0f5..a456627c0d49edb8e65cb3dd717d399589e8c9d2 100644 (file)
@@ -35,7 +35,7 @@ var _ = check.Suite(&HandlerSuite{})
 
 type HandlerSuite struct {
        cluster *arvados.Cluster
-       handler http.Handler
+       handler *Handler
        ctx     context.Context
        cancel  context.CancelFunc
 }
@@ -51,7 +51,7 @@ func (s *HandlerSuite) SetUpTest(c *check.C) {
        s.cluster.TLS.Insecure = true
        arvadostest.SetServiceURL(&s.cluster.Services.RailsAPI, "https://"+os.Getenv("ARVADOS_TEST_API_HOST"))
        arvadostest.SetServiceURL(&s.cluster.Services.Controller, "http://localhost:/")
-       s.handler = newHandler(s.ctx, s.cluster, "", prometheus.NewRegistry())
+       s.handler = newHandler(s.ctx, s.cluster, "", prometheus.NewRegistry()).(*Handler)
 }
 
 func (s *HandlerSuite) TearDownTest(c *check.C) {
@@ -276,7 +276,7 @@ func (s *HandlerSuite) TestLogoutGoogle(c *check.C) {
 
 func (s *HandlerSuite) TestValidateV1APIToken(c *check.C) {
        req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil)
-       user, ok, err := s.handler.(*Handler).validateAPItoken(req, arvadostest.ActiveToken)
+       user, ok, err := s.handler.validateAPItoken(req, arvadostest.ActiveToken)
        c.Assert(err, check.IsNil)
        c.Check(ok, check.Equals, true)
        c.Check(user.Authorization.UUID, check.Equals, arvadostest.ActiveTokenUUID)
@@ -287,7 +287,7 @@ func (s *HandlerSuite) TestValidateV1APIToken(c *check.C) {
 
 func (s *HandlerSuite) TestValidateV2APIToken(c *check.C) {
        req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil)
-       user, ok, err := s.handler.(*Handler).validateAPItoken(req, arvadostest.ActiveTokenV2)
+       user, ok, err := s.handler.validateAPItoken(req, arvadostest.ActiveTokenV2)
        c.Assert(err, check.IsNil)
        c.Check(ok, check.Equals, true)
        c.Check(user.Authorization.UUID, check.Equals, arvadostest.ActiveTokenUUID)
@@ -319,11 +319,11 @@ func (s *HandlerSuite) TestValidateRemoteToken(c *check.C) {
 
 func (s *HandlerSuite) TestCreateAPIToken(c *check.C) {
        req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil)
-       auth, err := s.handler.(*Handler).createAPItoken(req, arvadostest.ActiveUserUUID, nil)
+       auth, err := s.handler.createAPItoken(req, arvadostest.ActiveUserUUID, nil)
        c.Assert(err, check.IsNil)
        c.Check(auth.Scopes, check.DeepEquals, []string{"all"})
 
-       user, ok, err := s.handler.(*Handler).validateAPItoken(req, auth.TokenV2())
+       user, ok, err := s.handler.validateAPItoken(req, auth.TokenV2())
        c.Assert(err, check.IsNil)
        c.Check(ok, check.Equals, true)
        c.Check(user.Authorization.UUID, check.Equals, auth.UUID)
@@ -430,3 +430,30 @@ func (s *HandlerSuite) TestRedactRailsAPIHostFromErrors(c *check.C) {
        c.Check(jresp.Errors[0], check.Matches, `.*//railsapi\.internal/arvados/v1/collections/.*: 404 Not Found.*`)
        c.Check(jresp.Errors[0], check.Not(check.Matches), `(?ms).*127.0.0.1.*`)
 }
+
+func (s *HandlerSuite) TestTrashSweep(c *check.C) {
+       s.cluster.SystemRootToken = arvadostest.SystemRootToken
+       s.cluster.Collections.TrashSweepInterval = arvados.Duration(time.Second / 10)
+       s.handler.CheckHealth()
+       ctx := auth.NewContext(s.ctx, &auth.Credentials{Tokens: []string{arvadostest.ActiveTokenV2}})
+       coll, err := s.handler.federation.CollectionCreate(ctx, arvados.CreateOptions{Attrs: map[string]interface{}{"name": "test trash sweep"}, EnsureUniqueName: true})
+       c.Assert(err, check.IsNil)
+       defer s.handler.federation.CollectionDelete(ctx, arvados.DeleteOptions{UUID: coll.UUID})
+       db, err := s.handler.db(s.ctx)
+       c.Assert(err, check.IsNil)
+       _, err = db.ExecContext(s.ctx, `update collections set trash_at = $1, delete_at = $2 where uuid = $3`, time.Now().UTC().Add(time.Second/10), time.Now().UTC().Add(time.Hour), coll.UUID)
+       c.Assert(err, check.IsNil)
+       deadline := time.Now().Add(5 * time.Second)
+       for {
+               if time.Now().After(deadline) {
+                       c.Log("timed out")
+                       c.FailNow()
+               }
+               updated, err := s.handler.federation.CollectionGet(ctx, arvados.GetOptions{UUID: coll.UUID, IncludeTrash: true})
+               c.Assert(err, check.IsNil)
+               if updated.IsTrashed {
+                       break
+               }
+               time.Sleep(time.Second / 10)
+       }
+}
index 25f47bc3bac4f801f2aa33b90e2ab935b0f651f9..736ef711e1e7d06b5023cb08de96239363d56832 100644 (file)
@@ -572,6 +572,13 @@ func (conn *Conn) SpecimenDelete(ctx context.Context, options arvados.DeleteOpti
        return resp, err
 }
 
+func (conn *Conn) SysTrashSweep(ctx context.Context, options struct{}) (struct{}, error) {
+       ep := arvados.EndpointSysTrashSweep
+       var resp struct{}
+       err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
+       return resp, err
+}
+
 func (conn *Conn) UserCreate(ctx context.Context, options arvados.CreateOptions) (arvados.User, error) {
        ep := arvados.EndpointUserCreate
        var resp arvados.User
index b2b3365a2015b2ac899a3b62f45d563042267ac9..4f3d4a56834dad539363be635718ca3b0758d3b5 100644 (file)
@@ -35,11 +35,14 @@ func integrationTestCluster() *arvados.Cluster {
 // provided by the integration-testing environment.
 func newServerFromIntegrationTestEnv(c *check.C) *httpserver.Server {
        log := ctxlog.TestLogger(c)
-
-       handler := &Handler{Cluster: &arvados.Cluster{
-               ClusterID:  "zzzzz",
-               PostgreSQL: integrationTestCluster().PostgreSQL,
-       }}
+       ctx := ctxlog.Context(context.Background(), log)
+       handler := &Handler{
+               Cluster: &arvados.Cluster{
+                       ClusterID:  "zzzzz",
+                       PostgreSQL: integrationTestCluster().PostgreSQL,
+               },
+               BackgroundContext: ctx,
+       }
        handler.Cluster.TLS.Insecure = true
        handler.Cluster.Collections.BlobSigning = true
        handler.Cluster.Collections.BlobSigningKey = arvadostest.BlobSigningKey
@@ -49,10 +52,8 @@ func newServerFromIntegrationTestEnv(c *check.C) *httpserver.Server {
 
        srv := &httpserver.Server{
                Server: http.Server{
-                       BaseContext: func(net.Listener) context.Context {
-                               return ctxlog.Context(context.Background(), log)
-                       },
-                       Handler: httpserver.AddRequestIDs(httpserver.LogRequests(handler)),
+                       BaseContext: func(net.Listener) context.Context { return ctx },
+                       Handler:     httpserver.AddRequestIDs(httpserver.LogRequests(handler)),
                },
                Addr: ":",
        }
diff --git a/lib/controller/worker.go b/lib/controller/worker.go
new file mode 100644 (file)
index 0000000..02f3db3
--- /dev/null
@@ -0,0 +1,95 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package controller
+
+import (
+       "context"
+       "database/sql"
+       "time"
+
+       "git.arvados.org/arvados.git/sdk/go/auth"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "github.com/jmoiron/sqlx"
+)
+
+const (
+       // lock keys should be added here with explicit values, to
+       // ensure they do not get accidentally renumbered when a key
+       // is added or removed.
+       lockKeyTrashSweep = 10001
+)
+
+// dbLocker uses pg_advisory_lock to maintain a cluster-wide lock for
+// a long-running task like "do X every N seconds".
+type dbLocker struct {
+       GetDB   func(context.Context) (*sqlx.DB, error)
+       LockKey int
+
+       conn *sql.Conn // != nil if advisory lock is acquired
+}
+
+// Lock acquires the advisory lock the first time it is
+// called. Subsequent calls confirm that the lock is still active
+// (i.e., the session is still alive), and re-acquires if needed.
+func (dbl *dbLocker) Lock(ctx context.Context) {
+       logger := ctxlog.FromContext(ctx)
+       for ; ; time.Sleep(5 * time.Second) {
+               if dbl.conn == nil {
+                       db, err := dbl.GetDB(ctx)
+                       if err != nil {
+                               logger.WithError(err).Infof("error getting database pool")
+                               continue
+                       }
+                       conn, err := db.Conn(ctx)
+                       if err != nil {
+                               logger.WithError(err).Info("error getting database connection")
+                               continue
+                       }
+                       _, err = conn.ExecContext(ctx, `SELECT pg_advisory_lock($1)`, dbl.LockKey)
+                       if err != nil {
+                               logger.WithError(err).Info("error getting lock")
+                               conn.Close()
+                               continue
+                       }
+                       dbl.conn = conn
+               }
+               err := dbl.conn.PingContext(ctx)
+               if err != nil {
+                       logger.WithError(err).Info("database connection ping failed")
+                       dbl.conn.Close()
+                       dbl.conn = nil
+                       continue
+               }
+               return
+       }
+}
+
+func (dbl *dbLocker) Unlock() {
+       if dbl.conn != nil {
+               dbl.conn.Close()
+               dbl.conn = nil
+       }
+}
+
+func (h *Handler) trashSweepWorker() {
+       sleep := h.Cluster.Collections.TrashSweepInterval.Duration()
+       logger := ctxlog.FromContext(h.BackgroundContext).WithField("worker", "trash sweep")
+       ctx := ctxlog.Context(h.BackgroundContext, logger)
+       if sleep <= 0 {
+               logger.Debugf("Collections.TrashSweepInterval is %v, not running worker", sleep)
+               return
+       }
+       locker := &dbLocker{GetDB: h.db, LockKey: lockKeyTrashSweep}
+       locker.Lock(ctx)
+       defer locker.Unlock()
+       for time.Sleep(sleep); ctx.Err() == nil; time.Sleep(sleep) {
+               locker.Lock(ctx)
+               ctx := auth.NewContext(ctx, &auth.Credentials{Tokens: []string{h.Cluster.SystemRootToken}})
+               _, err := h.federation.SysTrashSweep(ctx, struct{}{})
+               if err != nil {
+                       logger.WithError(err).Info("trash sweep failed")
+               }
+       }
+}
index 0fdc13d1985d085c28db23615dd9ce1c673781cd..d4af0e7a8ec28d1d4bb77c502d35336441067b31 100644 (file)
@@ -68,6 +68,7 @@ var (
        EndpointLinkGet                       = APIEndpoint{"GET", "arvados/v1/links/{uuid}", ""}
        EndpointLinkList                      = APIEndpoint{"GET", "arvados/v1/links", ""}
        EndpointLinkDelete                    = APIEndpoint{"DELETE", "arvados/v1/links/{uuid}", ""}
+       EndpointSysTrashSweep                 = APIEndpoint{"POST", "sys/trash_sweep", ""}
        EndpointUserActivate                  = APIEndpoint{"POST", "arvados/v1/users/{uuid}/activate", ""}
        EndpointUserCreate                    = APIEndpoint{"POST", "arvados/v1/users", "user"}
        EndpointUserCurrent                   = APIEndpoint{"GET", "arvados/v1/users/current", ""}
@@ -269,6 +270,7 @@ type API interface {
        SpecimenGet(ctx context.Context, options GetOptions) (Specimen, error)
        SpecimenList(ctx context.Context, options ListOptions) (SpecimenList, error)
        SpecimenDelete(ctx context.Context, options DeleteOptions) (Specimen, error)
+       SysTrashSweep(ctx context.Context, options struct{}) (struct{}, error)
        UserCreate(ctx context.Context, options CreateOptions) (User, error)
        UserUpdate(ctx context.Context, options UpdateOptions) (User, error)
        UserMerge(ctx context.Context, options UserMergeOptions) (User, error)
index 13bb3bf80de70c11e4567ab69ea56c9c03b28a8f..5ec828667fc940ace2c3f59b6cdc643139ae3b14 100644 (file)
@@ -217,6 +217,8 @@ func (c *Client) DoAndDecode(dst interface{}, req *http.Request) error {
                return err
        }
        switch {
+       case resp.StatusCode == http.StatusNoContent:
+               return nil
        case resp.StatusCode == http.StatusOK && dst == nil:
                return nil
        case resp.StatusCode == http.StatusOK:
index ecc02e83dc2e33115668a6e949737a75632a88a2..08a672cc0be3b48d3ed425abb6005454f217d3b2 100644 (file)
@@ -7,7 +7,7 @@ class SysController < ApplicationController
   skip_before_action :render_404_if_no_object
   before_action :admin_required
 
-  def sweep_trash
+  def trash_sweep
     act_as_system_user do
       # Sweep trashed collections
       Collection.
@@ -31,6 +31,7 @@ class SysController < ApplicationController
       # Sweep expired tokens
       ActiveRecord::Base.connection.execute("DELETE from api_client_authorizations where expires_at <= statement_timestamp()")
     end
+    head :no_content
   end
 
   protected
index a0c3cafe137fe1378e9889e6c16a9adad1fd9749..98f5788d6505d3525115f3b3a8e5622b8a059937 100644 (file)
@@ -92,7 +92,7 @@ Rails.application.routes.draw do
     end
   end
 
-  post '/sys/sweep_trash', to: 'sys#sweep_trash'
+  post '/sys/trash_sweep', to: 'sys#trash_sweep'
 
   if Rails.env == 'test'
     post '/database/reset', to: 'database#reset'