Merge branch 'main' into 19582-aws-s3v2-driver
author    Lucas Di Pentima <lucas.dipentima@curii.com>
          Mon, 31 Oct 2022 19:22:07 +0000 (16:22 -0300)
committer Lucas Di Pentima <lucas.dipentima@curii.com>
          Mon, 31 Oct 2022 19:53:13 +0000 (16:53 -0300)
Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <lucas.dipentima@curii.com>

56 files changed:
apps/workbench/Gemfile.lock
doc/_config.yml
doc/admin/upgrading.html.textile.liquid
doc/api/methods/groups.html.textile.liquid
doc/api/projects.html.textile.liquid
doc/api/properties.html.textile.liquid [new file with mode: 0644]
lib/config/config.default.yml
lib/config/export.go
lib/controller/dblock/dblock.go
lib/controller/dblock/dblock_test.go [new file with mode: 0644]
lib/controller/federation.go
lib/controller/federation/conn.go
lib/controller/federation/generate.go
lib/controller/federation/generated.go
lib/controller/handler.go
lib/controller/handler_test.go
lib/controller/router/router.go
lib/controller/rpc/conn.go
lib/controller/trash.go
lib/crunchrun/crunchrun.go
lib/crunchstat/crunchstat.go
lib/crunchstat/crunchstat_test.go
lib/ctrlctx/db.go
lib/diagnostics/cmd.go
lib/dispatchcloud/dispatcher.go
lib/dispatchcloud/dispatcher_test.go
lib/lsf/dispatch.go
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/tests/test_container.py
sdk/go/arvados/api.go
sdk/go/arvados/client.go
sdk/go/arvados/config.go
sdk/go/arvados/duration.go
sdk/go/arvados/duration_test.go
sdk/go/arvados/fs_collection.go
sdk/go/arvados/log.go
sdk/go/arvados/vocabulary.go
sdk/go/arvados/vocabulary_test.go
sdk/go/arvadostest/api.go
sdk/go/health/aggregator.go
sdk/go/health/aggregator_test.go
sdk/python/tests/run_test_server.py
services/api/app/models/user.rb
services/api/lib/tasks/delete_old_container_logs.rake
services/api/test/integration/users_test.rb
services/api/test/tasks/delete_old_container_logs_test.rb [deleted file]
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
services/crunchstat/crunchstat.go
services/keep-balance/balance.go
services/keep-balance/balance_run_test.go
services/keep-balance/integration_test.go
services/keep-balance/main.go
services/keep-balance/server.go
tools/salt-install/config_examples/multi_host/aws/pillars/nginx_controller_configuration.sls
tools/salt-install/config_examples/single_host/multiple_hostnames/pillars/nginx_controller_configuration.sls
tools/salt-install/config_examples/single_host/single_hostname/pillars/nginx_controller_configuration.sls

diff --git a/apps/workbench/Gemfile.lock b/apps/workbench/Gemfile.lock
index a70add7affbafd5364b1bc86149a26af4860c893..c46568b726130fcb8978eebcb3bc5d7e20a589d7 100644 (file)
@@ -179,7 +179,7 @@ GEM
     net-ssh-gateway (2.0.0)
       net-ssh (>= 4.0.0)
     nio4r (2.5.8)
-    nokogiri (1.13.7)
+    nokogiri (1.13.9)
       mini_portile2 (~> 2.8.0)
       racc (~> 1.4)
     npm-rails (0.2.1)
diff --git a/doc/_config.yml b/doc/_config.yml
index a6f7e608639309d23fe6f81da82f8b17474f0333..aac4256b175d7d6e2d94f28f9a4b9baf4834917d 100644 (file)
@@ -132,6 +132,7 @@ navbar:
       - api/keep-s3.html.textile.liquid
       - api/keep-web-urls.html.textile.liquid
       - api/projects.html.textile.liquid
+      - api/properties.html.textile.liquid
       - api/methods/collections.html.textile.liquid
       - api/methods/repositories.html.textile.liquid
     - Container engine:
diff --git a/doc/admin/upgrading.html.textile.liquid b/doc/admin/upgrading.html.textile.liquid
index 40f7034c3238d1130f831e209f02732e72d77b4d..5fbbda2aef1ac171eb1d743327be2ef33c7561c4 100644 (file)
@@ -29,13 +29,22 @@ TODO: extract this information based on git commit messages and generate changel
 </notextile>
 
 
-h2(#main). development main (as of 2022-10-13)
+h2(#main). development main (as of 2022-10-31)
 
 "previous: Upgrading to 2.4.3":#v2_4_3
 
 h3. New keepstore S3 driver enabled by default
 
 A more actively maintained S3 client library is now enabled by default for keepstore services. The previous driver remains available in case of unforeseen issues. To use the old driver, set @DriverParameters.UseAWSS3v2Driver@ to @false@ on the appropriate @Volumes@ config entries (a configuration sketch follows this file's diff).
+h2(#main). development main (as of 2022-10-14)
+
+h3. Old container logs are automatically deleted from PostgreSQL
+
+Cached copies of log entries from containers that finished more than 1 month ago are now deleted automatically (this only affects the "live" logs saved in the PostgreSQL database, not log collections saved in Keep). If you have an existing cron job that runs @rake db:delete_old_container_logs@, you can remove it. See configuration options @Containers.Logging.MaxAge@ and @Containers.Logging.SweepInterval@.
+
+h3. Fixed salt installer template file to support container shell access
+
+If you manage your cluster using the salt installer, you may want to update it to the latest version, use the appropriate @config_examples@ subdirectory, and re-deploy with your custom @local.params@ file so that the @arvados-controller@'s @nginx@ configuration file gets fixed.
 
 h3. Login-sync script requires configuration update on LoginCluster federations
 
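For reference, switching back to the old driver is a per-volume setting in the cluster configuration. The following is a minimal sketch of the relevant part of @config.yml@; the cluster ID, volume UUID, and the other volume parameters are placeholders rather than values taken from this commit:

<pre>
Clusters:
  zzzzz:
    Volumes:
      zzzzz-nyw5e-000000000000000:
        Driver: S3
        DriverParameters:
          # Revert to the previous S3 client library for this volume.
          UseAWSS3v2Driver: false
</pre>
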
diff --git a/doc/api/methods/groups.html.textile.liquid b/doc/api/methods/groups.html.textile.liquid
index db0aac3c7a3570fd0b3b5e9aa2c74dbe9874c196..72fca48560e9398dc7bf62183f320fcbfd1f9ea8 100644 (file)
@@ -38,7 +38,7 @@ table(table table-bordered table-condensed).
 |is_trashed|datetime|True if @trash_at@ is in the past, false if not.||
 |frozen_by_uuid|string|For a frozen project, indicates the user who froze the project; null in all other cases. When a project is frozen, no further changes can be made to the project or its contents, even by admins. Attempting to add new items or modify, rename, move, trash, or delete the project or its contents, including any subprojects, will return an error.||
 
-h3. Frozen projects
+h3(#frozen). Frozen projects
 
 A user with @manage@ permission can set the @frozen_by_uuid@ attribute of a @project@ group to their own user UUID. Once this is done, no further changes can be made to the project or its contents, including subprojects.
 
diff --git a/doc/api/projects.html.textile.liquid b/doc/api/projects.html.textile.liquid
index 9aa3d85d4d5297adfc91d396a9f8b518d9ff831e..5cb630c43454c24582cb540077051b3b7e847f43 100644 (file)
@@ -27,7 +27,7 @@ In this command, `zzzzz-tpzed-123456789012345` is a @user@ uuid, which is unusua
 
 Because the home project is a virtual project, other operations via the @groups@ API are not supported.
 
-h2. Filter groups
+h2(#filtergroups). Filter groups
 
 Filter groups are another type of virtual project. They are implemented as an Arvados @group@ object with @group_class@ set to the value "filter".
 
diff --git a/doc/api/properties.html.textile.liquid b/doc/api/properties.html.textile.liquid
new file mode 100644 (file)
index 0000000..bf4b05c
--- /dev/null
@@ -0,0 +1,50 @@
+---
+layout: default
+navsection: api
+title: "Metadata properties"
+...
+{% comment %}
+Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: CC-BY-SA-3.0
+{% endcomment %}
+
+Arvados allows you to attach arbitrary properties to "collection":methods/collections.html, "container_request":methods/container_requests.html, "link":methods/links.html and "group":methods/groups.html records that have a @properties@ field.  These are key-value pairs, where the value is a valid JSON type (string, number, null, boolean, array, object).
+
+Searching for records using properties is described in "Filtering on subproperties":methods.html#subpropertyfilters .
+
+h2. Reserved properties
+
+The following properties are set by Arvados components.
+
+table(table table-bordered table-condensed).
+|_. Property name|_. Appears on|_. Value type|_.Description|
+|type|collection|string|Appears on collections to indicate the contents or usage. See "Collection type values":#collectiontype below for details.|
+|container_request|collection|string|The UUID of the container request that produced an output or log collection.|
+|docker-image-repo-tag|collection|string|For collections containing a Docker image, the repo/name:tag identifier|
+|container_uuid|collection|string|The UUID of the container that produced a collection (set on collections with type=log)|
+|cwl_input|container_request|object|On an intermediate container request, the CWL workflow-level input parameters used to generate the container request|
+|cwl_output|container_request|object|On an intermediate container request, the CWL workflow-level output parameters collected from the container request|
+|template_uuid|container_request|string|For a workflow runner container request, the workflow record that was used to launch it.|
+|username|link|string|For a "can_login":permission-model.html#links permission link, the unix username the user will have on the VM.|
+|groups|link|array of string|For a "can_login":permission-model.html#links permission link, the unix groups the user will be added to on the VM.|
+|image_timestamp|link|string|When resolving a Docker image name and multiple links are found with @link_class=docker_image_repo+tag@ and same @link_name@, the @image_timestamp@ is used to determine precedence (most recent wins).|
+|filters|group|array of array of string|Used to define "filter groups":projects.html#filtergroups|
+
+h3(#collectiontype). Collection "type" values
+
+Meaningful values of the @type@ property.  These are recognized by Workbench when filtering on types of collections from the project content listing.
+
+table(table table-bordered table-condensed).
+|_. Type|_.Description|
+|log|The collection contains log files from a container run.|
+|output|The collection contains the output of a top-level container run (this is a container request where @requesting_container_uuid@ is null).|
+|intermediate|The collection contains the output of a child container run (this is a container request where @requesting_container_uuid@ is non-empty).|
+
+h2. Controlling user-supplied properties
+
+Arvados can be configured with a vocabulary file that lists valid properties and the range of valid values for those properties.  This is described in "Metadata vocabulary":{{site.baseurl}}/admin/metadata-vocabulary.html .
+
+Arvados offers options to set properties automatically and/or prevent certain properties, once set, from being changed by non-admin users.  This is described in "Configuring collection's managed properties":{{site.baseurl}}/admin/collection-managed-properties.html .
+
+The admin can require that certain properties must be non-empty before "freezing a project":methods/groups.html#frozen .
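
The new properties page describes attaching key/value metadata through the @properties@ field. As a rough illustration (not part of this commit), the Go SDK types touched elsewhere in this diff can be used to set a property on a collection; the UUID and property value below are placeholders, and the request payload follows the usual Rails API convention, so adjust as needed:

<pre>
package main

import (
	"fmt"
	"log"

	"git.arvados.org/arvados.git/sdk/go/arvados"
)

func main() {
	// Hypothetical collection UUID -- replace with a real one.
	uuid := "zzzzz-4zz18-xxxxxxxxxxxxxxx"
	client := arvados.NewClientFromEnv() // reads ARVADOS_API_HOST / ARVADOS_API_TOKEN
	var coll arvados.Collection
	// Update the collection's "properties" attribute with JSON-typed values.
	err := client.RequestAndDecode(&coll, "PUT", "arvados/v1/collections/"+uuid, nil,
		map[string]interface{}{
			"collection": map[string]interface{}{
				"properties": map[string]interface{}{"type": "intermediate"},
			},
		})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(coll.Properties)
}
</pre>
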
diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index ca54840fd4095c6247fdf3999c23bd9121565beb..5e46c290da6424f15870ac4bb6cf03731e21e338 100644 (file)
@@ -1071,12 +1071,16 @@ Clusters:
       LocalKeepLogsToContainerLog: none
 
       Logging:
-        # When you run the db:delete_old_container_logs task, it will find
-        # containers that have been finished for at least this many seconds,
+        # Periodically (see SweepInterval) Arvados will check for
+        # containers that have been finished for at least this long,
         # and delete their stdout, stderr, arv-mount, crunch-run, and
         # crunchstat logs from the logs table.
         MaxAge: 720h
 
+        # How often to delete cached log entries for finished
+        # containers (see MaxAge).
+        SweepInterval: 12h
+
         # These two settings control how frequently log events are flushed to the
         # database.  Log lines are buffered until either crunch_log_bytes_per_event
         # has been reached or crunch_log_seconds_between_events has elapsed since
@@ -1716,6 +1720,10 @@ Clusters:
       # This feature is disabled when set to zero.
       IdleTimeout: 0s
 
+      # URL to a file that is a fragment of text or HTML which should
+      # be rendered in Workbench as a banner.
+      BannerURL: ""
+
       # Workbench welcome screen, this is HTML text that will be
       # incorporated directly onto the page.
       WelcomePageHTML: |
diff --git a/lib/config/export.go b/lib/config/export.go
index fb17a45c84a82fe4568777065fe23684c49a4603..e7cf094eb02b63f8a613b881696940e2cec505ab 100644 (file)
@@ -291,6 +291,7 @@ var whitelist = map[string]bool{
        "Workbench.UserProfileFormFields.*.*.*":               true,
        "Workbench.UserProfileFormMessage":                    true,
        "Workbench.WelcomePageHTML":                           true,
+       "Workbench.BannerURL":                                 true,
 }
 
 func redactUnsafe(m map[string]interface{}, mPrefix, lookupPrefix string) error {
diff --git a/lib/controller/dblock/dblock.go b/lib/controller/dblock/dblock.go
index 1a36822d5b7f91e81c5b0deb167a105a962b3dfb..ad2733abfa36df82c72c4aa3c7a6c090c6496efb 100644 (file)
@@ -7,6 +7,8 @@ package dblock
 import (
        "context"
        "database/sql"
+       "fmt"
+       "net"
        "sync"
        "time"
 
@@ -15,8 +17,12 @@ import (
 )
 
 var (
-       TrashSweep = &DBLocker{key: 10001}
-       retryDelay = 5 * time.Second
+       TrashSweep         = &DBLocker{key: 10001}
+       ContainerLogSweep  = &DBLocker{key: 10002}
+       KeepBalanceService = &DBLocker{key: 10003} // keep-balance service in periodic-sweep loop
+       KeepBalanceActive  = &DBLocker{key: 10004} // keep-balance sweep in progress (either -once=true or service loop)
+       Dispatch           = &DBLocker{key: 10005} // any dispatcher running
+       retryDelay         = 5 * time.Second
 )
 
 // DBLocker uses pg_advisory_lock to maintain a cluster-wide lock for
@@ -30,8 +36,11 @@ type DBLocker struct {
 }
 
 // Lock acquires the advisory lock, waiting/reconnecting if needed.
-func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sqlx.DB, error)) {
-       logger := ctxlog.FromContext(ctx)
+//
+// Returns false if ctx is canceled before the lock is acquired.
+func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sqlx.DB, error)) bool {
+       logger := ctxlog.FromContext(ctx).WithField("ID", dbl.key)
+       var lastHeldBy string
        for ; ; time.Sleep(retryDelay) {
                dbl.mtx.Lock()
                if dbl.conn != nil {
@@ -40,55 +49,87 @@ func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sql
                        dbl.mtx.Unlock()
                        continue
                }
+               if ctx.Err() != nil {
+                       dbl.mtx.Unlock()
+                       return false
+               }
                db, err := getdb(ctx)
-               if err != nil {
-                       logger.WithError(err).Infof("error getting database pool")
+               if err == context.Canceled {
+                       dbl.mtx.Unlock()
+                       return false
+               } else if err != nil {
+                       logger.WithError(err).Info("error getting database pool")
                        dbl.mtx.Unlock()
                        continue
                }
                conn, err := db.Conn(ctx)
-               if err != nil {
+               if err == context.Canceled {
+                       dbl.mtx.Unlock()
+                       return false
+               } else if err != nil {
                        logger.WithError(err).Info("error getting database connection")
                        dbl.mtx.Unlock()
                        continue
                }
                var locked bool
                err = conn.QueryRowContext(ctx, `SELECT pg_try_advisory_lock($1)`, dbl.key).Scan(&locked)
-               if err != nil {
-                       logger.WithError(err).Infof("error getting pg_try_advisory_lock %d", dbl.key)
+               if err == context.Canceled {
+                       return false
+               } else if err != nil {
+                       logger.WithError(err).Info("error getting pg_try_advisory_lock")
                        conn.Close()
                        dbl.mtx.Unlock()
                        continue
                }
                if !locked {
+                       var host string
+                       var port int
+                       err = conn.QueryRowContext(ctx, `SELECT client_addr, client_port FROM pg_stat_activity WHERE pid IN
+                               (SELECT pid FROM pg_locks
+                                WHERE locktype = $1 AND objid = $2)`, "advisory", dbl.key).Scan(&host, &port)
+                       if err != nil {
+                               logger.WithError(err).Info("error getting other client info")
+                       } else {
+                               heldBy := net.JoinHostPort(host, fmt.Sprintf("%d", port))
+                               if lastHeldBy != heldBy {
+                                       logger.WithField("DBClient", heldBy).Info("waiting for other process to release lock")
+                                       lastHeldBy = heldBy
+                               }
+                       }
                        conn.Close()
                        dbl.mtx.Unlock()
                        continue
                }
-               logger.Debugf("acquired pg_advisory_lock %d", dbl.key)
+               logger.Debug("acquired pg_advisory_lock")
                dbl.ctx, dbl.getdb, dbl.conn = ctx, getdb, conn
                dbl.mtx.Unlock()
-               return
+               return true
        }
 }
 
 // Check confirms that the lock is still active (i.e., the session is
 // still alive), and re-acquires if needed. Panics if Lock is not
 // acquired first.
-func (dbl *DBLocker) Check() {
+//
+// Returns false if the context passed to Lock() is canceled before
+// the lock is confirmed or reacquired.
+func (dbl *DBLocker) Check() bool {
        dbl.mtx.Lock()
        err := dbl.conn.PingContext(dbl.ctx)
-       if err == nil {
-               ctxlog.FromContext(dbl.ctx).Debugf("pg_advisory_lock %d connection still alive", dbl.key)
+       if err == context.Canceled {
+               dbl.mtx.Unlock()
+               return false
+       } else if err == nil {
+               ctxlog.FromContext(dbl.ctx).WithField("ID", dbl.key).Debug("connection still alive")
                dbl.mtx.Unlock()
-               return
+               return true
        }
        ctxlog.FromContext(dbl.ctx).WithError(err).Info("database connection ping failed")
        dbl.conn.Close()
        dbl.conn = nil
        ctx, getdb := dbl.ctx, dbl.getdb
        dbl.mtx.Unlock()
-       dbl.Lock(ctx, getdb)
+       return dbl.Lock(ctx, getdb)
 }
 
 func (dbl *DBLocker) Unlock() {
@@ -97,9 +138,9 @@ func (dbl *DBLocker) Unlock() {
        if dbl.conn != nil {
                _, err := dbl.conn.ExecContext(context.Background(), `SELECT pg_advisory_unlock($1)`, dbl.key)
                if err != nil {
-                       ctxlog.FromContext(dbl.ctx).WithError(err).Infof("error releasing pg_advisory_lock %d", dbl.key)
+                       ctxlog.FromContext(dbl.ctx).WithError(err).WithField("ID", dbl.key).Info("error releasing pg_advisory_lock")
                } else {
-                       ctxlog.FromContext(dbl.ctx).Debugf("released pg_advisory_lock %d", dbl.key)
+                       ctxlog.FromContext(dbl.ctx).WithField("ID", dbl.key).Debug("released pg_advisory_lock")
                }
                dbl.conn.Close()
                dbl.conn = nil
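
The updated @DBLocker@ API returns @false@ from @Lock@ and @Check@ when the caller's context is canceled, so long-running workers can shut down cleanly instead of blocking. A rough usage sketch, not part of the commit: the interval, lock, and callback are placeholders, and the @getdb@ argument can be something like the @ctrlctx.DBConnector.GetDB@ method introduced elsewhere in this diff.

<pre>
package example

import (
	"context"
	"time"

	"git.arvados.org/arvados.git/lib/controller/dblock"
	"git.arvados.org/arvados.git/sdk/go/ctxlog"
	"github.com/jmoiron/sqlx"
)

// sweepLoop holds a cluster-wide advisory lock while periodically calling run,
// and returns as soon as ctx is canceled.
func sweepLoop(ctx context.Context, getdb func(context.Context) (*sqlx.DB, error), run func(context.Context) error) {
	if !dblock.TrashSweep.Lock(ctx, getdb) {
		return // context canceled before the lock was acquired
	}
	defer dblock.TrashSweep.Unlock()
	for time.Sleep(time.Minute); ctx.Err() == nil; time.Sleep(time.Minute) {
		if !dblock.TrashSweep.Check() {
			return // context canceled while confirming/re-acquiring the lock
		}
		if err := run(ctx); err != nil {
			ctxlog.FromContext(ctx).WithError(err).Info("periodic run failed")
		}
	}
}
</pre>

The @periodicWorker@ helper added to @lib/controller/trash.go@ later in this commit follows the same pattern.
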
diff --git a/lib/controller/dblock/dblock_test.go b/lib/controller/dblock/dblock_test.go
new file mode 100644 (file)
index 0000000..b10b2a3
--- /dev/null
@@ -0,0 +1,91 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dblock
+
+import (
+       "bytes"
+       "context"
+       "sync"
+       "testing"
+       "time"
+
+       "git.arvados.org/arvados.git/lib/config"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/arvadostest"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
+       "github.com/jmoiron/sqlx"
+       "github.com/sirupsen/logrus"
+       check "gopkg.in/check.v1"
+)
+
+func Test(t *testing.T) {
+       check.TestingT(t)
+}
+
+var _ = check.Suite(&suite{})
+
+type suite struct {
+       cluster *arvados.Cluster
+       db      *sqlx.DB
+       getdb   func(context.Context) (*sqlx.DB, error)
+}
+
+var testLocker = &DBLocker{key: 999}
+
+func (s *suite) SetUpSuite(c *check.C) {
+       cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+       c.Assert(err, check.IsNil)
+       s.cluster, err = cfg.GetCluster("")
+       c.Assert(err, check.IsNil)
+       s.db = arvadostest.DB(c, s.cluster)
+       s.getdb = func(context.Context) (*sqlx.DB, error) { return s.db, nil }
+}
+
+func (s *suite) TestLock(c *check.C) {
+       retryDelay = 10 * time.Millisecond
+
+       var logbuf bytes.Buffer
+       logger := ctxlog.New(&logbuf, "text", "debug")
+       logger.Level = logrus.DebugLevel
+       ctx := ctxlog.Context(context.Background(), logger)
+       ctx, cancel := context.WithCancel(ctx)
+       defer cancel()
+       testLocker.Lock(ctx, s.getdb)
+       testLocker.Check()
+
+       lock2 := make(chan bool)
+       var wg sync.WaitGroup
+       defer wg.Wait()
+       wg.Add(1)
+       go func() {
+               defer wg.Done()
+               testLocker2 := &DBLocker{key: 999}
+               testLocker2.Lock(ctx, s.getdb)
+               close(lock2)
+               testLocker2.Check()
+               testLocker2.Unlock()
+       }()
+
+       // Second lock should wait for first to Unlock
+       select {
+       case <-time.After(time.Second / 10):
+               c.Check(logbuf.String(), check.Matches, `(?ms).*level=info.*DBClient="[^"]+:\d+".*ID=999.*`)
+       case <-lock2:
+               c.Log("double-lock")
+               c.Fail()
+       }
+
+       testLocker.Check()
+       testLocker.Unlock()
+
+       // Now the second lock should succeed within retryDelay
+       select {
+       case <-time.After(retryDelay * 2):
+               c.Log("timed out")
+               c.Fail()
+       case <-lock2:
+       }
+       c.Logf("%s", logbuf.String())
+}
diff --git a/lib/controller/federation.go b/lib/controller/federation.go
index e7d6e29b88c1f683f981a1ee5df2b53cf7c862af..93b8315a63be588a0b3e2e1b3182337e68defeff 100644 (file)
@@ -142,7 +142,7 @@ type CurrentUser struct {
 // non-nil, true, nil -- if the token is valid
 func (h *Handler) validateAPItoken(req *http.Request, token string) (*CurrentUser, bool, error) {
        user := CurrentUser{Authorization: arvados.APIClientAuthorization{APIToken: token}}
-       db, err := h.db(req.Context())
+       db, err := h.dbConnector.GetDB(req.Context())
        if err != nil {
                ctxlog.FromContext(req.Context()).WithError(err).Debugf("validateAPItoken(%s): database error", token)
                return nil, false, err
@@ -179,7 +179,7 @@ func (h *Handler) validateAPItoken(req *http.Request, token string) (*CurrentUse
 }
 
 func (h *Handler) createAPItoken(req *http.Request, userUUID string, scopes []string) (*arvados.APIClientAuthorization, error) {
-       db, err := h.db(req.Context())
+       db, err := h.dbConnector.GetDB(req.Context())
        if err != nil {
                return nil, err
        }
diff --git a/lib/controller/federation/conn.go b/lib/controller/federation/conn.go
index 89f68a5ef1848aab0579ace235a60c92a3c05879..03690af0264001ba37153ac88875837d6031c378 100644 (file)
@@ -515,6 +515,26 @@ func (conn *Conn) LinkDelete(ctx context.Context, options arvados.DeleteOptions)
        return conn.chooseBackend(options.UUID).LinkDelete(ctx, options)
 }
 
+func (conn *Conn) LogCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Log, error) {
+       return conn.chooseBackend(options.ClusterID).LogCreate(ctx, options)
+}
+
+func (conn *Conn) LogUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.Log, error) {
+       return conn.chooseBackend(options.UUID).LogUpdate(ctx, options)
+}
+
+func (conn *Conn) LogGet(ctx context.Context, options arvados.GetOptions) (arvados.Log, error) {
+       return conn.chooseBackend(options.UUID).LogGet(ctx, options)
+}
+
+func (conn *Conn) LogList(ctx context.Context, options arvados.ListOptions) (arvados.LogList, error) {
+       return conn.generated_LogList(ctx, options)
+}
+
+func (conn *Conn) LogDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.Log, error) {
+       return conn.chooseBackend(options.UUID).LogDelete(ctx, options)
+}
+
 func (conn *Conn) SpecimenList(ctx context.Context, options arvados.ListOptions) (arvados.SpecimenList, error) {
        return conn.generated_SpecimenList(ctx, options)
 }
diff --git a/lib/controller/federation/generate.go b/lib/controller/federation/generate.go
index 8af61315643708aaa96466286275452d3c242edb..86bbf9d9e3fcd991b0020f0a2332a25eb16c9108 100644 (file)
@@ -53,7 +53,7 @@ func main() {
                defer out.Close()
                out.Write(regexp.MustCompile(`(?ms)^.*package .*?import.*?\n\)\n`).Find(buf))
                io.WriteString(out, "//\n// -- this file is auto-generated -- do not edit -- edit list.go and run \"go generate\" instead --\n//\n\n")
-               for _, t := range []string{"Container", "ContainerRequest", "Group", "Specimen", "User", "Link", "APIClientAuthorization"} {
+               for _, t := range []string{"Container", "ContainerRequest", "Group", "Specimen", "User", "Link", "Log", "APIClientAuthorization"} {
                        _, err := out.Write(bytes.ReplaceAll(orig, []byte("Collection"), []byte(t)))
                        if err != nil {
                                panic(err)
diff --git a/lib/controller/federation/generated.go b/lib/controller/federation/generated.go
index 66f36161d50817743ba25853fe1aadd637f84bdc..637a1ce9194953aeff865a0cd3f86dad13ba1068 100755 (executable)
@@ -263,6 +263,47 @@ func (conn *Conn) generated_LinkList(ctx context.Context, options arvados.ListOp
        return merged, err
 }
 
+func (conn *Conn) generated_LogList(ctx context.Context, options arvados.ListOptions) (arvados.LogList, error) {
+       var mtx sync.Mutex
+       var merged arvados.LogList
+       var needSort atomic.Value
+       needSort.Store(false)
+       err := conn.splitListRequest(ctx, options, func(ctx context.Context, _ string, backend arvados.API, options arvados.ListOptions) ([]string, error) {
+               options.ForwardedFor = conn.cluster.ClusterID + "-" + options.ForwardedFor
+               cl, err := backend.LogList(ctx, options)
+               if err != nil {
+                       return nil, err
+               }
+               mtx.Lock()
+               defer mtx.Unlock()
+               if len(merged.Items) == 0 {
+                       merged = cl
+               } else if len(cl.Items) > 0 {
+                       merged.Items = append(merged.Items, cl.Items...)
+                       needSort.Store(true)
+               }
+               uuids := make([]string, 0, len(cl.Items))
+               for _, item := range cl.Items {
+                       uuids = append(uuids, item.UUID)
+               }
+               return uuids, nil
+       })
+       if needSort.Load().(bool) {
+               // Apply the default/implied order, "modified_at desc"
+               sort.Slice(merged.Items, func(i, j int) bool {
+                       mi, mj := merged.Items[i].ModifiedAt, merged.Items[j].ModifiedAt
+                       return mj.Before(mi)
+               })
+       }
+       if merged.Items == nil {
+               // Return empty results as [], not null
+               // (https://github.com/golang/go/issues/27589 might be
+               // a better solution in the future)
+               merged.Items = []arvados.Log{}
+       }
+       return merged, err
+}
+
 func (conn *Conn) generated_APIClientAuthorizationList(ctx context.Context, options arvados.ListOptions) (arvados.APIClientAuthorizationList, error) {
        var mtx sync.Mutex
        var merged arvados.APIClientAuthorizationList
diff --git a/lib/controller/handler.go b/lib/controller/handler.go
index e9c56db4d4b112b906dbaf36dd21b9a7a1300d98..4c6fca7f77276c3981c591d18429d8520d3e76b7 100644 (file)
@@ -6,7 +6,6 @@ package controller
 
 import (
        "context"
-       "errors"
        "fmt"
        "net/http"
        "net/http/httptest"
@@ -21,10 +20,8 @@ import (
        "git.arvados.org/arvados.git/lib/controller/router"
        "git.arvados.org/arvados.git/lib/ctrlctx"
        "git.arvados.org/arvados.git/sdk/go/arvados"
-       "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/health"
        "git.arvados.org/arvados.git/sdk/go/httpserver"
-       "github.com/jmoiron/sqlx"
 
        // sqlx needs lib/pq to talk to PostgreSQL
        _ "github.com/lib/pq"
@@ -40,8 +37,7 @@ type Handler struct {
        proxy          *proxy
        secureClient   *http.Client
        insecureClient *http.Client
-       pgdb           *sqlx.DB
-       pgdbMtx        sync.Mutex
+       dbConnector    ctrlctx.DBConnector
 }
 
 func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
@@ -65,7 +61,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
 
 func (h *Handler) CheckHealth() error {
        h.setupOnce.Do(h.setup)
-       _, err := h.db(context.TODO())
+       _, err := h.dbConnector.GetDB(context.TODO())
        if err != nil {
                return err
        }
@@ -97,17 +93,18 @@ func (h *Handler) setup() {
        mux := http.NewServeMux()
        healthFuncs := make(map[string]health.Func)
 
-       oidcAuthorizer := localdb.OIDCAccessTokenAuthorizer(h.Cluster, h.db)
+       h.dbConnector = ctrlctx.DBConnector{PostgreSQL: h.Cluster.PostgreSQL}
+       oidcAuthorizer := localdb.OIDCAccessTokenAuthorizer(h.Cluster, h.dbConnector.GetDB)
        h.federation = federation.New(h.Cluster, &healthFuncs)
        rtr := router.New(h.federation, router.Config{
                MaxRequestSize: h.Cluster.API.MaxRequestSize,
                WrapCalls: api.ComposeWrappers(
-                       ctrlctx.WrapCallsInTransactions(h.db),
+                       ctrlctx.WrapCallsInTransactions(h.dbConnector.GetDB),
                        oidcAuthorizer.WrapCalls,
                        ctrlctx.WrapCallsWithAuth(h.Cluster)),
        })
 
-       healthRoutes := health.Routes{"ping": func() error { _, err := h.db(context.TODO()); return err }}
+       healthRoutes := health.Routes{"ping": func() error { _, err := h.dbConnector.GetDB(context.TODO()); return err }}
        for name, f := range healthFuncs {
                healthRoutes[name] = f
        }
@@ -155,31 +152,7 @@ func (h *Handler) setup() {
        }
 
        go h.trashSweepWorker()
-}
-
-var errDBConnection = errors.New("database connection error")
-
-func (h *Handler) db(ctx context.Context) (*sqlx.DB, error) {
-       h.pgdbMtx.Lock()
-       defer h.pgdbMtx.Unlock()
-       if h.pgdb != nil {
-               return h.pgdb, nil
-       }
-
-       db, err := sqlx.Open("postgres", h.Cluster.PostgreSQL.Connection.String())
-       if err != nil {
-               ctxlog.FromContext(ctx).WithError(err).Error("postgresql connect failed")
-               return nil, errDBConnection
-       }
-       if p := h.Cluster.PostgreSQL.ConnectionPool; p > 0 {
-               db.SetMaxOpenConns(p)
-       }
-       if err := db.Ping(); err != nil {
-               ctxlog.FromContext(ctx).WithError(err).Error("postgresql connect succeeded but ping failed")
-               return nil, errDBConnection
-       }
-       h.pgdb = db
-       return db, nil
+       go h.containerLogSweepWorker()
 }
 
 type middlewareFunc func(http.ResponseWriter, *http.Request, http.Handler)
diff --git a/lib/controller/handler_test.go b/lib/controller/handler_test.go
index 127e6c34c6238ca48487f5cbb72ca1107bfed7da..a240b195ea9ff583eede71720ab8c9dfdf3c3798 100644 (file)
@@ -284,6 +284,7 @@ func (s *HandlerSuite) TestLogoutGoogle(c *check.C) {
 }
 
 func (s *HandlerSuite) TestValidateV1APIToken(c *check.C) {
+       c.Assert(s.handler.CheckHealth(), check.IsNil)
        req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil)
        user, ok, err := s.handler.validateAPItoken(req, arvadostest.ActiveToken)
        c.Assert(err, check.IsNil)
@@ -295,6 +296,7 @@ func (s *HandlerSuite) TestValidateV1APIToken(c *check.C) {
 }
 
 func (s *HandlerSuite) TestValidateV2APIToken(c *check.C) {
+       c.Assert(s.handler.CheckHealth(), check.IsNil)
        req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil)
        user, ok, err := s.handler.validateAPItoken(req, arvadostest.ActiveTokenV2)
        c.Assert(err, check.IsNil)
@@ -337,6 +339,7 @@ func (s *HandlerSuite) TestLogTokenUUID(c *check.C) {
 }
 
 func (s *HandlerSuite) TestCreateAPIToken(c *check.C) {
+       c.Assert(s.handler.CheckHealth(), check.IsNil)
        req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil)
        auth, err := s.handler.createAPItoken(req, arvadostest.ActiveUserUUID, nil)
        c.Assert(err, check.IsNil)
@@ -477,7 +480,7 @@ func (s *HandlerSuite) TestTrashSweep(c *check.C) {
        coll, err := s.handler.federation.CollectionCreate(ctx, arvados.CreateOptions{Attrs: map[string]interface{}{"name": "test trash sweep"}, EnsureUniqueName: true})
        c.Assert(err, check.IsNil)
        defer s.handler.federation.CollectionDelete(ctx, arvados.DeleteOptions{UUID: coll.UUID})
-       db, err := s.handler.db(s.ctx)
+       db, err := s.handler.dbConnector.GetDB(s.ctx)
        c.Assert(err, check.IsNil)
        _, err = db.ExecContext(s.ctx, `update collections set trash_at = $1, delete_at = $2 where uuid = $3`, time.Now().UTC().Add(time.Second/10), time.Now().UTC().Add(time.Hour), coll.UUID)
        c.Assert(err, check.IsNil)
@@ -496,6 +499,35 @@ func (s *HandlerSuite) TestTrashSweep(c *check.C) {
        }
 }
 
+func (s *HandlerSuite) TestContainerLogSweep(c *check.C) {
+       s.cluster.SystemRootToken = arvadostest.SystemRootToken
+       s.cluster.Containers.Logging.SweepInterval = arvados.Duration(time.Second / 10)
+       s.handler.CheckHealth()
+       ctx := auth.NewContext(s.ctx, &auth.Credentials{Tokens: []string{arvadostest.ActiveTokenV2}})
+       logentry, err := s.handler.federation.LogCreate(ctx, arvados.CreateOptions{Attrs: map[string]interface{}{
+               "object_uuid": arvadostest.CompletedContainerUUID,
+               "event_type":  "stderr",
+               "properties": map[string]interface{}{
+                       "text": "test trash sweep\n",
+               },
+       }})
+       c.Assert(err, check.IsNil)
+       defer s.handler.federation.LogDelete(ctx, arvados.DeleteOptions{UUID: logentry.UUID})
+       deadline := time.Now().Add(5 * time.Second)
+       for {
+               if time.Now().After(deadline) {
+                       c.Log("timed out")
+                       c.FailNow()
+               }
+               logentries, err := s.handler.federation.LogList(ctx, arvados.ListOptions{Filters: []arvados.Filter{{"uuid", "=", logentry.UUID}}, Limit: -1})
+               c.Assert(err, check.IsNil)
+               if len(logentries.Items) == 0 {
+                       break
+               }
+               time.Sleep(time.Second / 10)
+       }
+}
+
 func (s *HandlerSuite) TestLogActivity(c *check.C) {
        s.cluster.SystemRootToken = arvadostest.SystemRootToken
        s.cluster.Users.ActivityLoggingPeriod = arvados.Duration(24 * time.Hour)
@@ -521,7 +553,7 @@ func (s *HandlerSuite) TestLogActivity(c *check.C) {
                        c.Assert(err, check.IsNil)
                }
        }
-       db, err := s.handler.db(s.ctx)
+       db, err := s.handler.dbConnector.GetDB(s.ctx)
        c.Assert(err, check.IsNil)
        for _, userUUID := range []string{arvadostest.ActiveUserUUID, arvadostest.SpectatorUserUUID} {
                var rows int
diff --git a/lib/controller/router/router.go b/lib/controller/router/router.go
index 80d5e929850cd18df389daeddb18eb4b12387a38..d4712558eae07c8ddd2a369ae40e8ce4ba55da0c 100644 (file)
@@ -367,6 +367,41 @@ func (rtr *router) addRoutes() {
                                return rtr.backend.LinkDelete(ctx, *opts.(*arvados.DeleteOptions))
                        },
                },
+               {
+                       arvados.EndpointLogCreate,
+                       func() interface{} { return &arvados.CreateOptions{} },
+                       func(ctx context.Context, opts interface{}) (interface{}, error) {
+                               return rtr.backend.LogCreate(ctx, *opts.(*arvados.CreateOptions))
+                       },
+               },
+               {
+                       arvados.EndpointLogUpdate,
+                       func() interface{} { return &arvados.UpdateOptions{} },
+                       func(ctx context.Context, opts interface{}) (interface{}, error) {
+                               return rtr.backend.LogUpdate(ctx, *opts.(*arvados.UpdateOptions))
+                       },
+               },
+               {
+                       arvados.EndpointLogList,
+                       func() interface{} { return &arvados.ListOptions{Limit: -1} },
+                       func(ctx context.Context, opts interface{}) (interface{}, error) {
+                               return rtr.backend.LogList(ctx, *opts.(*arvados.ListOptions))
+                       },
+               },
+               {
+                       arvados.EndpointLogGet,
+                       func() interface{} { return &arvados.GetOptions{} },
+                       func(ctx context.Context, opts interface{}) (interface{}, error) {
+                               return rtr.backend.LogGet(ctx, *opts.(*arvados.GetOptions))
+                       },
+               },
+               {
+                       arvados.EndpointLogDelete,
+                       func() interface{} { return &arvados.DeleteOptions{} },
+                       func(ctx context.Context, opts interface{}) (interface{}, error) {
+                               return rtr.backend.LogDelete(ctx, *opts.(*arvados.DeleteOptions))
+                       },
+               },
                {
                        arvados.EndpointSpecimenCreate,
                        func() interface{} { return &arvados.CreateOptions{} },
diff --git a/lib/controller/rpc/conn.go b/lib/controller/rpc/conn.go
index 0e532f23c070d8b5c64a15bd8bef46494702ae5a..4d8a82ce43ef6b5a3f47d100f509ebd03895c43d 100644 (file)
@@ -559,6 +559,41 @@ func (conn *Conn) LinkDelete(ctx context.Context, options arvados.DeleteOptions)
        return resp, err
 }
 
+func (conn *Conn) LogCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Log, error) {
+       ep := arvados.EndpointLogCreate
+       var resp arvados.Log
+       err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
+       return resp, err
+}
+
+func (conn *Conn) LogUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.Log, error) {
+       ep := arvados.EndpointLogUpdate
+       var resp arvados.Log
+       err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
+       return resp, err
+}
+
+func (conn *Conn) LogGet(ctx context.Context, options arvados.GetOptions) (arvados.Log, error) {
+       ep := arvados.EndpointLogGet
+       var resp arvados.Log
+       err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
+       return resp, err
+}
+
+func (conn *Conn) LogList(ctx context.Context, options arvados.ListOptions) (arvados.LogList, error) {
+       ep := arvados.EndpointLogList
+       var resp arvados.LogList
+       err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
+       return resp, err
+}
+
+func (conn *Conn) LogDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.Log, error) {
+       ep := arvados.EndpointLogDelete
+       var resp arvados.Log
+       err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
+       return resp, err
+}
+
 func (conn *Conn) SpecimenCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Specimen, error) {
        ep := arvados.EndpointSpecimenCreate
        var resp arvados.Specimen
diff --git a/lib/controller/trash.go b/lib/controller/trash.go
index 551b2f92bbde209b984656728388ca48a2b9c294..99e7aec0b66c4dbed9462c498ece37e38971157b 100644 (file)
@@ -5,6 +5,7 @@
 package controller
 
 import (
+       "context"
        "time"
 
        "git.arvados.org/arvados.git/lib/controller/dblock"
@@ -12,22 +13,62 @@ import (
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
 )
 
-func (h *Handler) trashSweepWorker() {
-       sleep := h.Cluster.Collections.TrashSweepInterval.Duration()
-       logger := ctxlog.FromContext(h.BackgroundContext).WithField("worker", "trash sweep")
+func (h *Handler) periodicWorker(workerName string, interval time.Duration, locker *dblock.DBLocker, run func(context.Context) error) {
+       logger := ctxlog.FromContext(h.BackgroundContext).WithField("worker", workerName)
        ctx := ctxlog.Context(h.BackgroundContext, logger)
-       if sleep <= 0 {
-               logger.Debugf("Collections.TrashSweepInterval is %v, not running worker", sleep)
+       if interval <= 0 {
+               logger.Debugf("interval is %v, not running worker", interval)
                return
        }
-       dblock.TrashSweep.Lock(ctx, h.db)
-       defer dblock.TrashSweep.Unlock()
-       for time.Sleep(sleep); ctx.Err() == nil; time.Sleep(sleep) {
-               dblock.TrashSweep.Check()
-               ctx := auth.NewContext(ctx, &auth.Credentials{Tokens: []string{h.Cluster.SystemRootToken}})
-               _, err := h.federation.SysTrashSweep(ctx, struct{}{})
+       if !locker.Lock(ctx, h.dbConnector.GetDB) {
+               // context canceled
+               return
+       }
+       defer locker.Unlock()
+       for time.Sleep(interval); ctx.Err() == nil; time.Sleep(interval) {
+               if !locker.Check() {
+                       // context canceled
+                       return
+               }
+               err := run(ctx)
                if err != nil {
-                       logger.WithError(err).Info("trash sweep failed")
+                       logger.WithError(err).Infof("%s failed", workerName)
                }
        }
 }
+
+func (h *Handler) trashSweepWorker() {
+       h.periodicWorker("trash sweep", h.Cluster.Collections.TrashSweepInterval.Duration(), dblock.TrashSweep, func(ctx context.Context) error {
+               ctx = auth.NewContext(ctx, &auth.Credentials{Tokens: []string{h.Cluster.SystemRootToken}})
+               _, err := h.federation.SysTrashSweep(ctx, struct{}{})
+               return err
+       })
+}
+
+func (h *Handler) containerLogSweepWorker() {
+       h.periodicWorker("container log sweep", h.Cluster.Containers.Logging.SweepInterval.Duration(), dblock.ContainerLogSweep, func(ctx context.Context) error {
+               db, err := h.dbConnector.GetDB(ctx)
+               if err != nil {
+                       return err
+               }
+               res, err := db.ExecContext(ctx, `
+DELETE FROM logs
+ USING containers
+ WHERE logs.object_uuid=containers.uuid
+ AND logs.event_type in ('stdout', 'stderr', 'arv-mount', 'crunch-run', 'crunchstat', 'hoststat', 'node', 'container', 'keepstore')
+ AND containers.log IS NOT NULL
+ AND now() - containers.finished_at > $1::interval`,
+                       h.Cluster.Containers.Logging.MaxAge.String())
+               if err != nil {
+                       return err
+               }
+               logger := ctxlog.FromContext(ctx)
+               rows, err := res.RowsAffected()
+               if err != nil {
+                       logger.WithError(err).Warn("unexpected error from RowsAffected()")
+               } else {
+                       logger.WithField("rows", rows).Info("deleted rows from logs table")
+               }
+               return nil
+       })
+}
diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index ee9115d8d809903be17cbaa10dc4010d1b7d87dc..55790f727a61d289d5b0d5080fa2911ee7789515 100644 (file)
@@ -142,6 +142,7 @@ type ContainerRunner struct {
        parentTemp    string
        costStartTime time.Time
 
+       keepstore        *exec.Cmd
        keepstoreLogger  io.WriteCloser
        keepstoreLogbuf  *bufThenWrite
        statLogger       io.WriteCloser
@@ -660,6 +661,9 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
        if err != nil {
                return nil, fmt.Errorf("while trying to start arv-mount: %v", err)
        }
+       if runner.hoststatReporter != nil && runner.ArvMount != nil {
+               runner.hoststatReporter.ReportPID("arv-mount", runner.ArvMount.Process.Pid)
+       }
 
        for _, p := range collectionPaths {
                _, err = os.Stat(p)
@@ -733,6 +737,7 @@ func (runner *ContainerRunner) startHoststat() error {
                PollPeriod: runner.statInterval,
        }
        runner.hoststatReporter.Start()
+       runner.hoststatReporter.ReportPID("crunch-run", os.Getpid())
        return nil
 }
 
@@ -1569,6 +1574,9 @@ func (runner *ContainerRunner) Run() (err error) {
        if err != nil {
                return
        }
+       if runner.keepstore != nil {
+               runner.hoststatReporter.ReportPID("keepstore", runner.keepstore.Process.Pid)
+       }
 
        // set up FUSE mount and binds
        bindmounts, err = runner.SetupMounts()
@@ -1853,6 +1861,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                return 1
        }
 
+       cr.keepstore = keepstore
        if keepstore == nil {
                // Log explanation (if any) for why we're not running
                // a local keepstore.
diff --git a/lib/crunchstat/crunchstat.go b/lib/crunchstat/crunchstat.go
index 10cd7cfce43a03472e2e942b68512efcdd7d0c61..3a473cab8715c49eec14d5e0565b61daf9d71a5e 100644 (file)
@@ -13,10 +13,12 @@ import (
        "fmt"
        "io"
        "io/ioutil"
-       "log"
        "os"
+       "regexp"
+       "sort"
        "strconv"
        "strings"
+       "sync"
        "syscall"
        "time"
 )
@@ -47,14 +49,20 @@ type Reporter struct {
        TempDir string
 
        // Where to write statistics. Must not be nil.
-       Logger *log.Logger
+       Logger interface {
+               Printf(fmt string, args ...interface{})
+       }
 
+       kernelPageSize      int64
        reportedStatFile    map[string]string
        lastNetSample       map[string]ioSample
        lastDiskIOSample    map[string]ioSample
        lastCPUSample       cpuSample
        lastDiskSpaceSample diskSpaceSample
 
+       reportPIDs   map[string]int
+       reportPIDsMu sync.Mutex
+
        done    chan struct{} // closed when we should stop reporting
        flushed chan struct{} // closed when we have made our last report
 }
@@ -76,6 +84,17 @@ func (r *Reporter) Start() {
        go r.run()
 }
 
+// ReportPID starts reporting stats for a specified process.
+func (r *Reporter) ReportPID(name string, pid int) {
+       r.reportPIDsMu.Lock()
+       defer r.reportPIDsMu.Unlock()
+       if r.reportPIDs == nil {
+               r.reportPIDs = map[string]int{name: pid}
+       } else {
+               r.reportPIDs[name] = pid
+       }
+}
+
 // Stop reporting. Do not call more than once, or before calling
 // Start.
 //
@@ -256,6 +275,71 @@ func (r *Reporter) doMemoryStats() {
                }
        }
        r.Logger.Printf("mem%s\n", outstat.String())
+
+       if r.kernelPageSize == 0 {
+               // assign "don't try again" value in case we give up
+               // and return without assigning the real value
+               r.kernelPageSize = -1
+               buf, err := os.ReadFile("/proc/self/smaps")
+               if err != nil {
+                       r.Logger.Printf("error reading /proc/self/smaps: %s", err)
+                       return
+               }
+               m := regexp.MustCompile(`\nKernelPageSize:\s*(\d+) kB\n`).FindSubmatch(buf)
+               if len(m) != 2 {
+                       r.Logger.Printf("error parsing /proc/self/smaps: KernelPageSize not found")
+                       return
+               }
+               size, err := strconv.ParseInt(string(m[1]), 10, 64)
+               if err != nil {
+                       r.Logger.Printf("error parsing /proc/self/smaps: KernelPageSize %q: %s", m[1], err)
+                       return
+               }
+               r.kernelPageSize = size * 1024
+       } else if r.kernelPageSize < 0 {
+               // already failed to determine page size, don't keep
+               // trying/logging
+               return
+       }
+
+       r.reportPIDsMu.Lock()
+       defer r.reportPIDsMu.Unlock()
+       procnames := make([]string, 0, len(r.reportPIDs))
+       for name := range r.reportPIDs {
+               procnames = append(procnames, name)
+       }
+       sort.Strings(procnames)
+       procmem := ""
+       for _, procname := range procnames {
+               pid := r.reportPIDs[procname]
+               buf, err := os.ReadFile(fmt.Sprintf("/proc/%d/stat", pid))
+               if err != nil {
+                       continue
+               }
+               // If the executable name contains a ')' char,
+               // /proc/$pid/stat will look like '1234 (exec name)) S
+               // 123 ...' -- the last ')' is the end of the 2nd
+               // field.
+               paren := bytes.LastIndexByte(buf, ')')
+               if paren < 0 {
+                       continue
+               }
+               fields := bytes.SplitN(buf[paren:], []byte{' '}, 24)
+               if len(fields) < 24 {
+                       continue
+               }
+               // rss is the 24th field in .../stat, and fields[0]
+               // here is the last char ')' of the 2nd field, so
+               // rss is fields[22]
+               rss, err := strconv.ParseInt(string(fields[22]), 10, 64)
+               if err != nil {
+                       continue
+               }
+               procmem += fmt.Sprintf(" %d %s", rss*r.kernelPageSize, procname)
+       }
+       if procmem != "" {
+               r.Logger.Printf("procmem%s\n", procmem)
+       }
 }
 
 func (r *Reporter) doNetworkStats() {
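
With the @Logger@ field relaxed to anything with a @Printf@ method and the new @ReportPID@ call, a caller can ask the reporter to include per-process RSS in its periodic output. A minimal sketch under those assumptions (the cgroup root, poll period, and process name are placeholders):

<pre>
package example

import (
	"log"
	"os"
	"time"

	"git.arvados.org/arvados.git/lib/crunchstat"
)

func reportSelf() {
	r := crunchstat.Reporter{
		// *log.Logger still satisfies the narrowed Logger interface.
		Logger:     log.New(os.Stderr, "crunchstat: ", 0),
		CgroupRoot: "/sys/fs/cgroup",
		PollPeriod: 10 * time.Second,
	}
	r.Start()
	// Include this process's RSS in the periodic "procmem" stat lines.
	r.ReportPID("self", os.Getpid())
	defer r.Stop()
	// ... run the monitored workload here ...
}
</pre>
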
diff --git a/lib/crunchstat/crunchstat_test.go b/lib/crunchstat/crunchstat_test.go
index c27e39241df08af2c925a791e6fd849afc496b90..5e8e93de6cfae9ce3f51c7b191e515ff8e7d9955 100644 (file)
@@ -5,62 +5,81 @@
 package crunchstat
 
 import (
-       "bufio"
-       "io"
+       "bytes"
        "log"
        "os"
        "regexp"
+       "strconv"
        "testing"
+       "time"
+
+       "github.com/sirupsen/logrus"
+       . "gopkg.in/check.v1"
 )
 
-func bufLogger() (*log.Logger, *bufio.Reader) {
-       r, w := io.Pipe()
-       logger := log.New(w, "", 0)
-       return logger, bufio.NewReader(r)
+func Test(t *testing.T) {
+       TestingT(t)
 }
 
-func TestReadAllOrWarnFail(t *testing.T) {
-       logger, rcv := bufLogger()
-       rep := Reporter{Logger: logger}
+var _ = Suite(&suite{})
 
-       done := make(chan bool)
-       var msg []byte
-       var err error
-       go func() {
-               msg, err = rcv.ReadBytes('\n')
-               close(done)
-       }()
-       {
-               // The special file /proc/self/mem can be opened for
-               // reading, but reading from byte 0 returns an error.
-               f, err := os.Open("/proc/self/mem")
-               if err != nil {
-                       t.Fatalf("Opening /proc/self/mem: %s", err)
-               }
-               if x, err := rep.readAllOrWarn(f); err == nil {
-                       t.Fatalf("Expected error, got %v", x)
-               }
-       }
-       <-done
-       if err != nil {
-               t.Fatal(err)
-       } else if matched, err := regexp.MatchString("^warning: read /proc/self/mem: .*", string(msg)); err != nil || !matched {
-               t.Fatalf("Expected error message about unreadable file, got \"%s\"", msg)
-       }
+type suite struct{}
+
+func (s *suite) TestReadAllOrWarnFail(c *C) {
+       var logger bytes.Buffer
+       rep := Reporter{Logger: log.New(&logger, "", 0)}
+
+       // The special file /proc/self/mem can be opened for
+       // reading, but reading from byte 0 returns an error.
+       f, err := os.Open("/proc/self/mem")
+       c.Assert(err, IsNil)
+       defer f.Close()
+       _, err = rep.readAllOrWarn(f)
+       c.Check(err, NotNil)
+       c.Check(logger.String(), Matches, "^warning: read /proc/self/mem: .*\n")
 }
 
-func TestReadAllOrWarnSuccess(t *testing.T) {
-       rep := Reporter{Logger: log.New(os.Stderr, "", 0)}
+func (s *suite) TestReadAllOrWarnSuccess(c *C) {
+       var logbuf bytes.Buffer
+       rep := Reporter{Logger: log.New(&logbuf, "", 0)}
 
        f, err := os.Open("./crunchstat_test.go")
-       if err != nil {
-               t.Fatalf("Opening ./crunchstat_test.go: %s", err)
-       }
+       c.Assert(err, IsNil)
+       defer f.Close()
        data, err := rep.readAllOrWarn(f)
-       if err != nil {
-               t.Fatalf("got error %s", err)
+       c.Check(err, IsNil)
+       c.Check(string(data), Matches, "(?ms).*\npackage crunchstat\n.*")
+       c.Check(logbuf.String(), Equals, "")
+}
+
+func (s *suite) TestReportPIDs(c *C) {
+       var logbuf bytes.Buffer
+       logger := logrus.New()
+       logger.Out = &logbuf
+       r := Reporter{
+               Logger:     logger,
+               CgroupRoot: "/sys/fs/cgroup",
+               PollPeriod: time.Second,
        }
-       if matched, err := regexp.MatchString("\npackage crunchstat\n", string(data)); err != nil || !matched {
-               t.Fatalf("data failed regexp: err %v, matched %v", err, matched)
+       r.Start()
+       r.ReportPID("init", 1)
+       r.ReportPID("test_process", os.Getpid())
+       r.ReportPID("nonexistent", 12345) // should be silently ignored/omitted
+       for deadline := time.Now().Add(10 * time.Second); ; time.Sleep(time.Millisecond) {
+               if time.Now().After(deadline) {
+                       c.Error("timed out")
+                       break
+               }
+               if m := regexp.MustCompile(`(?ms).*procmem \d+ init (\d+) test_process.*`).FindSubmatch(logbuf.Bytes()); len(m) > 0 {
+                       size, err := strconv.ParseInt(string(m[1]), 10, 64)
+                       c.Check(err, IsNil)
+                       // Expect >1 MiB and <100 MiB -- otherwise we
+                       // are probably misinterpreting /proc/N/stat
+                       // or multiplying by the wrong page size.
+                       c.Check(size > 1000000, Equals, true)
+                       c.Check(size < 100000000, Equals, true)
+                       break
+               }
        }
+       c.Logf("%s", logbuf.String())
 }
diff --git a/lib/ctrlctx/db.go b/lib/ctrlctx/db.go
index a76420860604b9a6fb9823bdc6b3775c70f85ff4..2a05096ce18b7430e7e1e487dd5d710024ac9193 100644 (file)
@@ -10,6 +10,7 @@ import (
        "sync"
 
        "git.arvados.org/arvados.git/lib/controller/api"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "github.com/jmoiron/sqlx"
 
@@ -142,3 +143,33 @@ func CurrentTx(ctx context.Context) (*sqlx.Tx, error) {
        })
        return txn.tx, txn.err
 }
+
+var errDBConnection = errors.New("database connection error")
+
+type DBConnector struct {
+       PostgreSQL arvados.PostgreSQL
+       pgdb       *sqlx.DB
+       mtx        sync.Mutex
+}
+
+func (dbc *DBConnector) GetDB(ctx context.Context) (*sqlx.DB, error) {
+       dbc.mtx.Lock()
+       defer dbc.mtx.Unlock()
+       if dbc.pgdb != nil {
+               return dbc.pgdb, nil
+       }
+       db, err := sqlx.Open("postgres", dbc.PostgreSQL.Connection.String())
+       if err != nil {
+               ctxlog.FromContext(ctx).WithError(err).Error("postgresql connect failed")
+               return nil, errDBConnection
+       }
+       if p := dbc.PostgreSQL.ConnectionPool; p > 0 {
+               db.SetMaxOpenConns(p)
+       }
+       if err := db.Ping(); err != nil {
+               ctxlog.FromContext(ctx).WithError(err).Error("postgresql connect succeeded but ping failed")
+               return nil, errDBConnection
+       }
+       dbc.pgdb = db
+       return db, nil
+}
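
The @DBConnector@ type extracted here lazily opens a single shared @*sqlx.DB@ and hands it out from @GetDB@, which is why @lib/controller@ can now pass @dbConnector.GetDB@ anywhere a @func(context.Context) (*sqlx.DB, error)@ is expected (transactions, health pings, @dblock@). A brief sketch of standalone use; the cluster value is assumed to come from the loaded config:

<pre>
package example

import (
	"context"

	"git.arvados.org/arvados.git/lib/ctrlctx"
	"git.arvados.org/arvados.git/sdk/go/arvados"
)

func pingDB(ctx context.Context, cluster *arvados.Cluster) error {
	// GetDB opens the connection pool on first use and caches it on the
	// DBConnector for later calls.
	dbc := ctrlctx.DBConnector{PostgreSQL: cluster.PostgreSQL}
	_, err := dbc.GetDB(ctx)
	return err
}
</pre>
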
diff --git a/lib/diagnostics/cmd.go b/lib/diagnostics/cmd.go
index 3a2ebe0c280bf68f6e7e397e65489c70196f91ae..9c229c9b4e1f4d3538cb59a06e5228a73ddc00db 100644 (file)
@@ -38,6 +38,7 @@ func (Command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        f.StringVar(&diag.dockerImage, "docker-image", "", "image to use when running a test container (default: use embedded hello-world image)")
        f.BoolVar(&diag.checkInternal, "internal-client", false, "check that this host is considered an \"internal\" client")
        f.BoolVar(&diag.checkExternal, "external-client", false, "check that this host is considered an \"external\" client")
+       f.BoolVar(&diag.verbose, "v", false, "verbose: include more information in report")
        f.IntVar(&diag.priority, "priority", 500, "priority for test container (1..1000, or 0 to skip)")
        f.DurationVar(&diag.timeout, "timeout", 10*time.Second, "timeout for http requests")
        if ok, code := cmd.ParseFlags(f, prog, args, "", stderr); !ok {
@@ -61,6 +62,7 @@ func (Command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 }
 
 // docker save hello-world > hello-world.tar
+//
 //go:embed hello-world.tar
 var HelloWorldDockerImage []byte
 
@@ -73,6 +75,7 @@ type diagnoser struct {
        dockerImage   string
        checkInternal bool
        checkExternal bool
+       verbose       bool
        timeout       time.Duration
        logger        *logrus.Logger
        errors        []string
@@ -87,6 +90,12 @@ func (diag *diagnoser) infof(f string, args ...interface{}) {
        diag.logger.Infof("  ... "+f, args...)
 }
 
+func (diag *diagnoser) verbosef(f string, args ...interface{}) {
+       if diag.verbose {
+               diag.logger.Infof("  ... "+f, args...)
+       }
+}
+
 func (diag *diagnoser) warnf(f string, args ...interface{}) {
        diag.logger.Warnf("  ... "+f, args...)
 }
@@ -128,6 +137,13 @@ func (diag *diagnoser) runtests() {
                return
        }
 
+       hostname, err := os.Hostname()
+       if err != nil {
+               diag.warnf("error getting hostname: %s", err)
+       } else {
+               diag.verbosef("hostname = %s", hostname)
+       }
+
        diag.dotest(5, "running health check (same as `arvados-server check`)", func() error {
                ldr := config.NewLoader(&bytes.Buffer{}, ctxlog.New(&bytes.Buffer{}, "text", "info"))
                ldr.SetupFlags(flag.NewFlagSet("diagnostics", flag.ContinueOnError))
@@ -141,14 +157,39 @@ func (diag *diagnoser) runtests() {
                        return err
                }
                if cluster.SystemRootToken != os.Getenv("ARVADOS_API_TOKEN") {
-                       diag.infof("skipping because provided token is not SystemRootToken")
+                       return fmt.Errorf("diagnostics usage error: %s is readable but SystemRootToken does not match $ARVADOS_API_TOKEN (to fix, either run 'arvados-client sudo diagnostics' to load everything from config file, or set ARVADOS_CONFIG=- to load nothing from config file)", ldr.Path)
                }
                agg := &health.Aggregator{Cluster: cluster}
                resp := agg.ClusterHealth()
                for _, e := range resp.Errors {
                        diag.errorf("health check: %s", e)
                }
-               diag.infof("health check: reported clock skew %v", resp.ClockSkew)
+               if len(resp.Errors) > 0 {
+                       diag.infof("consider running `arvados-server check -yaml` for a comprehensive report")
+               }
+               diag.verbosef("reported clock skew = %v", resp.ClockSkew)
+               reported := map[string]bool{}
+               for _, result := range resp.Checks {
+                       version := strings.SplitN(result.Metrics.Version, " (go", 2)[0]
+                       if version != "" && !reported[version] {
+                               diag.verbosef("arvados version = %s", version)
+                               reported[version] = true
+                       }
+               }
+               reported = map[string]bool{}
+               for _, result := range resp.Checks {
+                       if result.Server != "" && !reported[result.Server] {
+                               diag.verbosef("http frontend version = %s", result.Server)
+                               reported[result.Server] = true
+                       }
+               }
+               reported = map[string]bool{}
+               for _, result := range resp.Checks {
+                       if sha := result.ConfigSourceSHA256; sha != "" && !reported[sha] {
+                               diag.verbosef("config file sha256 = %s", sha)
+                               reported[sha] = true
+                       }
+               }
                return nil
        })
 
@@ -161,7 +202,7 @@ func (diag *diagnoser) runtests() {
                if err != nil {
                        return err
                }
-               diag.debugf("BlobSignatureTTL = %d", dd.BlobSignatureTTL)
+               diag.verbosef("BlobSignatureTTL = %d", dd.BlobSignatureTTL)
                return nil
        })
 
@@ -175,7 +216,7 @@ func (diag *diagnoser) runtests() {
                if err != nil {
                        return err
                }
-               diag.debugf("Collections.BlobSigning = %v", cluster.Collections.BlobSigning)
+               diag.verbosef("Collections.BlobSigning = %v", cluster.Collections.BlobSigning)
                cfgOK = true
                return nil
        })
@@ -188,7 +229,7 @@ func (diag *diagnoser) runtests() {
                if err != nil {
                        return err
                }
-               diag.debugf("user uuid = %s", user.UUID)
+               diag.verbosef("user uuid = %s", user.UUID)
                return nil
        })
 
@@ -277,9 +318,9 @@ func (diag *diagnoser) runtests() {
                isInternal := found["proxy"] == 0 && len(keeplist.Items) > 0
                isExternal := found["proxy"] > 0 && found["proxy"] == len(keeplist.Items)
                if isExternal {
-                       diag.debugf("controller returned only proxy services, this host is treated as \"external\"")
+                       diag.verbosef("controller returned only proxy services, this host is treated as \"external\"")
                } else if isInternal {
-                       diag.debugf("controller returned only non-proxy services, this host is treated as \"internal\"")
+                       diag.verbosef("controller returned only non-proxy services, this host is treated as \"internal\"")
                }
                if (diag.checkInternal && !isInternal) || (diag.checkExternal && !isExternal) {
                        return fmt.Errorf("expecting internal=%v external=%v, but found internal=%v external=%v", diag.checkInternal, diag.checkExternal, isInternal, isExternal)
@@ -356,7 +397,7 @@ func (diag *diagnoser) runtests() {
                }
                if len(grplist.Items) > 0 {
                        project = grplist.Items[0]
-                       diag.debugf("using existing project, uuid = %s", project.UUID)
+                       diag.verbosef("using existing project, uuid = %s", project.UUID)
                        return nil
                }
                diag.debugf("list groups: ok, no results")
@@ -367,7 +408,7 @@ func (diag *diagnoser) runtests() {
                if err != nil {
                        return fmt.Errorf("create project: %s", err)
                }
-               diag.debugf("created project, uuid = %s", project.UUID)
+               diag.verbosef("created project, uuid = %s", project.UUID)
                return nil
        })
 
@@ -387,7 +428,7 @@ func (diag *diagnoser) runtests() {
                if err != nil {
                        return err
                }
-               diag.debugf("ok, uuid = %s", collection.UUID)
+               diag.verbosef("ok, uuid = %s", collection.UUID)
                return nil
        })
 
@@ -657,8 +698,8 @@ func (diag *diagnoser) runtests() {
                if err != nil {
                        return err
                }
-               diag.debugf("container request uuid = %s", cr.UUID)
-               diag.debugf("container uuid = %s", cr.ContainerUUID)
+               diag.verbosef("container request uuid = %s", cr.UUID)
+               diag.verbosef("container uuid = %s", cr.ContainerUUID)
 
                timeout := 10 * time.Minute
                diag.infof("container request submitted, waiting up to %v for container to run", arvados.Duration(timeout))
index ae91a710e395295f47a34cb5645f980021e79021..3403c50c972987e7f6f21a927a6db592fac9f6fc 100644 (file)
@@ -15,6 +15,8 @@ import (
        "time"
 
        "git.arvados.org/arvados.git/lib/cloud"
+       "git.arvados.org/arvados.git/lib/controller/dblock"
+       "git.arvados.org/arvados.git/lib/ctrlctx"
        "git.arvados.org/arvados.git/lib/dispatchcloud/container"
        "git.arvados.org/arvados.git/lib/dispatchcloud/scheduler"
        "git.arvados.org/arvados.git/lib/dispatchcloud/sshexecutor"
@@ -53,6 +55,7 @@ type dispatcher struct {
        Registry      *prometheus.Registry
        InstanceSetID cloud.InstanceSetID
 
+       dbConnector ctrlctx.DBConnector
        logger      logrus.FieldLogger
        instanceSet cloud.InstanceSet
        pool        pool
@@ -118,6 +121,7 @@ func (disp *dispatcher) setup() {
 
 func (disp *dispatcher) initialize() {
        disp.logger = ctxlog.FromContext(disp.Context)
+       disp.dbConnector = ctrlctx.DBConnector{PostgreSQL: disp.Cluster.PostgreSQL}
 
        disp.ArvClient.AuthToken = disp.AuthToken
 
@@ -143,6 +147,7 @@ func (disp *dispatcher) initialize() {
        if err != nil {
                disp.logger.Fatalf("error initializing driver: %s", err)
        }
+       dblock.Dispatch.Lock(disp.Context, disp.dbConnector.GetDB)
        disp.instanceSet = instanceSet
        disp.pool = worker.NewPool(disp.logger, disp.ArvClient, disp.Registry, disp.InstanceSetID, disp.instanceSet, disp.newExecutor, disp.sshKey.PublicKey(), disp.Cluster)
        disp.queue = container.NewQueue(disp.logger, disp.Registry, disp.typeChooser, disp.ArvClient)
@@ -175,6 +180,7 @@ func (disp *dispatcher) initialize() {
 }
 
 func (disp *dispatcher) run() {
+       defer dblock.Dispatch.Unlock()
        defer close(disp.stopped)
        defer disp.instanceSet.Stop()
        defer disp.pool.Stop()
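
The pattern above -- build a DBConnector in initialize(), take dblock.Dispatch before starting work, release it when the dispatch loop exits -- is repeated in the LSF and SLURM dispatchers later in this diff. A condensed sketch, using only the dblock and ctrlctx calls that appear in this patch (the runDispatchLoop name is illustrative):

    package example

    import (
        "context"

        "git.arvados.org/arvados.git/lib/controller/dblock"
        "git.arvados.org/arvados.git/lib/ctrlctx"
        "git.arvados.org/arvados.git/sdk/go/arvados"
    )

    // runDispatchLoop is a stand-in for a dispatcher's run() method: it
    // acquires the shared "dispatch" advisory lock before doing any work
    // and releases it when the loop returns.
    func runDispatchLoop(ctx context.Context, cluster *arvados.Cluster, loop func(context.Context)) {
        dbConnector := ctrlctx.DBConnector{PostgreSQL: cluster.PostgreSQL}
        dblock.Dispatch.Lock(ctx, dbConnector.GetDB)
        defer dblock.Dispatch.Unlock()
        loop(ctx)
    }
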
index 829a053636d5dc07abaac1c649810c5416e09fb6..2d486da5fd5a9d4aafbbc0b82f06d0c20c7f91e8 100644 (file)
@@ -15,6 +15,7 @@ import (
        "sync"
        "time"
 
+       "git.arvados.org/arvados.git/lib/config"
        "git.arvados.org/arvados.git/lib/dispatchcloud/test"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadostest"
@@ -49,8 +50,16 @@ func (s *DispatcherSuite) SetUpTest(c *check.C) {
                MinTimeBetweenCreateCalls: time.Millisecond,
        }
 
+       // We need the postgresql connection info from the integration
+       // test config.
+       cfg, err := config.NewLoader(nil, ctxlog.FromContext(s.ctx)).Load()
+       c.Assert(err, check.IsNil)
+       testcluster, err := cfg.GetCluster("")
+       c.Assert(err, check.IsNil)
+
        s.cluster = &arvados.Cluster{
                ManagementToken: "test-management-token",
+               PostgreSQL:      testcluster.PostgreSQL,
                Containers: arvados.ContainersConfig{
                        CrunchRunCommand:       "crunch-run",
                        CrunchRunArgumentsList: []string{"--foo", "--extra='args'"},
@@ -184,12 +193,18 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
        err := s.disp.CheckHealth()
        c.Check(err, check.IsNil)
 
-       select {
-       case <-done:
-               c.Logf("containers finished (%s), waiting for instances to shutdown and queue to clear", time.Since(start))
-       case <-time.After(10 * time.Second):
-               c.Fatalf("timed out; still waiting for %d containers: %q", len(waiting), waiting)
+       for len(waiting) > 0 {
+               waswaiting := len(waiting)
+               select {
+               case <-done:
+                       // loop will end because len(waiting)==0
+               case <-time.After(3 * time.Second):
+                       if len(waiting) >= waswaiting {
+                               c.Fatalf("timed out; no progress in 3s while waiting for %d containers: %q", len(waiting), waiting)
+                       }
+               }
        }
+       c.Logf("containers finished (%s), waiting for instances to shutdown and queue to clear", time.Since(start))
 
        deadline := time.Now().Add(5 * time.Second)
        for range time.NewTicker(10 * time.Millisecond).C {
index d362f66d14b3ee12b9a4fb6b197b9a34747d944c..d1408d23cb1a4e3c2274f40d2f02b66bda29e82d 100644 (file)
@@ -18,6 +18,8 @@ import (
        "time"
 
        "git.arvados.org/arvados.git/lib/cmd"
+       "git.arvados.org/arvados.git/lib/controller/dblock"
+       "git.arvados.org/arvados.git/lib/ctrlctx"
        "git.arvados.org/arvados.git/lib/dispatchcloud"
        "git.arvados.org/arvados.git/lib/service"
        "git.arvados.org/arvados.git/sdk/go/arvados"
@@ -58,6 +60,7 @@ type dispatcher struct {
        Registry  *prometheus.Registry
 
        logger        logrus.FieldLogger
+       dbConnector   ctrlctx.DBConnector
        lsfcli        lsfcli
        lsfqueue      lsfqueue
        arvDispatcher *dispatch.Dispatcher
@@ -73,7 +76,9 @@ type dispatcher struct {
 func (disp *dispatcher) Start() {
        disp.initOnce.Do(func() {
                disp.init()
+               dblock.Dispatch.Lock(context.Background(), disp.dbConnector.GetDB)
                go func() {
+                       defer dblock.Dispatch.Unlock()
                        disp.checkLsfQueueForOrphans()
                        err := disp.arvDispatcher.Run(disp.Context)
                        if err != nil {
@@ -125,6 +130,7 @@ func (disp *dispatcher) init() {
                lsfcli: &disp.lsfcli,
        }
        disp.ArvClient.AuthToken = disp.AuthToken
+       disp.dbConnector = ctrlctx.DBConnector{PostgreSQL: disp.Cluster.PostgreSQL}
        disp.stop = make(chan struct{}, 1)
        disp.stopped = make(chan struct{})
 
index e9b58bc83b2fb4b655676acab301a6528f170a77..66fe143e0f42d169d29ec04e85ddd50a6702f1c9 100644 (file)
@@ -91,6 +91,8 @@ class ArvadosContainer(JobBase):
         container_request["state"] = "Committed"
         container_request.setdefault("properties", {})
 
+        container_request["properties"]["cwl_input"] = self.joborder
+
         runtime_constraints = {}
 
         if runtimeContext.project_uuid:
@@ -437,6 +439,13 @@ class ArvadosContainer(JobBase):
 
             if container["output"]:
                 outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
+
+            properties = record["properties"].copy()
+            properties["cwl_output"] = outputs
+            self.arvrunner.api.container_requests().update(
+                uuid=self.uuid,
+                body={"container_request": {"properties": properties}}
+            ).execute(num_retries=self.arvrunner.num_retries)
         except WorkflowException as e:
             # Only include a stack trace if in debug mode.
             # A stack trace may obfuscate more useful output about the workflow.
index cb57b446da5877b4e74ef0b30ae7f930e37b0c12..ae3eab6ed822f88685f1fb2743bb9e31d4d436f8 100644 (file)
@@ -186,7 +186,7 @@ class TestContainer(unittest.TestCase):
                         'command': ['ls', '/var/spool/cwl'],
                         'cwd': '/var/spool/cwl',
                         'scheduling_parameters': {},
-                        'properties': {},
+                        'properties': {'cwl_input': {}},
                         'secret_mounts': {},
                         'output_storage_classes': ["default"]
                     }))
@@ -277,7 +277,7 @@ class TestContainer(unittest.TestCase):
             'scheduling_parameters': {
                 'partitions': ['blurb']
             },
-            'properties': {},
+            'properties': {'cwl_input': {}},
             'secret_mounts': {},
             'output_storage_classes': ["default"]
         }
@@ -410,7 +410,7 @@ class TestContainer(unittest.TestCase):
             'cwd': '/var/spool/cwl',
             'scheduling_parameters': {
             },
-            'properties': {},
+            'properties': {'cwl_input': {}},
             'secret_mounts': {},
             'output_storage_classes': ["default"]
         }
@@ -497,7 +497,7 @@ class TestContainer(unittest.TestCase):
                     'command': ['ls', '/var/spool/cwl'],
                     'cwd': '/var/spool/cwl',
                     'scheduling_parameters': {},
-                    'properties': {},
+                    'properties': {'cwl_input': {}},
                     'secret_mounts': {},
                     'output_storage_classes': ["default"]
                 }))
@@ -534,6 +534,7 @@ class TestContainer(unittest.TestCase):
         arvjob.successCodes = [0]
         arvjob.outdir = "/var/spool/cwl"
         arvjob.output_ttl = 3600
+        arvjob.uuid = "zzzzz-xvhdp-zzzzzzzzzzzzzz1"
 
         arvjob.collect_outputs.return_value = {"out": "stuff"}
 
@@ -543,7 +544,8 @@ class TestContainer(unittest.TestCase):
             "output_uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2",
             "uuid": "zzzzz-xvhdp-zzzzzzzzzzzzzzz",
             "container_uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz",
-            "modified_at": "2017-05-26T12:01:22Z"
+            "modified_at": "2017-05-26T12:01:22Z",
+            "properties": {}
         })
 
         self.assertFalse(api.collections().create.called)
@@ -553,6 +555,10 @@ class TestContainer(unittest.TestCase):
         arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
         runner.add_intermediate_output.assert_called_with("zzzzz-4zz18-zzzzzzzzzzzzzz2")
 
+        runner.api.container_requests().update.assert_called_with(uuid="zzzzz-xvhdp-zzzzzzzzzzzzzz1",
+                                                                  body={'container_request': {'properties': {'cwl_output': {'out': 'stuff'}}}})
+
+
     # Test to make sure we dont call runtime_status_update if we already did
     # some where higher up in the call stack
     @mock.patch("arvados_cwl.util.get_current_container")
@@ -636,7 +642,8 @@ class TestContainer(unittest.TestCase):
             "output_uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2",
             "uuid": "zzzzz-xvhdp-zzzzzzzzzzzzzzz",
             "container_uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz",
-            "modified_at": "2017-05-26T12:01:22Z"
+            "modified_at": "2017-05-26T12:01:22Z",
+            "properties": {}
         })
 
         rts_mock.assert_called_with(
@@ -733,7 +740,38 @@ class TestContainer(unittest.TestCase):
                     'command': ['ls', '/var/spool/cwl'],
                     'cwd': '/var/spool/cwl',
                     'scheduling_parameters': {},
-                    'properties': {},
+                    'properties': {'cwl_input': {
+                        "p1": {
+                            "basename": "99999999999999999999999999999994+44",
+                            "class": "Directory",
+                            "dirname": "/keep",
+                            "http://arvados.org/cwl#collectionUUID": "zzzzz-4zz18-zzzzzzzzzzzzzzz",
+                            "listing": [
+                                {
+                                    "basename": "file1",
+                                    "class": "File",
+                                    "dirname": "/keep/99999999999999999999999999999994+44",
+                                    "location": "keep:99999999999999999999999999999994+44/file1",
+                                    "nameext": "",
+                                    "nameroot": "file1",
+                                    "path": "/keep/99999999999999999999999999999994+44/file1",
+                                    "size": 0
+                                },
+                                {
+                                    "basename": "file2",
+                                    "class": "File",
+                                    "dirname": "/keep/99999999999999999999999999999994+44",
+                                    "location": "keep:99999999999999999999999999999994+44/file2",
+                                    "nameext": "",
+                                    "nameroot": "file2",
+                                    "path": "/keep/99999999999999999999999999999994+44/file2",
+                                    "size": 0
+                                }
+                            ],
+                            "location": "keep:99999999999999999999999999999994+44",
+                            "path": "/keep/99999999999999999999999999999994+44"
+                        }
+                    }},
                     'secret_mounts': {},
                     'output_storage_classes': ["default"]
                 }))
@@ -827,7 +865,7 @@ class TestContainer(unittest.TestCase):
                     'command': ['md5sum', 'example.conf'],
                     'cwd': '/var/spool/cwl',
                     'scheduling_parameters': {},
-                    'properties': {},
+                    'properties': {'cwl_input': job_order},
                     "secret_mounts": {
                         "/var/spool/cwl/example.conf": {
                             "content": "username: user\npassword: blorp\n",
@@ -949,7 +987,7 @@ class TestContainer(unittest.TestCase):
                     'command': ['ls', '/var/spool/cwl'],
                     'cwd': '/var/spool/cwl',
                     'scheduling_parameters': {},
-                    'properties': {},
+                    'properties': {'cwl_input': {}},
                     'secret_mounts': {},
                     'output_storage_classes': ["foo_sc", "bar_sc"]
                 }))
@@ -1037,6 +1075,7 @@ class TestContainer(unittest.TestCase):
                     'scheduling_parameters': {},
                     'properties': {
                         "baz": "blorp",
+                        "cwl_input": {"x": "blorp"},
                         "foo": "bar",
                         "quux": {
                             "q1": 1,
@@ -1145,7 +1184,7 @@ class TestContainer(unittest.TestCase):
                         'command': ['nvidia-smi'],
                         'cwd': '/var/spool/cwl',
                         'scheduling_parameters': {},
-                        'properties': {},
+                        'properties': {'cwl_input': {}},
                         'secret_mounts': {},
                         'output_storage_classes': ["default"]
                     }))
@@ -1219,7 +1258,7 @@ class TestContainer(unittest.TestCase):
             'command': ['echo'],
             'cwd': '/var/spool/cwl',
             'scheduling_parameters': {},
-            'properties': {},
+            'properties': {'cwl_input': {}},
             'secret_mounts': {},
             'output_storage_classes': ["default"]
         }
@@ -1332,7 +1371,7 @@ class TestContainer(unittest.TestCase):
                             'command': ['ls', '/var/spool/cwl'],
                             'cwd': '/var/spool/cwl',
                             'scheduling_parameters': sched,
-                            'properties': {},
+                            'properties': {'cwl_input': {}},
                             'secret_mounts': {},
                             'output_storage_classes': ["default"]
                         }))
@@ -1521,7 +1560,19 @@ class TestWorkflow(unittest.TestCase):
                 "output_path": "/var/spool/cwl",
                 "output_ttl": 0,
                 "priority": 500,
-                "properties": {},
+                "properties": {'cwl_input': {
+                        "fileblub": {
+                            "basename": "token.txt",
+                            "class": "File",
+                            "dirname": "/keep/99999999999999999999999999999999+118",
+                            "location": "keep:99999999999999999999999999999999+118/token.txt",
+                            "nameext": ".txt",
+                            "nameroot": "token",
+                            "path": "/keep/99999999999999999999999999999999+118/token.txt",
+                            "size": 0
+                        },
+                        "sleeptime": 5
+                }},
                 "runtime_constraints": {
                     "ram": 1073741824,
                     "vcpus": 1
@@ -1594,7 +1645,7 @@ class TestWorkflow(unittest.TestCase):
                 'name': u'echo-subwf',
                 'secret_mounts': {},
                 'runtime_constraints': {'API': True, 'vcpus': 3, 'ram': 1073741824},
-                'properties': {},
+                'properties': {'cwl_input': {}},
                 'priority': 500,
                 'mounts': {
                     '/var/spool/cwl/cwl.input.yml': {
index 3797a17f50d504ae2894ac4c6a68f598b4e37564..bec387e85737f9d745c81bc6f1fbc5dae54f27cf 100644 (file)
@@ -70,6 +70,11 @@ var (
        EndpointLinkGet                       = APIEndpoint{"GET", "arvados/v1/links/{uuid}", ""}
        EndpointLinkList                      = APIEndpoint{"GET", "arvados/v1/links", ""}
        EndpointLinkDelete                    = APIEndpoint{"DELETE", "arvados/v1/links/{uuid}", ""}
+       EndpointLogCreate                     = APIEndpoint{"POST", "arvados/v1/logs", "log"}
+       EndpointLogUpdate                     = APIEndpoint{"PATCH", "arvados/v1/logs/{uuid}", "log"}
+       EndpointLogGet                        = APIEndpoint{"GET", "arvados/v1/logs/{uuid}", ""}
+       EndpointLogList                       = APIEndpoint{"GET", "arvados/v1/logs", ""}
+       EndpointLogDelete                     = APIEndpoint{"DELETE", "arvados/v1/logs/{uuid}", ""}
        EndpointSysTrashSweep                 = APIEndpoint{"POST", "sys/trash_sweep", ""}
        EndpointUserActivate                  = APIEndpoint{"POST", "arvados/v1/users/{uuid}/activate", ""}
        EndpointUserCreate                    = APIEndpoint{"POST", "arvados/v1/users", "user"}
@@ -284,6 +289,11 @@ type API interface {
        LinkGet(ctx context.Context, options GetOptions) (Link, error)
        LinkList(ctx context.Context, options ListOptions) (LinkList, error)
        LinkDelete(ctx context.Context, options DeleteOptions) (Link, error)
+       LogCreate(ctx context.Context, options CreateOptions) (Log, error)
+       LogUpdate(ctx context.Context, options UpdateOptions) (Log, error)
+       LogGet(ctx context.Context, options GetOptions) (Log, error)
+       LogList(ctx context.Context, options ListOptions) (LogList, error)
+       LogDelete(ctx context.Context, options DeleteOptions) (Log, error)
        SpecimenCreate(ctx context.Context, options CreateOptions) (Specimen, error)
        SpecimenUpdate(ctx context.Context, options UpdateOptions) (Specimen, error)
        SpecimenGet(ctx context.Context, options GetOptions) (Specimen, error)
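
The new Log* methods follow the same CRUD shape as the other resources in this interface, so every arvados.API implementation must now provide them (the arvadostest.APIStub below gains matching stubs). A hedged sketch of calling them through any such implementation -- the attribute values are illustrative, and it assumes the CreateOptions.Attrs and GetOptions.UUID fields from this SDK, which are not shown in this diff:

    package example

    import (
        "context"

        "git.arvados.org/arvados.git/sdk/go/arvados"
    )

    // createAndFetchLog writes a log record and reads it back through any
    // arvados.API implementation.
    func createAndFetchLog(ctx context.Context, conn arvados.API) (arvados.Log, error) {
        lg, err := conn.LogCreate(ctx, arvados.CreateOptions{
            Attrs: map[string]interface{}{
                "event_type": "example-event", // illustrative values
                "properties": map[string]interface{}{"note": "hello"},
            },
        })
        if err != nil {
            return arvados.Log{}, err
        }
        return conn.LogGet(ctx, arvados.GetOptions{UUID: lg.UUID})
    }
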
index 4dead0ada9143231a1b34c1700174279e64cfe83..4d140517e53687e7d62f9d211a1c566825a631f4 100644 (file)
@@ -153,10 +153,10 @@ func NewClientFromConfig(cluster *Cluster) (*Client, error) {
 // Space characters are trimmed when reading the settings file, so
 // these are equivalent:
 //
-//   ARVADOS_API_HOST=localhost\n
-//   ARVADOS_API_HOST=localhost\r\n
-//   ARVADOS_API_HOST = localhost \n
-//   \tARVADOS_API_HOST = localhost\n
+//     ARVADOS_API_HOST=localhost\n
+//     ARVADOS_API_HOST=localhost\r\n
+//     ARVADOS_API_HOST = localhost \n
+//     \tARVADOS_API_HOST = localhost\n
 func NewClientFromEnv() *Client {
        vars := map[string]string{}
        home := os.Getenv("HOME")
@@ -330,11 +330,11 @@ func (c *Client) DoAndDecode(dst interface{}, req *http.Request) error {
 
 // Convert an arbitrary struct to url.Values. For example,
 //
-//     Foo{Bar: []int{1,2,3}, Baz: "waz"}
+//     Foo{Bar: []int{1,2,3}, Baz: "waz"}
 //
 // becomes
 //
-//     url.Values{`bar`:`{"a":[1,2,3]}`,`Baz`:`waz`}
+//     url.Values{`bar`:`{"a":[1,2,3]}`,`Baz`:`waz`}
 //
 // params itself is returned if it is already an url.Values.
 func anythingToValues(params interface{}) (url.Values, error) {
index a1fc2e89f44331f005e2b28609a32dffe7c5480a..64b7fab8d2267b4f9d4f96cbdad15954965c58db 100644 (file)
@@ -291,6 +291,7 @@ type Cluster struct {
                SSHHelpPageHTML        string
                SSHHelpHostSuffix      string
                IdleTimeout            Duration
+               BannerURL              string
        }
 }
 
@@ -466,6 +467,7 @@ type ContainersConfig struct {
        }
        Logging struct {
                MaxAge                       Duration
+               SweepInterval                Duration
                LogBytesPerEvent             int
                LogSecondsBetweenEvents      Duration
                LogThrottlePeriod            Duration
@@ -533,9 +535,11 @@ type InstanceTypeMap map[string]InstanceType
 var errDuplicateInstanceTypeName = errors.New("duplicate instance type name")
 
 // UnmarshalJSON does special handling of InstanceTypes:
-// * populate computed fields (Name and Scratch)
-// * error out if InstancesTypes are populated as an array, which was
-//   deprecated in Arvados 1.2.0
+//
+// - populate computed fields (Name and Scratch)
+//
+// - error out if InstanceTypes are populated as an array, which was
+// deprecated in Arvados 1.2.0
 func (it *InstanceTypeMap) UnmarshalJSON(data []byte) error {
        fixup := func(t InstanceType) (InstanceType, error) {
                if t.ProviderType == "" {
index c922f0a30dd49abd0f11b29f94d2ced6a8ea09cb..9df210ccb016ef85327b9eaf09ca3aacec0ae9f2 100644 (file)
@@ -5,6 +5,7 @@
 package arvados
 
 import (
+       "bytes"
        "encoding/json"
        "fmt"
        "strings"
@@ -17,6 +18,13 @@ type Duration time.Duration
 
 // UnmarshalJSON implements json.Unmarshaler.
 func (d *Duration) UnmarshalJSON(data []byte) error {
+       if bytes.Equal(data, []byte(`"0"`)) || bytes.Equal(data, []byte(`0`)) {
+               // Unitless 0 is not accepted by ParseDuration, but we
+               // accept it as a reasonable spelling of 0
+               // nanoseconds.
+               *d = 0
+               return nil
+       }
        if data[0] == '"' {
                return d.Set(string(data[1 : len(data)-1]))
        }
index 6a198e69400201f803566b4e19022158956c50be..40344d061b0682327ded9ab016a8865410a923d5 100644 (file)
@@ -60,4 +60,14 @@ func (s *DurationSuite) TestUnmarshalJSON(c *check.C) {
        err = json.Unmarshal([]byte(`{"D":"60s"}`), &d)
        c.Check(err, check.IsNil)
        c.Check(d.D.Duration(), check.Equals, time.Minute)
+
+       d.D = Duration(time.Second)
+       err = json.Unmarshal([]byte(`{"D":"0"}`), &d)
+       c.Check(err, check.IsNil)
+       c.Check(d.D.Duration(), check.Equals, time.Duration(0))
+
+       d.D = Duration(time.Second)
+       err = json.Unmarshal([]byte(`{"D":0}`), &d)
+       c.Check(err, check.IsNil)
+       c.Check(d.D.Duration(), check.Equals, time.Duration(0))
 }
index a26c876b932304ab6fdfefbafe36145665cbac90..354658a257dba00d54f9d98f9f7c1328e84b7ae2 100644 (file)
@@ -513,9 +513,9 @@ type filenodePtr struct {
 //
 // After seeking:
 //
-//     ptr.segmentIdx == len(filenode.segments) // i.e., at EOF
-//     ||
-//     filenode.segments[ptr.segmentIdx].Len() > ptr.segmentOff
+//     ptr.segmentIdx == len(filenode.segments) // i.e., at EOF
+//     ||
+//     filenode.segments[ptr.segmentIdx].Len() > ptr.segmentOff
 func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) {
        ptr = startPtr
        if ptr.off < 0 {
index 6f72634e5457e7379ee6660297be9aced63b91a0..06d7987e321299af7577084c043c0e56b5c664da 100644 (file)
@@ -12,12 +12,15 @@ import (
 type Log struct {
        ID              uint64                 `json:"id"`
        UUID            string                 `json:"uuid"`
+       OwnerUUID       string                 `json:"owner_uuid"`
        ObjectUUID      string                 `json:"object_uuid"`
        ObjectOwnerUUID string                 `json:"object_owner_uuid"`
        EventType       string                 `json:"event_type"`
-       EventAt         *time.Time             `json:"event"`
+       EventAt         time.Time              `json:"event"`
+       Summary         string                 `json:"summary"`
        Properties      map[string]interface{} `json:"properties"`
-       CreatedAt       *time.Time             `json:"created_at"`
+       CreatedAt       time.Time              `json:"created_at"`
+       ModifiedAt      time.Time              `json:"modified_at"`
 }
 
 // LogList is an arvados#logList resource.
index bb1bec789f7f459a3cd49657c4df5337711cce19..bf60a770267e437f7551bfb59fc60e62920fea9c 100644 (file)
@@ -37,6 +37,8 @@ func (v *Vocabulary) systemTagKeys() map[string]bool {
                "docker-image-repo-tag": true,
                "filters":               true,
                "container_request":     true,
+               "cwl_input":             true,
+               "cwl_output":            true,
        }
 }
 
index 84b9bf2295e62e6025e0c6f03847c4d3e666a9eb..f31a4f984b36f7c70aa9987017d9596900c91173 100644 (file)
@@ -238,6 +238,8 @@ func (s *VocabularySuite) TestNewVocabulary(c *check.C) {
                                        "docker-image-repo-tag": true,
                                        "filters":               true,
                                        "container_request":     true,
+                                       "cwl_input":             true,
+                                       "cwl_output":            true,
                                },
                                StrictTags: false,
                                Tags: map[string]VocabularyTag{
index d6da579d6b9ce1323dfbeb9b50f993232822379a..83efd889286d63bc9efd8fd4368850a2870a9d15 100644 (file)
@@ -193,6 +193,26 @@ func (as *APIStub) LinkDelete(ctx context.Context, options arvados.DeleteOptions
        as.appendCall(ctx, as.LinkDelete, options)
        return arvados.Link{}, as.Error
 }
+func (as *APIStub) LogCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Log, error) {
+       as.appendCall(ctx, as.LogCreate, options)
+       return arvados.Log{}, as.Error
+}
+func (as *APIStub) LogUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.Log, error) {
+       as.appendCall(ctx, as.LogUpdate, options)
+       return arvados.Log{}, as.Error
+}
+func (as *APIStub) LogGet(ctx context.Context, options arvados.GetOptions) (arvados.Log, error) {
+       as.appendCall(ctx, as.LogGet, options)
+       return arvados.Log{}, as.Error
+}
+func (as *APIStub) LogList(ctx context.Context, options arvados.ListOptions) (arvados.LogList, error) {
+       as.appendCall(ctx, as.LogList, options)
+       return arvados.LogList{}, as.Error
+}
+func (as *APIStub) LogDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.Log, error) {
+       as.appendCall(ctx, as.LogDelete, options)
+       return arvados.Log{}, as.Error
+}
 func (as *APIStub) SpecimenCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Specimen, error) {
        as.appendCall(ctx, as.SpecimenCreate, options)
        return arvados.Specimen{}, as.Error
index caf99108a632ac44163a4e669fce9bb00366c078..6fb33dc608054611c92c73ab41c904bada7ebb6d 100644 (file)
@@ -135,6 +135,7 @@ type CheckResult struct {
        Response       map[string]interface{} `json:",omitempty"`
        ResponseTime   json.Number
        ClockTime      time.Time
+       Server         string // "Server" header in http response
        Metrics
        respTime time.Duration
 }
@@ -360,6 +361,7 @@ func (agg *Aggregator) ping(target *url.URL) (result CheckResult) {
        }
        result.Health = "OK"
        result.ClockTime, _ = time.Parse(time.RFC1123, resp.Header.Get("Date"))
+       result.Server = resp.Header.Get("Server")
        return
 }
 
@@ -438,7 +440,7 @@ func (ccmd checkCommand) RunCommand(prog string, args []string, stdin io.Reader,
        err := ccmd.run(ctx, prog, args, stdin, stdout, stderr)
        if err != nil {
                if err != errSilent {
-                       fmt.Fprintln(stdout, err.Error())
+                       fmt.Fprintln(stderr, err.Error())
                }
                return 1
        }
@@ -452,6 +454,7 @@ func (ccmd checkCommand) run(ctx context.Context, prog string, args []string, st
        loader.SetupFlags(flags)
        versionFlag := flags.Bool("version", false, "Write version information to stdout and exit 0")
        timeout := flags.Duration("timeout", defaultTimeout.Duration(), "Maximum time to wait for health responses")
+       quiet := flags.Bool("quiet", false, "Silent on success (suppress 'health check OK' message on stderr)")
        outputYAML := flags.Bool("yaml", false, "Output full health report in YAML format (default mode shows errors as plain text, is silent on success)")
        if ok, _ := cmd.ParseFlags(flags, prog, args, "", stderr); !ok {
                // cmd.ParseFlags already reported the error
@@ -487,11 +490,14 @@ func (ccmd checkCommand) run(ctx context.Context, prog string, args []string, st
        }
        if resp.Health != "OK" {
                for _, msg := range resp.Errors {
-                       fmt.Fprintln(stdout, msg)
+                       fmt.Fprintln(stderr, msg)
                }
                fmt.Fprintln(stderr, "health check failed")
                return errSilent
        }
+       if !*quiet {
+               fmt.Fprintln(stderr, "health check OK")
+       }
        return nil
 }
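
With these changes, `arvados-server check` keeps stdout clean for the -yaml report and sends both error text and the new success message to stderr. Typical invocations, using only the flags defined above:

    arvados-server check            # prints "health check OK" to stderr on success
    arvados-server check -quiet     # suppresses the success message; exit status only
    arvados-server check -yaml      # writes the full health report to stdout
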
 
index b1166c27d457317b8a4f6d5b5bcdea69cc6bf273..f76f7b8ea80a45ba9d908e485d2dcae8b9eca300 100644 (file)
@@ -321,6 +321,13 @@ func (s *AggregatorSuite) TestCheckCommand(c *check.C) {
 
        exitcode := CheckCommand.RunCommand("check", []string{"-config=" + tmpdir + "/config.yml"}, &bytes.Buffer{}, &stdout, &stderr)
        c.Check(exitcode, check.Equals, 0)
+       c.Check(stderr.String(), check.Equals, "health check OK\n")
+       c.Check(stdout.String(), check.Equals, "")
+
+       stdout.Reset()
+       stderr.Reset()
+       exitcode = CheckCommand.RunCommand("check", []string{"-quiet", "-config=" + tmpdir + "/config.yml"}, &bytes.Buffer{}, &stdout, &stderr)
+       c.Check(exitcode, check.Equals, 0)
        c.Check(stderr.String(), check.Equals, "")
        c.Check(stdout.String(), check.Equals, "")
 
index e5d1d8fa380fb49513452d7555618c5410993764..2bb20ca5daec35fe5348369522a106a249959cfe 100644 (file)
@@ -833,6 +833,9 @@ def setup_config():
                         "GitInternalDir": os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'internal.git'),
                     },
                     "LocalKeepBlobBuffersPerVCPU": 0,
+                    "Logging": {
+                        "SweepInterval": 0, # disable, otherwise test cases can't acquire dblock
+                    },
                     "SupportedDockerImageFormats": {"v1": {}},
                     "ShellAccess": {
                         "Admin": True,
index 8c8039f1b842a7fa1242f0a822d964800bdf3f29..bbdd9c2843d4d810439e1f9ecafce1b0835b02ae 100644 (file)
@@ -308,25 +308,20 @@ SELECT target_uuid, perm_level
 
     # delete oid_login_perms for this user
     #
-    # note: these permission links are obsolete, they have no effect
-    # on anything and they are not created for new users.
+    # note: these permission links are obsolete anyway: they have no
+    # effect on anything and they are not created for new users.
     Link.where(tail_uuid: self.email,
                link_class: 'permission',
                name: 'can_login').destroy_all
 
-    # delete repo_perms for this user
-    Link.where(tail_uuid: self.uuid,
-               link_class: 'permission',
-               name: 'can_manage').destroy_all
-
-    # delete vm_login_perms for this user
-    Link.where(tail_uuid: self.uuid,
-               link_class: 'permission',
-               name: 'can_login').destroy_all
-
-    # delete "All users" group read permissions for this user
+    # Delete all sharing permissions so (a) the user doesn't
+    # automatically regain access to anything if re-setup in future,
+    # (b) the user doesn't appear in "currently shared with" lists
+    # shown to other users.
+    #
+    # Notably this includes the can_read -> "all users" group
+    # permission.
     Link.where(tail_uuid: self.uuid,
-               head_uuid: all_users_group_uuid,
                link_class: 'permission').destroy_all
 
     # delete any signatures by this user
index 7a0ab3826ab1c08beee1361ff81b654b4ccff86d..db1b3667cc0a94a95eebd51b46224daab3981336 100644 (file)
@@ -8,11 +8,9 @@
 # from the logs table.
 
 namespace :db do
-  desc "Remove old container log entries from the logs table"
+  desc "deprecated / no-op"
 
   task delete_old_container_logs: :environment do
-    delete_sql = "DELETE FROM logs WHERE id in (SELECT logs.id FROM logs JOIN containers ON logs.object_uuid = containers.uuid WHERE event_type IN ('stdout', 'stderr', 'arv-mount', 'crunch-run', 'crunchstat') AND containers.log IS NOT NULL AND now() - containers.finished_at > interval '#{Rails.configuration.Containers.Logging.MaxAge.to_i} seconds')"
-
-    ActiveRecord::Base.connection.execute(delete_sql)
+    Rails.logger.info "this db:delete_old_container_logs rake task is no longer used"
   end
 end
index f7fddb44d371c727a0da1b9ef314004fe67f1d6d..ca143363892cad7065e65d704d1c76bbd7551c83 100644 (file)
@@ -203,6 +203,22 @@ class UsersTest < ActionDispatch::IntegrationTest
       ApiClientAuthorization.create!(user: User.find_by_uuid(created['uuid']), api_client: ApiClient.all.first).api_token
     end
 
+    # share project and collections with the new user
+    act_as_system_user do
+      Link.create!(tail_uuid: created['uuid'],
+                   head_uuid: groups(:aproject).uuid,
+                   link_class: 'permission',
+                   name: 'can_manage')
+      Link.create!(tail_uuid: created['uuid'],
+                   head_uuid: collections(:collection_owned_by_active).uuid,
+                   link_class: 'permission',
+                   name: 'can_read')
+      Link.create!(tail_uuid: created['uuid'],
+                   head_uuid: collections(:collection_owned_by_active_with_file_stats).uuid,
+                   link_class: 'permission',
+                   name: 'can_write')
+    end
+
     assert_equal 1, ApiClientAuthorization.where(user_id: User.find_by_uuid(created['uuid']).id).size, 'expected token not found'
 
     post "/arvados/v1/users/#{created['uuid']}/unsetup", params: {}, headers: auth(:admin)
@@ -213,6 +229,8 @@ class UsersTest < ActionDispatch::IntegrationTest
     assert_not_nil created2['uuid'], 'expected uuid for the newly created user'
     assert_equal created['uuid'], created2['uuid'], 'expected uuid not found'
     assert_equal 0, ApiClientAuthorization.where(user_id: User.find_by_uuid(created['uuid']).id).size, 'token should have been deleted by user unsetup'
+    # check permissions are deleted
+    assert_empty Link.where(tail_uuid: created['uuid'])
 
     verify_link_existence created['uuid'], created['email'], false, false, false, false, false
   end
diff --git a/services/api/test/tasks/delete_old_container_logs_test.rb b/services/api/test/tasks/delete_old_container_logs_test.rb
deleted file mode 100644 (file)
index c81b331..0000000
+++ /dev/null
@@ -1,54 +0,0 @@
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: AGPL-3.0
-
-require 'test_helper'
-require 'rake'
-
-Rake.application.rake_require "tasks/delete_old_container_logs"
-Rake::Task.define_task(:environment)
-
-class DeleteOldContainerLogsTaskTest < ActiveSupport::TestCase
-  TASK_NAME = "db:delete_old_container_logs"
-
-  def log_uuids(*fixture_names)
-    fixture_names.map { |name| logs(name).uuid }
-  end
-
-  def run_with_expiry(clean_after)
-    Rails.configuration.Containers.Logging.MaxAge = clean_after
-    Rake::Task[TASK_NAME].reenable
-    Rake.application.invoke_task TASK_NAME
-  end
-
-  def check_log_existence(test_method, fixture_uuids)
-    uuids_now = Log.where("object_uuid LIKE :pattern AND event_type in ('stdout', 'stderr', 'arv-mount', 'crunch-run', 'crunchstat')", pattern: "%-dz642-%").map(&:uuid)
-    fixture_uuids.each do |expect_uuid|
-      send(test_method, uuids_now, expect_uuid)
-    end
-  end
-
-  test "delete all finished logs" do
-    uuids_to_keep = log_uuids(:stderr_for_running_container,
-                              :crunchstat_for_running_container)
-    uuids_to_clean = log_uuids(:stderr_for_previous_container,
-                               :crunchstat_for_previous_container,
-                               :stderr_for_ancient_container,
-                               :crunchstat_for_ancient_container)
-    run_with_expiry(1)
-    check_log_existence(:assert_includes, uuids_to_keep)
-    check_log_existence(:refute_includes, uuids_to_clean)
-  end
-
-  test "delete old finished logs" do
-    uuids_to_keep = log_uuids(:stderr_for_running_container,
-                              :crunchstat_for_running_container,
-                              :stderr_for_previous_container,
-                              :crunchstat_for_previous_container)
-    uuids_to_clean = log_uuids(:stderr_for_ancient_container,
-                               :crunchstat_for_ancient_container)
-    run_with_expiry(360.days)
-    check_log_existence(:assert_includes, uuids_to_keep)
-    check_log_existence(:refute_includes, uuids_to_clean)
-  end
-end
index ac394e114962ddf05d2e71e94cc4bb1ff46c4780..1c0f6ad28f5ba7b6d20bcc0cadbef0fb87fec634 100644 (file)
@@ -19,6 +19,8 @@ import (
        "time"
 
        "git.arvados.org/arvados.git/lib/cmd"
+       "git.arvados.org/arvados.git/lib/controller/dblock"
+       "git.arvados.org/arvados.git/lib/ctrlctx"
        "git.arvados.org/arvados.git/lib/dispatchcloud"
        "git.arvados.org/arvados.git/lib/service"
        "git.arvados.org/arvados.git/sdk/go/arvados"
@@ -55,10 +57,11 @@ const initialNiceValue int64 = 10000
 
 type Dispatcher struct {
        *dispatch.Dispatcher
-       logger  logrus.FieldLogger
-       cluster *arvados.Cluster
-       sqCheck *SqueueChecker
-       slurm   Slurm
+       logger      logrus.FieldLogger
+       cluster     *arvados.Cluster
+       sqCheck     *SqueueChecker
+       slurm       Slurm
+       dbConnector ctrlctx.DBConnector
 
        done chan struct{}
        err  error
@@ -90,6 +93,7 @@ func (disp *Dispatcher) configure() error {
        disp.Client.APIHost = disp.cluster.Services.Controller.ExternalURL.Host
        disp.Client.AuthToken = disp.cluster.SystemRootToken
        disp.Client.Insecure = disp.cluster.TLS.Insecure
+       disp.dbConnector = ctrlctx.DBConnector{PostgreSQL: disp.cluster.PostgreSQL}
 
        if disp.Client.APIHost != "" || disp.Client.AuthToken != "" {
                // Copy real configs into env vars so [a]
@@ -137,6 +141,8 @@ func (disp *Dispatcher) setup() {
 }
 
 func (disp *Dispatcher) run() error {
+       dblock.Dispatch.Lock(context.Background(), disp.dbConnector.GetDB)
+       defer dblock.Dispatch.Unlock()
        defer disp.sqCheck.Stop()
 
        if disp.cluster != nil && len(disp.cluster.InstanceTypes) > 0 {
index 6383eae5452dd1d145420e7da41ce773878b5cef..d28bee0f5e19591275eab2ae43d2a640d316de6d 100644 (file)
@@ -28,6 +28,10 @@ var (
        version               = "dev"
 )
 
+type logger interface {
+       Printf(string, ...interface{})
+}
+
 func main() {
        reporter := crunchstat.Reporter{
                Logger: log.New(os.Stderr, "crunchstat: ", 0),
@@ -55,9 +59,11 @@ func main() {
        reporter.Logger.Printf("crunchstat %s started", version)
 
        if reporter.CgroupRoot == "" {
-               reporter.Logger.Fatal("error: must provide -cgroup-root")
+               reporter.Logger.Printf("error: must provide -cgroup-root")
+               os.Exit(2)
        } else if signalOnDeadPPID < 0 {
-               reporter.Logger.Fatalf("-signal-on-dead-ppid=%d is invalid (use a positive signal number, or 0 to disable)", signalOnDeadPPID)
+               reporter.Logger.Printf("-signal-on-dead-ppid=%d is invalid (use a positive signal number, or 0 to disable)", signalOnDeadPPID)
+               os.Exit(2)
        }
        reporter.PollPeriod = time.Duration(*pollMsec) * time.Millisecond
 
@@ -76,17 +82,19 @@ func main() {
                if status, ok := err.Sys().(syscall.WaitStatus); ok {
                        os.Exit(status.ExitStatus())
                } else {
-                       reporter.Logger.Fatalln("ExitError without WaitStatus:", err)
+                       reporter.Logger.Printf("ExitError without WaitStatus: %v", err)
+                       os.Exit(1)
                }
        } else if err != nil {
-               reporter.Logger.Fatalln("error in cmd.Wait:", err)
+               reporter.Logger.Printf("error running command: %v", err)
+               os.Exit(1)
        }
 }
 
-func runCommand(argv []string, logger *log.Logger) error {
+func runCommand(argv []string, logger logger) error {
        cmd := exec.Command(argv[0], argv[1:]...)
 
-       logger.Println("Running", argv)
+       logger.Printf("Running %v", argv)
 
        // Child process will use our stdin and stdout pipes
        // (we close our copies below)
@@ -100,7 +108,7 @@ func runCommand(argv []string, logger *log.Logger) error {
                if cmd.Process != nil {
                        cmd.Process.Signal(catch)
                }
-               logger.Println("notice: caught signal:", catch)
+               logger.Printf("notice: caught signal: %v", catch)
        }(sigChan)
        signal.Notify(sigChan, syscall.SIGTERM)
        signal.Notify(sigChan, syscall.SIGINT)
@@ -113,24 +121,30 @@ func runCommand(argv []string, logger *log.Logger) error {
        // Funnel stderr through our channel
        stderrPipe, err := cmd.StderrPipe()
        if err != nil {
-               logger.Fatalln("error in StderrPipe:", err)
+               logger.Printf("error in StderrPipe: %v", err)
+               return err
        }
 
        // Run subprocess
        if err := cmd.Start(); err != nil {
-               logger.Fatalln("error in cmd.Start:", err)
+               logger.Printf("error in cmd.Start: %v", err)
+               return err
        }
 
        // Close stdin/stdout in this (parent) process
        os.Stdin.Close()
        os.Stdout.Close()
 
-       copyPipeToChildLog(stderrPipe, log.New(os.Stderr, "", 0))
+       err = copyPipeToChildLog(stderrPipe, log.New(os.Stderr, "", 0))
+       if err != nil {
+               cmd.Process.Kill()
+               return err
+       }
 
        return cmd.Wait()
 }
 
-func sendSignalOnDeadPPID(intvl time.Duration, signum, ppidOrig int, cmd *exec.Cmd, logger *log.Logger) {
+func sendSignalOnDeadPPID(intvl time.Duration, signum, ppidOrig int, cmd *exec.Cmd, logger logger) {
        ticker := time.NewTicker(intvl)
        for range ticker.C {
                ppid := os.Getppid()
@@ -152,7 +166,7 @@ func sendSignalOnDeadPPID(intvl time.Duration, signum, ppidOrig int, cmd *exec.C
        }
 }
 
-func copyPipeToChildLog(in io.ReadCloser, logger *log.Logger) {
+func copyPipeToChildLog(in io.ReadCloser, logger logger) error {
        reader := bufio.NewReaderSize(in, MaxLogLine)
        var prefix string
        for {
@@ -160,13 +174,13 @@ func copyPipeToChildLog(in io.ReadCloser, logger *log.Logger) {
                if err == io.EOF {
                        break
                } else if err != nil {
-                       logger.Fatal("error reading child stderr:", err)
+                       return fmt.Errorf("error reading child stderr: %w", err)
                }
                var suffix string
                if isPrefix {
                        suffix = "[...]"
                }
-               logger.Print(prefix, string(line), suffix)
+               logger.Printf("%s%s%s", prefix, string(line), suffix)
                // Set up prefix for following line
                if isPrefix {
                        prefix = "[...]"
@@ -174,5 +188,5 @@ func copyPipeToChildLog(in io.ReadCloser, logger *log.Logger) {
                        prefix = ""
                }
        }
-       in.Close()
+       return in.Close()
 }
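
The crunchstat main package now depends only on a small Printf-style logger interface instead of *log.Logger, which is what lets it return errors (and choose exit codes) instead of calling Fatal. A minimal sketch of a test double satisfying that interface, assuming it lives in the same package as the interface defined above (logrus loggers also satisfy it, since they provide Printf):

    package main

    import "fmt"

    // bufLogger satisfies the package's logger interface (Printf only) and
    // captures output for assertions instead of writing to stderr.
    type bufLogger struct {
        lines []string
    }

    func (l *bufLogger) Printf(format string, args ...interface{}) {
        l.lines = append(l.lines, fmt.Sprintf(format, args...))
    }
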
index 1dedb409a4a2de5c4f414959b024e291007d42b1..9f581751d938baf8d0f8fdc726bfae92e1f6877d 100644 (file)
@@ -23,7 +23,9 @@ import (
        "syscall"
        "time"
 
+       "git.arvados.org/arvados.git/lib/controller/dblock"
        "git.arvados.org/arvados.git/sdk/go/arvados"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/keepclient"
        "github.com/jmoiron/sqlx"
        "github.com/sirupsen/logrus"
@@ -67,16 +69,19 @@ type Balancer struct {
 // subsequent balance operation.
 //
 // Run should only be called once on a given Balancer object.
-//
-// Typical usage:
-//
-//   runOptions, err = (&Balancer{}).Run(config, runOptions)
-func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
+func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
        nextRunOptions = runOptions
 
+       ctxlog.FromContext(ctx).Info("acquiring active lock")
+       if !dblock.KeepBalanceActive.Lock(ctx, func(context.Context) (*sqlx.DB, error) { return bal.DB, nil }) {
+               // context canceled
+               return
+       }
+       defer dblock.KeepBalanceActive.Unlock()
+
        defer bal.time("sweep", "wall clock time to run one full sweep")()
 
-       ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
+       ctx, cancel := context.WithDeadline(ctx, time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
        defer cancel()
 
        var lbFile *os.File
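
Because Run now takes a context and blocks on the KeepBalanceActive lock, a caller can abandon a pending balance operation by cancelling that context. A sketch of the updated "typical usage" that the removed doc comment used to describe, assuming only the Balancer and Server fields shown in this diff:

    package keepbalance

    import (
        "context"

        "git.arvados.org/arvados.git/sdk/go/arvados"
    )

    // runOnceWithCancel mirrors the removed "typical usage" comment, updated
    // for the new context-aware signature.
    func runOnceWithCancel(ctx context.Context, client *arvados.Client, cluster *arvados.Cluster,
        srv *Server, opts RunOptions) (RunOptions, error) {
        bal := &Balancer{
            DB:      srv.DB,
            Logger:  srv.Logger,
            Metrics: srv.Metrics,
        }
        // Run waits for the KeepBalanceActive lock; cancelling ctx while it
        // is still waiting makes it return without balancing anything.
        return bal.Run(ctx, client, cluster, opts)
    }
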
index 2db7bea173c17dc41f6943b4fe579cbc7d15a24f..4772da55a2d6dddc79acff891dee1034781f7582 100644 (file)
@@ -6,6 +6,7 @@ package keepbalance
 
 import (
        "bytes"
+       "context"
        "encoding/json"
        "fmt"
        "io"
@@ -372,7 +373,7 @@ func (s *runSuite) TestRefuseZeroCollections(c *check.C) {
        trashReqs := s.stub.serveKeepstoreTrash()
        pullReqs := s.stub.serveKeepstorePull()
        srv := s.newServer(&opts)
-       _, err = srv.runOnce()
+       _, err = srv.runOnce(context.Background())
        c.Check(err, check.ErrorMatches, "received zero collections")
        c.Check(trashReqs.Count(), check.Equals, 4)
        c.Check(pullReqs.Count(), check.Equals, 0)
@@ -391,7 +392,7 @@ func (s *runSuite) TestRefuseNonAdmin(c *check.C) {
        trashReqs := s.stub.serveKeepstoreTrash()
        pullReqs := s.stub.serveKeepstorePull()
        srv := s.newServer(&opts)
-       _, err := srv.runOnce()
+       _, err := srv.runOnce(context.Background())
        c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
        c.Check(trashReqs.Count(), check.Equals, 0)
        c.Check(pullReqs.Count(), check.Equals, 0)
@@ -417,7 +418,7 @@ func (s *runSuite) TestRefuseSameDeviceDifferentVolumes(c *check.C) {
        trashReqs := s.stub.serveKeepstoreTrash()
        pullReqs := s.stub.serveKeepstorePull()
        srv := s.newServer(&opts)
-       _, err := srv.runOnce()
+       _, err := srv.runOnce(context.Background())
        c.Check(err, check.ErrorMatches, "cannot continue with config errors.*")
        c.Check(trashReqs.Count(), check.Equals, 0)
        c.Check(pullReqs.Count(), check.Equals, 0)
@@ -442,7 +443,7 @@ func (s *runSuite) TestWriteLostBlocks(c *check.C) {
        s.stub.serveKeepstorePull()
        srv := s.newServer(&opts)
        c.Assert(err, check.IsNil)
-       _, err = srv.runOnce()
+       _, err = srv.runOnce(context.Background())
        c.Check(err, check.IsNil)
        lost, err := ioutil.ReadFile(lostf.Name())
        c.Assert(err, check.IsNil)
@@ -463,7 +464,7 @@ func (s *runSuite) TestDryRun(c *check.C) {
        trashReqs := s.stub.serveKeepstoreTrash()
        pullReqs := s.stub.serveKeepstorePull()
        srv := s.newServer(&opts)
-       bal, err := srv.runOnce()
+       bal, err := srv.runOnce(context.Background())
        c.Check(err, check.IsNil)
        for _, req := range collReqs.reqs {
                c.Check(req.Form.Get("include_trash"), check.Equals, "true")
@@ -493,7 +494,7 @@ func (s *runSuite) TestCommit(c *check.C) {
        trashReqs := s.stub.serveKeepstoreTrash()
        pullReqs := s.stub.serveKeepstorePull()
        srv := s.newServer(&opts)
-       bal, err := srv.runOnce()
+       bal, err := srv.runOnce(context.Background())
        c.Check(err, check.IsNil)
        c.Check(trashReqs.Count(), check.Equals, 8)
        c.Check(pullReqs.Count(), check.Equals, 4)
@@ -533,13 +534,14 @@ func (s *runSuite) TestRunForever(c *check.C) {
        trashReqs := s.stub.serveKeepstoreTrash()
        pullReqs := s.stub.serveKeepstorePull()
 
-       stop := make(chan interface{})
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
        s.config.Collections.BalancePeriod = arvados.Duration(time.Millisecond)
        srv := s.newServer(&opts)
 
        done := make(chan bool)
        go func() {
-               srv.runForever(stop)
+               srv.runForever(ctx)
                close(done)
        }()
 
@@ -550,7 +552,7 @@ func (s *runSuite) TestRunForever(c *check.C) {
        for t0 := time.Now(); pullReqs.Count() < 16 && time.Since(t0) < 10*time.Second; {
                time.Sleep(time.Millisecond)
        }
-       stop <- true
+       cancel()
        <-done
        c.Check(pullReqs.Count() >= 16, check.Equals, true)
        c.Check(trashReqs.Count(), check.Equals, pullReqs.Count()+4)
index 3cfb5cdeda5039fb37f414f5cd0b095eea0e772d..42463a002a5ec73652f7f7ef6f00f8a8c4fb44a1 100644 (file)
@@ -6,6 +6,7 @@ package keepbalance
 
 import (
        "bytes"
+       "context"
        "io"
        "os"
        "strings"
@@ -97,7 +98,7 @@ func (s *integrationSuite) TestBalanceAPIFixtures(c *check.C) {
                        Logger:  logger,
                        Metrics: newMetrics(prometheus.NewRegistry()),
                }
-               nextOpts, err := bal.Run(s.client, s.config, opts)
+               nextOpts, err := bal.Run(context.Background(), s.client, s.config, opts)
                c.Check(err, check.IsNil)
                c.Check(nextOpts.SafeRendezvousState, check.Not(check.Equals), "")
                c.Check(nextOpts.CommitPulls, check.Equals, true)
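
The only functional change in this hunk is that bal.Run now takes a context as its first argument. As a hypothetical sketch of what that buys a caller (runOnePass merely stands in for the real Run, and the 100ms deadline is arbitrary), a pass can now be abandoned at a deadline or on shutdown instead of always running to completion:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// runOnePass is a stand-in for a context-aware balancing pass.
func runOnePass(ctx context.Context) error {
	select {
	case <-time.After(10 * time.Second): // pretend the pass is slow
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	if err := runOnePass(ctx); errors.Is(err, context.DeadlineExceeded) {
		fmt.Println("pass abandoned at deadline")
	}
}
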
index f0b0df5bd331d6a97a2cdaab0a8d968cfdbfc550..b016db22ffe67f6316f1e4f537bfa680f135ecad 100644 (file)
@@ -112,7 +112,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                                Routes: health.Routes{"ping": srv.CheckHealth},
                        }
 
-                       go srv.run()
+                       go srv.run(ctx)
                        return srv
                }).RunCommand(prog, args, stdin, stdout, stderr)
 }
index e485f5b2061f28134306d1d897b22cb62e4190e9..fd53497f789ed4f5f1db458f99e69f8e7f10c1a7 100644 (file)
@@ -5,12 +5,14 @@
 package keepbalance
 
 import (
+       "context"
        "net/http"
        "os"
        "os/signal"
        "syscall"
        "time"
 
+       "git.arvados.org/arvados.git/lib/controller/dblock"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/jmoiron/sqlx"
        "github.com/sirupsen/logrus"
@@ -62,12 +64,12 @@ func (srv *Server) Done() <-chan struct{} {
        return nil
 }
 
-func (srv *Server) run() {
+func (srv *Server) run(ctx context.Context) {
        var err error
        if srv.RunOptions.Once {
-               _, err = srv.runOnce()
+               _, err = srv.runOnce(ctx)
        } else {
-               err = srv.runForever(nil)
+               err = srv.runForever(ctx)
        }
        if err != nil {
                srv.Logger.Error(err)
@@ -77,7 +79,7 @@ func (srv *Server) run() {
        }
 }
 
-func (srv *Server) runOnce() (*Balancer, error) {
+func (srv *Server) runOnce(ctx context.Context) (*Balancer, error) {
        bal := &Balancer{
                DB:             srv.DB,
                Logger:         srv.Logger,
@@ -86,13 +88,12 @@ func (srv *Server) runOnce() (*Balancer, error) {
                LostBlocksFile: srv.Cluster.Collections.BlobMissingReport,
        }
        var err error
-       srv.RunOptions, err = bal.Run(srv.ArvClient, srv.Cluster, srv.RunOptions)
+       srv.RunOptions, err = bal.Run(ctx, srv.ArvClient, srv.Cluster, srv.RunOptions)
        return bal, err
 }
 
-// RunForever runs forever, or (for testing purposes) until the given
-// stop channel is ready to receive.
-func (srv *Server) runForever(stop <-chan interface{}) error {
+// runForever runs forever, or until ctx is canceled.
+func (srv *Server) runForever(ctx context.Context) error {
        logger := srv.Logger
 
        ticker := time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
@@ -102,6 +103,10 @@ func (srv *Server) runForever(stop <-chan interface{}) error {
        sigUSR1 := make(chan os.Signal)
        signal.Notify(sigUSR1, syscall.SIGUSR1)
 
+       logger.Info("acquiring service lock")
+       dblock.KeepBalanceService.Lock(ctx, func(context.Context) (*sqlx.DB, error) { return srv.DB, nil })
+       defer dblock.KeepBalanceService.Unlock()
+
        logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.Cluster.Collections.BalancePeriod)
 
        for {
@@ -110,7 +115,11 @@ func (srv *Server) runForever(stop <-chan interface{}) error {
                        logger.Print("=======  Consider using -commit-pulls and -commit-trash flags.")
                }
 
-               _, err := srv.runOnce()
+               if !dblock.KeepBalanceService.Check() {
+                       // context canceled
+                       return nil
+               }
+               _, err := srv.runOnce(ctx)
                if err != nil {
                        logger.Print("run failed: ", err)
                } else {
@@ -118,7 +127,7 @@ func (srv *Server) runForever(stop <-chan interface{}) error {
                }
 
                select {
-               case <-stop:
+               case <-ctx.Done():
                        signal.Stop(sigUSR1)
                        return nil
                case <-ticker.C:
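
These server.go hunks are the core of the change: runForever now takes the service context, acquires the KeepBalanceService lock from lib/controller/dblock before its first pass (presumably so only one keep-balance process balances the cluster at a time), re-checks the lock each iteration, and returns cleanly when ctx is canceled. The dblock implementation itself is not part of this diff; as a rough sketch only, a lock with these Lock/Check/Unlock semantics can be built on a PostgreSQL session-level advisory lock held on a dedicated connection (the key value and driver below are illustrative assumptions, not Arvados code):

package advisorylock

import (
	"context"
	"database/sql"

	_ "github.com/lib/pq" // assumed driver, for illustration only
)

const keepBalanceLockKey = 42 // illustrative key, not the real dblock ID

// acquire blocks until the advisory lock is held and returns the
// connection that owns it; the lock lives as long as that session.
func acquire(ctx context.Context, db *sql.DB) (*sql.Conn, error) {
	conn, err := db.Conn(ctx)
	if err != nil {
		return nil, err
	}
	if _, err := conn.ExecContext(ctx, `SELECT pg_advisory_lock($1)`, keepBalanceLockKey); err != nil {
		conn.Close()
		return nil, err
	}
	return conn, nil
}

// stillHeld is the analogue of the Check() call above: if the session
// died, PostgreSQL already released its session-level advisory locks,
// so a failed ping means the lock is gone and the loop should stop.
func stillHeld(ctx context.Context, conn *sql.Conn) bool {
	return conn.PingContext(ctx) == nil
}

// release is the analogue of Unlock(): closing the owning connection
// releases every advisory lock held by that session.
func release(conn *sql.Conn) error {
	return conn.Close()
}
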
index 869cc596a3e3cafdf3698eee8847dad78d9ebd5e..5df1870c808846c219b6fa845fc32af939908a09 100644 (file)
@@ -55,6 +55,8 @@ nginx:
               - proxy_set_header: 'X-Real-IP $remote_addr'
               - proxy_set_header: 'X-Forwarded-For $proxy_add_x_forwarded_for'
               - proxy_set_header: 'X-External-Client $external_client'
+              - proxy_set_header: 'Upgrade $http_upgrade'
+              - proxy_set_header: 'Connection "upgrade"'
               - proxy_max_temp_file_size: 0
               - proxy_request_buffering: 'off'
               - proxy_buffering: 'off'
index bc28fd82595c3d39a8b657d674812b12013ff317..2b01237889a10186fdc9028d7ff8e8bd2765978f 100644 (file)
@@ -54,6 +54,8 @@ nginx:
               - proxy_set_header: 'X-Real-IP $remote_addr'
               - proxy_set_header: 'X-Forwarded-For $proxy_add_x_forwarded_for'
               - proxy_set_header: 'X-External-Client $external_client'
+              - proxy_set_header: 'Upgrade $http_upgrade'
+              - proxy_set_header: 'Connection "upgrade"'
               - proxy_max_temp_file_size: 0
               - proxy_request_buffering: 'off'
               - proxy_buffering: 'off'
index 3058367bc006992342559321696fb7b96664150c..55d54cf1cba84dab338b1eb5141352ae6f745714 100644 (file)
@@ -54,6 +54,8 @@ nginx:
               - proxy_set_header: 'X-Real-IP $remote_addr'
               - proxy_set_header: 'X-Forwarded-For $proxy_add_x_forwarded_for'
               - proxy_set_header: 'X-External-Client $external_client'
+              - proxy_set_header: 'Upgrade $http_upgrade'
+              - proxy_set_header: 'Connection "upgrade"'
               - proxy_max_temp_file_size: 0
               - proxy_request_buffering: 'off'
               - proxy_buffering: 'off'
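
The three nginx pillar changes are identical: forwarding the client's Upgrade header and forcing Connection: "upgrade" is what lets WebSocket handshakes pass through nginx to the upstream, because Connection and Upgrade are hop-by-hop headers that a proxy otherwise drops. Purely as an illustration (this is not controller code), the upstream side of such a handshake check looks like:

package main

import (
	"net/http"
	"strings"
)

// isWebSocketUpgrade reports whether the proxied request still carries
// the handshake headers that the added proxy_set_header lines forward.
func isWebSocketUpgrade(r *http.Request) bool {
	return strings.EqualFold(r.Header.Get("Upgrade"), "websocket") &&
		strings.Contains(strings.ToLower(r.Header.Get("Connection")), "upgrade")
}

func main() {
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		if isWebSocketUpgrade(r) {
			// a real service would complete the websocket handshake here
			return
		}
		w.Write([]byte("plain HTTP request\n"))
	})
	http.ListenAndServe(":8000", nil)
}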