net-ssh-gateway (2.0.0)
net-ssh (>= 4.0.0)
nio4r (2.5.8)
- nokogiri (1.13.7)
+ nokogiri (1.13.9)
mini_portile2 (~> 2.8.0)
racc (~> 1.4)
npm-rails (0.2.1)
- api/keep-s3.html.textile.liquid
- api/keep-web-urls.html.textile.liquid
- api/projects.html.textile.liquid
+ - api/properties.html.textile.liquid
- api/methods/collections.html.textile.liquid
- api/methods/repositories.html.textile.liquid
- Container engine:
</notextile>
-h2(#main). development main (as of 2022-10-13)
+h2(#main). development main (as of 2022-10-31)
"previous: Upgrading to 2.4.3":#v2_4_3
h3. New keepstore S3 driver enabled by default
A more actively maintained S3 client library is now enabled by default for keepstore services. The previous driver is still available in case of unforeseen issues. To use the old driver, set @DriverParameters.UseAWSS3v2Driver@ to @false@ on the appropriate @Volumes@ config entries.
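For example, a volume entry reverting to the old driver might look like the following sketch (the cluster ID and volume UUID here are placeholders):

<notextile><pre><code>Clusters:
  zzzzz:
    Volumes:
      zzzzz-nyw5e-000000000000000:
        Driver: S3
        DriverParameters:
          UseAWSS3v2Driver: false
</code></pre></notextile>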
+h2(#main). development main (as of 2022-10-14)
+
+h3. Old container logs are automatically deleted from PostgreSQL
+
+Cached copies of log entries from containers that finished more than 1 month ago are now deleted automatically (this only affects the "live" logs saved in the PostgreSQL database, not log collections saved in Keep). If you have an existing cron job that runs @rake db:delete_old_container_logs@, you can remove it. See configuration options @Containers.Logging.MaxAge@ and @Containers.Logging.SweepInterval@.
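+For reference, the relevant configuration section looks like this sketch (720h and 12h are the defaults shown later in this changeset):
+
+<notextile><pre><code>Clusters:
+  zzzzz:
+    Containers:
+      Logging:
+        MaxAge: 720h
+        SweepInterval: 12h
+</code></pre></notextile>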
+
+h3. Fixed salt installer template file to support container shell access
+
If you manage your cluster using the salt installer, you may want to update it to the latest version, use the appropriate @config_examples@ subdirectory, and redeploy with your custom @local.params@ file so that @arvados-controller@'s @nginx@ configuration file gets fixed.
h3. Login-sync script requires configuration update on LoginCluster federations
|is_trashed|datetime|True if @trash_at@ is in the past, false if not.||
|frozen_by_uuid|string|For a frozen project, indicates the user who froze the project; null in all other cases. When a project is frozen, no further changes can be made to the project or its contents, even by admins. Attempting to add new items or modify, rename, move, trash, or delete the project or its contents, including any subprojects, will return an error.||
-h3. Frozen projects
+h3(#frozen). Frozen projects
A user with @manage@ permission can set the @frozen_by_uuid@ attribute of a @project@ group to their own user UUID. Once this is done, no further changes can be made to the project or its contents, including subprojects.
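As an illustrative sketch (not part of this changeset), freezing a project with the Go SDK might look like this; @freezeProject@ is a hypothetical helper, and the client is assumed to be configured via the usual environment variables:

<notextile><pre><code>import "git.arvados.org/arvados.git/sdk/go/arvados"

// freezeProject sets frozen_by_uuid to the calling user's UUID,
// which freezes the project.
func freezeProject(projectUUID, userUUID string) (arvados.Group, error) {
	var g arvados.Group
	client := arvados.NewClientFromEnv()
	err := client.RequestAndDecode(&g, "PATCH", "arvados/v1/groups/"+projectUUID, nil,
		map[string]interface{}{
			"group": map[string]interface{}{
				"frozen_by_uuid": userUUID,
			},
		})
	return g, err
}
</code></pre></notextile>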
Because the home project is a virtual project, other operations via the @groups@ API are not supported.
-h2. Filter groups
+h2(#filtergroups). Filter groups
Filter groups are another type of virtual project. They are implemented as an Arvados @group@ object with @group_class@ set to the value "filter".
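For illustration, a filter group record might look like this (the UUID, name, and filter values are placeholders):

<notextile><pre><code>{
  "uuid": "zzzzz-j7d0g-000000000000000",
  "group_class": "filter",
  "name": "Collections matching \"sample\"",
  "properties": {
    "filters": [
      ["collections.name", "like", "%sample%"]
    ]
  }
}
</code></pre></notextile>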
--- /dev/null
+---
+layout: default
+navsection: api
+title: "Metadata properties"
+...
+{% comment %}
+Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: CC-BY-SA-3.0
+{% endcomment %}
+
+Arvados allows you to attach arbitrary properties to "collection":methods/collections.html, "container_request":methods/container_requests.html, "link":methods/links.html and "group":methods/groups.html records that have a @properties@ field. These are key-value pairs, where the value can be any valid JSON type (string, number, null, boolean, array, or object).
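+For example, a collection's @properties@ field might contain (the property names here are arbitrary examples):
+
+<notextile><pre><code>{
+  "category": "WGS",
+  "sample_id": "X1234",
+  "replicate": 2,
+  "validated": true
+}
+</code></pre></notextile>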
+
+Searching for records using properties is described in "Filtering on subproperties":methods.html#subpropertyfilters .
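+For example, to find records whose @category@ property is @WGS@, you could use a filter like:
+
+<notextile><pre><code>[["properties.category", "=", "WGS"]]
+</code></pre></notextile>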
+
+h2. Reserved properties
+
+The following properties are set by Arvados components.
+
+table(table table-bordered table-condensed).
+|_. Property name|_. Appears on|_. Value type|_.Description|
+|type|collection|string|Indicates the contents or usage of the collection. See "Collection type values":#collectiontype below for details.|
+|container_request|collection|string|The UUID of the container request that produced an output or log collection.|
+|docker-image-repo-tag|collection|string|For collections containing a Docker image, the repo/name:tag identifier.|
+|container_uuid|collection|string|The UUID of the container that produced a collection (set on collections with @type=log@).|
+|cwl_input|container_request|object|On an intermediate container request, the CWL workflow-level input parameters used to generate the container request|
+|cwl_output|container_request|object|On an intermediate container request, the CWL workflow-level output parameters collected from the container request|
+|template_uuid|container_request|string|For a workflow runner container request, the workflow record that was used to launch it.|
+|username|link|string|For a "can_login":permission-model.html#links permission link, the Unix username the user will have on the VM.|
+|groups|link|array of string|For a "can_login":permission-model.html#links permission link, the Unix groups the user will be added to on the VM.|
+|image_timestamp|link|string|When resolving a Docker image name and multiple links are found with @link_class=docker_image_repo+tag@ and same @link_name@, the @image_timestamp@ is used to determine precedence (most recent wins).|
+|filters|group|array of array of string|Used to define "filter groups":projects.html#filtergroups|
+
+h3(#collectiontype). Collection "type" values
+
+The following are meaningful values of the @type@ property. Workbench recognizes them when filtering collections by type in a project's contents listing.
+
+table(table table-bordered table-condensed).
+|_. Type|_.Description|
+|log|The collection contains log files from a container run.|
+|output|The collection contains the output of a top-level container run (this is a container request where @requesting_container_uuid@ is null).|
+|intermediate|The collection contains the output of a child container run (this is a container request where @requesting_container_uuid@ is non-empty).|
+
+h2. Controlling user-supplied properties
+
+Arvados can be configured with a vocabulary file that lists valid properties and the range of valid values for those properties. This is described in "Metadata vocabulary":{{site.baseurl}}/admin/metadata-vocabulary.html .
+
+Arvados offers options to set properties automatically and/or prevent certain properties, once set, from being changed by non-admin users. This is described in "Configuring collection's managed properties":{{site.baseurl}}/admin/collection-managed-properties.html .
+
+The admin can require that certain properties must be non-empty before "freezing a project":methods/groups.html#frozen .
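+For example, assuming the @API.FreezeProjectRequiresProperties@ configuration option, a cluster could require a non-empty @validated_by@ property (an arbitrary example name) before projects can be frozen:
+
+<notextile><pre><code>Clusters:
+  zzzzz:
+    API:
+      FreezeProjectRequiresProperties:
+        validated_by: true
+</code></pre></notextile>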
LocalKeepLogsToContainerLog: none
Logging:
- # When you run the db:delete_old_container_logs task, it will find
- # containers that have been finished for at least this many seconds,
+ # Periodically (see SweepInterval) Arvados will check for
+ # containers that have been finished for at least this long,
# and delete their stdout, stderr, arv-mount, crunch-run, and
# crunchstat logs from the logs table.
MaxAge: 720h
+ # How often to delete cached log entries for finished
+ # containers (see MaxAge).
+ SweepInterval: 12h
+
# These two settings control how frequently log events are flushed to the
# database. Log lines are buffered until either crunch_log_bytes_per_event
# has been reached or crunch_log_seconds_between_events has elapsed since
# This feature is disabled when set to zero.
IdleTimeout: 0s
+ # URL to a file that is a fragment of text or HTML which should
+ # be rendered in Workbench as a banner.
+ BannerURL: ""
+
# Workbench welcome screen, this is HTML text that will be
# incorporated directly onto the page.
WelcomePageHTML: |
"Workbench.UserProfileFormFields.*.*.*": true,
"Workbench.UserProfileFormMessage": true,
"Workbench.WelcomePageHTML": true,
+ "Workbench.BannerURL": true,
}
func redactUnsafe(m map[string]interface{}, mPrefix, lookupPrefix string) error {
import (
"context"
"database/sql"
+ "fmt"
+ "net"
"sync"
"time"
)
var (
- TrashSweep = &DBLocker{key: 10001}
- retryDelay = 5 * time.Second
+ TrashSweep = &DBLocker{key: 10001}
+ ContainerLogSweep = &DBLocker{key: 10002}
+ KeepBalanceService = &DBLocker{key: 10003} // keep-balance service in periodic-sweep loop
+ KeepBalanceActive = &DBLocker{key: 10004} // keep-balance sweep in progress (either -once=true or service loop)
+ Dispatch = &DBLocker{key: 10005} // any dispatcher running
+ retryDelay = 5 * time.Second
)
// DBLocker uses pg_advisory_lock to maintain a cluster-wide lock for
}
// Lock acquires the advisory lock, waiting/reconnecting if needed.
-func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sqlx.DB, error)) {
- logger := ctxlog.FromContext(ctx)
+//
+// Returns false if ctx is canceled before the lock is acquired.
+func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sqlx.DB, error)) bool {
+ logger := ctxlog.FromContext(ctx).WithField("ID", dbl.key)
+ var lastHeldBy string
for ; ; time.Sleep(retryDelay) {
dbl.mtx.Lock()
if dbl.conn != nil {
dbl.mtx.Unlock()
continue
}
+ if ctx.Err() != nil {
+ dbl.mtx.Unlock()
+ return false
+ }
db, err := getdb(ctx)
- if err != nil {
- logger.WithError(err).Infof("error getting database pool")
+ if err == context.Canceled {
+ dbl.mtx.Unlock()
+ return false
+ } else if err != nil {
+ logger.WithError(err).Info("error getting database pool")
dbl.mtx.Unlock()
continue
}
conn, err := db.Conn(ctx)
- if err != nil {
+ if err == context.Canceled {
+ dbl.mtx.Unlock()
+ return false
+ } else if err != nil {
logger.WithError(err).Info("error getting database connection")
dbl.mtx.Unlock()
continue
}
var locked bool
err = conn.QueryRowContext(ctx, `SELECT pg_try_advisory_lock($1)`, dbl.key).Scan(&locked)
- if err != nil {
- logger.WithError(err).Infof("error getting pg_try_advisory_lock %d", dbl.key)
+ if err == context.Canceled {
+ conn.Close()
+ dbl.mtx.Unlock()
+ return false
+ } else if err != nil {
+ logger.WithError(err).Info("error getting pg_try_advisory_lock")
conn.Close()
dbl.mtx.Unlock()
continue
}
if !locked {
+ var host string
+ var port int
+ err = conn.QueryRowContext(ctx, `SELECT client_addr, client_port FROM pg_stat_activity WHERE pid IN
+ (SELECT pid FROM pg_locks
+ WHERE locktype = $1 AND objid = $2)`, "advisory", dbl.key).Scan(&host, &port)
+ if err != nil {
+ logger.WithError(err).Info("error getting other client info")
+ } else {
+ heldBy := net.JoinHostPort(host, fmt.Sprintf("%d", port))
+ if lastHeldBy != heldBy {
+ logger.WithField("DBClient", heldBy).Info("waiting for other process to release lock")
+ lastHeldBy = heldBy
+ }
+ }
conn.Close()
dbl.mtx.Unlock()
continue
}
- logger.Debugf("acquired pg_advisory_lock %d", dbl.key)
+ logger.Debug("acquired pg_advisory_lock")
dbl.ctx, dbl.getdb, dbl.conn = ctx, getdb, conn
dbl.mtx.Unlock()
- return
+ return true
}
}
// Check confirms that the lock is still active (i.e., the session is
// still alive), and re-acquires if needed. Panics if Lock is not
// acquired first.
-func (dbl *DBLocker) Check() {
+//
+// Returns false if the context passed to Lock() is canceled before
+// the lock is confirmed or reacquired.
+func (dbl *DBLocker) Check() bool {
dbl.mtx.Lock()
err := dbl.conn.PingContext(dbl.ctx)
- if err == nil {
- ctxlog.FromContext(dbl.ctx).Debugf("pg_advisory_lock %d connection still alive", dbl.key)
+ if err == context.Canceled {
+ dbl.mtx.Unlock()
+ return false
+ } else if err == nil {
+ ctxlog.FromContext(dbl.ctx).WithField("ID", dbl.key).Debug("connection still alive")
dbl.mtx.Unlock()
- return
+ return true
}
ctxlog.FromContext(dbl.ctx).WithError(err).Info("database connection ping failed")
dbl.conn.Close()
dbl.conn = nil
ctx, getdb := dbl.ctx, dbl.getdb
dbl.mtx.Unlock()
- dbl.Lock(ctx, getdb)
+ return dbl.Lock(ctx, getdb)
}
func (dbl *DBLocker) Unlock() {
if dbl.conn != nil {
_, err := dbl.conn.ExecContext(context.Background(), `SELECT pg_advisory_unlock($1)`, dbl.key)
if err != nil {
- ctxlog.FromContext(dbl.ctx).WithError(err).Infof("error releasing pg_advisory_lock %d", dbl.key)
+ ctxlog.FromContext(dbl.ctx).WithError(err).WithField("ID", dbl.key).Info("error releasing pg_advisory_lock")
} else {
- ctxlog.FromContext(dbl.ctx).Debugf("released pg_advisory_lock %d", dbl.key)
+ ctxlog.FromContext(dbl.ctx).WithField("ID", dbl.key).Debug("released pg_advisory_lock")
}
dbl.conn.Close()
dbl.conn = nil
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dblock
+
+import (
+ "bytes"
+ "context"
+ "sync"
+ "testing"
+ "time"
+
+ "git.arvados.org/arvados.git/lib/config"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvadostest"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "github.com/jmoiron/sqlx"
+ "github.com/sirupsen/logrus"
+ check "gopkg.in/check.v1"
+)
+
+func Test(t *testing.T) {
+ check.TestingT(t)
+}
+
+var _ = check.Suite(&suite{})
+
+type suite struct {
+ cluster *arvados.Cluster
+ db *sqlx.DB
+ getdb func(context.Context) (*sqlx.DB, error)
+}
+
+var testLocker = &DBLocker{key: 999}
+
+func (s *suite) SetUpSuite(c *check.C) {
+ cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+ c.Assert(err, check.IsNil)
+ s.cluster, err = cfg.GetCluster("")
+ c.Assert(err, check.IsNil)
+ s.db = arvadostest.DB(c, s.cluster)
+ s.getdb = func(context.Context) (*sqlx.DB, error) { return s.db, nil }
+}
+
+func (s *suite) TestLock(c *check.C) {
+ retryDelay = 10 * time.Millisecond
+
+ var logbuf bytes.Buffer
+ logger := ctxlog.New(&logbuf, "text", "debug")
+ logger.Level = logrus.DebugLevel
+ ctx := ctxlog.Context(context.Background(), logger)
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+ testLocker.Lock(ctx, s.getdb)
+ testLocker.Check()
+
+ lock2 := make(chan bool)
+ var wg sync.WaitGroup
+ defer wg.Wait()
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ testLocker2 := &DBLocker{key: 999}
+ testLocker2.Lock(ctx, s.getdb)
+ close(lock2)
+ testLocker2.Check()
+ testLocker2.Unlock()
+ }()
+
+ // Second lock should wait for first to Unlock
+ select {
+ case <-time.After(time.Second / 10):
+ c.Check(logbuf.String(), check.Matches, `(?ms).*level=info.*DBClient="[^"]+:\d+".*ID=999.*`)
+ case <-lock2:
+ c.Log("double-lock")
+ c.Fail()
+ }
+
+ testLocker.Check()
+ testLocker.Unlock()
+
+ // Now the second lock should succeed within retryDelay
+ select {
+ case <-time.After(retryDelay * 2):
+ c.Log("timed out")
+ c.Fail()
+ case <-lock2:
+ }
+ c.Logf("%s", logbuf.String())
+}
// non-nil, true, nil -- if the token is valid
func (h *Handler) validateAPItoken(req *http.Request, token string) (*CurrentUser, bool, error) {
user := CurrentUser{Authorization: arvados.APIClientAuthorization{APIToken: token}}
- db, err := h.db(req.Context())
+ db, err := h.dbConnector.GetDB(req.Context())
if err != nil {
ctxlog.FromContext(req.Context()).WithError(err).Debugf("validateAPItoken(%s): database error", token)
return nil, false, err
}
func (h *Handler) createAPItoken(req *http.Request, userUUID string, scopes []string) (*arvados.APIClientAuthorization, error) {
- db, err := h.db(req.Context())
+ db, err := h.dbConnector.GetDB(req.Context())
if err != nil {
return nil, err
}
return conn.chooseBackend(options.UUID).LinkDelete(ctx, options)
}
+func (conn *Conn) LogCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Log, error) {
+ return conn.chooseBackend(options.ClusterID).LogCreate(ctx, options)
+}
+
+func (conn *Conn) LogUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.Log, error) {
+ return conn.chooseBackend(options.UUID).LogUpdate(ctx, options)
+}
+
+func (conn *Conn) LogGet(ctx context.Context, options arvados.GetOptions) (arvados.Log, error) {
+ return conn.chooseBackend(options.UUID).LogGet(ctx, options)
+}
+
+func (conn *Conn) LogList(ctx context.Context, options arvados.ListOptions) (arvados.LogList, error) {
+ return conn.generated_LogList(ctx, options)
+}
+
+func (conn *Conn) LogDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.Log, error) {
+ return conn.chooseBackend(options.UUID).LogDelete(ctx, options)
+}
+
func (conn *Conn) SpecimenList(ctx context.Context, options arvados.ListOptions) (arvados.SpecimenList, error) {
return conn.generated_SpecimenList(ctx, options)
}
defer out.Close()
out.Write(regexp.MustCompile(`(?ms)^.*package .*?import.*?\n\)\n`).Find(buf))
io.WriteString(out, "//\n// -- this file is auto-generated -- do not edit -- edit list.go and run \"go generate\" instead --\n//\n\n")
- for _, t := range []string{"Container", "ContainerRequest", "Group", "Specimen", "User", "Link", "APIClientAuthorization"} {
+ for _, t := range []string{"Container", "ContainerRequest", "Group", "Specimen", "User", "Link", "Log", "APIClientAuthorization"} {
_, err := out.Write(bytes.ReplaceAll(orig, []byte("Collection"), []byte(t)))
if err != nil {
panic(err)
return merged, err
}
+func (conn *Conn) generated_LogList(ctx context.Context, options arvados.ListOptions) (arvados.LogList, error) {
+ var mtx sync.Mutex
+ var merged arvados.LogList
+ var needSort atomic.Value
+ needSort.Store(false)
+ err := conn.splitListRequest(ctx, options, func(ctx context.Context, _ string, backend arvados.API, options arvados.ListOptions) ([]string, error) {
+ options.ForwardedFor = conn.cluster.ClusterID + "-" + options.ForwardedFor
+ cl, err := backend.LogList(ctx, options)
+ if err != nil {
+ return nil, err
+ }
+ mtx.Lock()
+ defer mtx.Unlock()
+ if len(merged.Items) == 0 {
+ merged = cl
+ } else if len(cl.Items) > 0 {
+ merged.Items = append(merged.Items, cl.Items...)
+ needSort.Store(true)
+ }
+ uuids := make([]string, 0, len(cl.Items))
+ for _, item := range cl.Items {
+ uuids = append(uuids, item.UUID)
+ }
+ return uuids, nil
+ })
+ if needSort.Load().(bool) {
+ // Apply the default/implied order, "modified_at desc"
+ sort.Slice(merged.Items, func(i, j int) bool {
+ mi, mj := merged.Items[i].ModifiedAt, merged.Items[j].ModifiedAt
+ return mj.Before(mi)
+ })
+ }
+ if merged.Items == nil {
+ // Return empty results as [], not null
+ // (https://github.com/golang/go/issues/27589 might be
+ // a better solution in the future)
+ merged.Items = []arvados.Log{}
+ }
+ return merged, err
+}
+
func (conn *Conn) generated_APIClientAuthorizationList(ctx context.Context, options arvados.ListOptions) (arvados.APIClientAuthorizationList, error) {
var mtx sync.Mutex
var merged arvados.APIClientAuthorizationList
import (
"context"
- "errors"
"fmt"
"net/http"
"net/http/httptest"
"git.arvados.org/arvados.git/lib/controller/router"
"git.arvados.org/arvados.git/lib/ctrlctx"
"git.arvados.org/arvados.git/sdk/go/arvados"
- "git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/health"
"git.arvados.org/arvados.git/sdk/go/httpserver"
- "github.com/jmoiron/sqlx"
// sqlx needs lib/pq to talk to PostgreSQL
_ "github.com/lib/pq"
proxy *proxy
secureClient *http.Client
insecureClient *http.Client
- pgdb *sqlx.DB
- pgdbMtx sync.Mutex
+ dbConnector ctrlctx.DBConnector
}
func (h *Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
func (h *Handler) CheckHealth() error {
h.setupOnce.Do(h.setup)
- _, err := h.db(context.TODO())
+ _, err := h.dbConnector.GetDB(context.TODO())
if err != nil {
return err
}
mux := http.NewServeMux()
healthFuncs := make(map[string]health.Func)
- oidcAuthorizer := localdb.OIDCAccessTokenAuthorizer(h.Cluster, h.db)
+ h.dbConnector = ctrlctx.DBConnector{PostgreSQL: h.Cluster.PostgreSQL}
+ oidcAuthorizer := localdb.OIDCAccessTokenAuthorizer(h.Cluster, h.dbConnector.GetDB)
h.federation = federation.New(h.Cluster, &healthFuncs)
rtr := router.New(h.federation, router.Config{
MaxRequestSize: h.Cluster.API.MaxRequestSize,
WrapCalls: api.ComposeWrappers(
- ctrlctx.WrapCallsInTransactions(h.db),
+ ctrlctx.WrapCallsInTransactions(h.dbConnector.GetDB),
oidcAuthorizer.WrapCalls,
ctrlctx.WrapCallsWithAuth(h.Cluster)),
})
- healthRoutes := health.Routes{"ping": func() error { _, err := h.db(context.TODO()); return err }}
+ healthRoutes := health.Routes{"ping": func() error { _, err := h.dbConnector.GetDB(context.TODO()); return err }}
for name, f := range healthFuncs {
healthRoutes[name] = f
}
}
go h.trashSweepWorker()
-}
-
-var errDBConnection = errors.New("database connection error")
-
-func (h *Handler) db(ctx context.Context) (*sqlx.DB, error) {
- h.pgdbMtx.Lock()
- defer h.pgdbMtx.Unlock()
- if h.pgdb != nil {
- return h.pgdb, nil
- }
-
- db, err := sqlx.Open("postgres", h.Cluster.PostgreSQL.Connection.String())
- if err != nil {
- ctxlog.FromContext(ctx).WithError(err).Error("postgresql connect failed")
- return nil, errDBConnection
- }
- if p := h.Cluster.PostgreSQL.ConnectionPool; p > 0 {
- db.SetMaxOpenConns(p)
- }
- if err := db.Ping(); err != nil {
- ctxlog.FromContext(ctx).WithError(err).Error("postgresql connect succeeded but ping failed")
- return nil, errDBConnection
- }
- h.pgdb = db
- return db, nil
+ go h.containerLogSweepWorker()
}
type middlewareFunc func(http.ResponseWriter, *http.Request, http.Handler)
}
func (s *HandlerSuite) TestValidateV1APIToken(c *check.C) {
+ c.Assert(s.handler.CheckHealth(), check.IsNil)
req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil)
user, ok, err := s.handler.validateAPItoken(req, arvadostest.ActiveToken)
c.Assert(err, check.IsNil)
}
func (s *HandlerSuite) TestValidateV2APIToken(c *check.C) {
+ c.Assert(s.handler.CheckHealth(), check.IsNil)
req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil)
user, ok, err := s.handler.validateAPItoken(req, arvadostest.ActiveTokenV2)
c.Assert(err, check.IsNil)
}
func (s *HandlerSuite) TestCreateAPIToken(c *check.C) {
+ c.Assert(s.handler.CheckHealth(), check.IsNil)
req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil)
auth, err := s.handler.createAPItoken(req, arvadostest.ActiveUserUUID, nil)
c.Assert(err, check.IsNil)
coll, err := s.handler.federation.CollectionCreate(ctx, arvados.CreateOptions{Attrs: map[string]interface{}{"name": "test trash sweep"}, EnsureUniqueName: true})
c.Assert(err, check.IsNil)
defer s.handler.federation.CollectionDelete(ctx, arvados.DeleteOptions{UUID: coll.UUID})
- db, err := s.handler.db(s.ctx)
+ db, err := s.handler.dbConnector.GetDB(s.ctx)
c.Assert(err, check.IsNil)
_, err = db.ExecContext(s.ctx, `update collections set trash_at = $1, delete_at = $2 where uuid = $3`, time.Now().UTC().Add(time.Second/10), time.Now().UTC().Add(time.Hour), coll.UUID)
c.Assert(err, check.IsNil)
}
}
+func (s *HandlerSuite) TestContainerLogSweep(c *check.C) {
+ s.cluster.SystemRootToken = arvadostest.SystemRootToken
+ s.cluster.Containers.Logging.SweepInterval = arvados.Duration(time.Second / 10)
+ c.Assert(s.handler.CheckHealth(), check.IsNil)
+ ctx := auth.NewContext(s.ctx, &auth.Credentials{Tokens: []string{arvadostest.ActiveTokenV2}})
+ logentry, err := s.handler.federation.LogCreate(ctx, arvados.CreateOptions{Attrs: map[string]interface{}{
+ "object_uuid": arvadostest.CompletedContainerUUID,
+ "event_type": "stderr",
+ "properties": map[string]interface{}{
+ "text": "test trash sweep\n",
+ },
+ }})
+ c.Assert(err, check.IsNil)
+ defer s.handler.federation.LogDelete(ctx, arvados.DeleteOptions{UUID: logentry.UUID})
+ deadline := time.Now().Add(5 * time.Second)
+ for {
+ if time.Now().After(deadline) {
+ c.Log("timed out")
+ c.FailNow()
+ }
+ logentries, err := s.handler.federation.LogList(ctx, arvados.ListOptions{Filters: []arvados.Filter{{"uuid", "=", logentry.UUID}}, Limit: -1})
+ c.Assert(err, check.IsNil)
+ if len(logentries.Items) == 0 {
+ break
+ }
+ time.Sleep(time.Second / 10)
+ }
+}
+
func (s *HandlerSuite) TestLogActivity(c *check.C) {
s.cluster.SystemRootToken = arvadostest.SystemRootToken
s.cluster.Users.ActivityLoggingPeriod = arvados.Duration(24 * time.Hour)
c.Assert(err, check.IsNil)
}
}
- db, err := s.handler.db(s.ctx)
+ db, err := s.handler.dbConnector.GetDB(s.ctx)
c.Assert(err, check.IsNil)
for _, userUUID := range []string{arvadostest.ActiveUserUUID, arvadostest.SpectatorUserUUID} {
var rows int
return rtr.backend.LinkDelete(ctx, *opts.(*arvados.DeleteOptions))
},
},
+ {
+ arvados.EndpointLogCreate,
+ func() interface{} { return &arvados.CreateOptions{} },
+ func(ctx context.Context, opts interface{}) (interface{}, error) {
+ return rtr.backend.LogCreate(ctx, *opts.(*arvados.CreateOptions))
+ },
+ },
+ {
+ arvados.EndpointLogUpdate,
+ func() interface{} { return &arvados.UpdateOptions{} },
+ func(ctx context.Context, opts interface{}) (interface{}, error) {
+ return rtr.backend.LogUpdate(ctx, *opts.(*arvados.UpdateOptions))
+ },
+ },
+ {
+ arvados.EndpointLogList,
+ func() interface{} { return &arvados.ListOptions{Limit: -1} },
+ func(ctx context.Context, opts interface{}) (interface{}, error) {
+ return rtr.backend.LogList(ctx, *opts.(*arvados.ListOptions))
+ },
+ },
+ {
+ arvados.EndpointLogGet,
+ func() interface{} { return &arvados.GetOptions{} },
+ func(ctx context.Context, opts interface{}) (interface{}, error) {
+ return rtr.backend.LogGet(ctx, *opts.(*arvados.GetOptions))
+ },
+ },
+ {
+ arvados.EndpointLogDelete,
+ func() interface{} { return &arvados.DeleteOptions{} },
+ func(ctx context.Context, opts interface{}) (interface{}, error) {
+ return rtr.backend.LogDelete(ctx, *opts.(*arvados.DeleteOptions))
+ },
+ },
{
arvados.EndpointSpecimenCreate,
func() interface{} { return &arvados.CreateOptions{} },
return resp, err
}
+func (conn *Conn) LogCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Log, error) {
+ ep := arvados.EndpointLogCreate
+ var resp arvados.Log
+ err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
+ return resp, err
+}
+
+func (conn *Conn) LogUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.Log, error) {
+ ep := arvados.EndpointLogUpdate
+ var resp arvados.Log
+ err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
+ return resp, err
+}
+
+func (conn *Conn) LogGet(ctx context.Context, options arvados.GetOptions) (arvados.Log, error) {
+ ep := arvados.EndpointLogGet
+ var resp arvados.Log
+ err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
+ return resp, err
+}
+
+func (conn *Conn) LogList(ctx context.Context, options arvados.ListOptions) (arvados.LogList, error) {
+ ep := arvados.EndpointLogList
+ var resp arvados.LogList
+ err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
+ return resp, err
+}
+
+func (conn *Conn) LogDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.Log, error) {
+ ep := arvados.EndpointLogDelete
+ var resp arvados.Log
+ err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
+ return resp, err
+}
+
func (conn *Conn) SpecimenCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Specimen, error) {
ep := arvados.EndpointSpecimenCreate
var resp arvados.Specimen
package controller
import (
+ "context"
"time"
"git.arvados.org/arvados.git/lib/controller/dblock"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
)
-func (h *Handler) trashSweepWorker() {
- sleep := h.Cluster.Collections.TrashSweepInterval.Duration()
- logger := ctxlog.FromContext(h.BackgroundContext).WithField("worker", "trash sweep")
+func (h *Handler) periodicWorker(workerName string, interval time.Duration, locker *dblock.DBLocker, run func(context.Context) error) {
+ logger := ctxlog.FromContext(h.BackgroundContext).WithField("worker", workerName)
ctx := ctxlog.Context(h.BackgroundContext, logger)
- if sleep <= 0 {
- logger.Debugf("Collections.TrashSweepInterval is %v, not running worker", sleep)
+ if interval <= 0 {
+ logger.Debugf("interval is %v, not running worker", interval)
return
}
- dblock.TrashSweep.Lock(ctx, h.db)
- defer dblock.TrashSweep.Unlock()
- for time.Sleep(sleep); ctx.Err() == nil; time.Sleep(sleep) {
- dblock.TrashSweep.Check()
- ctx := auth.NewContext(ctx, &auth.Credentials{Tokens: []string{h.Cluster.SystemRootToken}})
- _, err := h.federation.SysTrashSweep(ctx, struct{}{})
+ if !locker.Lock(ctx, h.dbConnector.GetDB) {
+ // context canceled
+ return
+ }
+ defer locker.Unlock()
+ for time.Sleep(interval); ctx.Err() == nil; time.Sleep(interval) {
+ if !locker.Check() {
+ // context canceled
+ return
+ }
+ err := run(ctx)
if err != nil {
- logger.WithError(err).Info("trash sweep failed")
+ logger.WithError(err).Infof("%s failed", workerName)
}
}
}
+
+func (h *Handler) trashSweepWorker() {
+ h.periodicWorker("trash sweep", h.Cluster.Collections.TrashSweepInterval.Duration(), dblock.TrashSweep, func(ctx context.Context) error {
+ ctx = auth.NewContext(ctx, &auth.Credentials{Tokens: []string{h.Cluster.SystemRootToken}})
+ _, err := h.federation.SysTrashSweep(ctx, struct{}{})
+ return err
+ })
+}
+
+func (h *Handler) containerLogSweepWorker() {
+ h.periodicWorker("container log sweep", h.Cluster.Containers.Logging.SweepInterval.Duration(), dblock.ContainerLogSweep, func(ctx context.Context) error {
+ db, err := h.dbConnector.GetDB(ctx)
+ if err != nil {
+ return err
+ }
+ res, err := db.ExecContext(ctx, `
+DELETE FROM logs
+ USING containers
+ WHERE logs.object_uuid=containers.uuid
+ AND logs.event_type in ('stdout', 'stderr', 'arv-mount', 'crunch-run', 'crunchstat', 'hoststat', 'node', 'container', 'keepstore')
+ AND containers.log IS NOT NULL
+ AND now() - containers.finished_at > $1::interval`,
+ h.Cluster.Containers.Logging.MaxAge.String())
+ if err != nil {
+ return err
+ }
+ logger := ctxlog.FromContext(ctx)
+ rows, err := res.RowsAffected()
+ if err != nil {
+ logger.WithError(err).Warn("unexpected error from RowsAffected()")
+ } else {
+ logger.WithField("rows", rows).Info("deleted rows from logs table")
+ }
+ return nil
+ })
+}
parentTemp string
costStartTime time.Time
+ keepstore *exec.Cmd
keepstoreLogger io.WriteCloser
keepstoreLogbuf *bufThenWrite
statLogger io.WriteCloser
if err != nil {
return nil, fmt.Errorf("while trying to start arv-mount: %v", err)
}
+ if runner.hoststatReporter != nil && runner.ArvMount != nil {
+ runner.hoststatReporter.ReportPID("arv-mount", runner.ArvMount.Process.Pid)
+ }
for _, p := range collectionPaths {
_, err = os.Stat(p)
PollPeriod: runner.statInterval,
}
runner.hoststatReporter.Start()
+ runner.hoststatReporter.ReportPID("crunch-run", os.Getpid())
return nil
}
if err != nil {
return
}
+ if runner.keepstore != nil {
+ runner.hoststatReporter.ReportPID("keepstore", runner.keepstore.Process.Pid)
+ }
// set up FUSE mount and binds
bindmounts, err = runner.SetupMounts()
return 1
}
+ cr.keepstore = keepstore
if keepstore == nil {
// Log explanation (if any) for why we're not running
// a local keepstore.
"fmt"
"io"
"io/ioutil"
- "log"
"os"
+ "regexp"
+ "sort"
"strconv"
"strings"
+ "sync"
"syscall"
"time"
)
TempDir string
// Where to write statistics. Must not be nil.
- Logger *log.Logger
+ Logger interface {
+ Printf(fmt string, args ...interface{})
+ }
+ kernelPageSize int64
reportedStatFile map[string]string
lastNetSample map[string]ioSample
lastDiskIOSample map[string]ioSample
lastCPUSample cpuSample
lastDiskSpaceSample diskSpaceSample
+ reportPIDs map[string]int
+ reportPIDsMu sync.Mutex
+
done chan struct{} // closed when we should stop reporting
flushed chan struct{} // closed when we have made our last report
}
go r.run()
}
+// ReportPID starts reporting stats for a specified process.
+func (r *Reporter) ReportPID(name string, pid int) {
+ r.reportPIDsMu.Lock()
+ defer r.reportPIDsMu.Unlock()
+ if r.reportPIDs == nil {
+ r.reportPIDs = map[string]int{name: pid}
+ } else {
+ r.reportPIDs[name] = pid
+ }
+}
+
// Stop reporting. Do not call more than once, or before calling
// Start.
//
}
}
r.Logger.Printf("mem%s\n", outstat.String())
+
+ if r.kernelPageSize == 0 {
+ // assign "don't try again" value in case we give up
+ // and return without assigning the real value
+ r.kernelPageSize = -1
+ buf, err := os.ReadFile("/proc/self/smaps")
+ if err != nil {
+ r.Logger.Printf("error reading /proc/self/smaps: %s", err)
+ return
+ }
+ m := regexp.MustCompile(`\nKernelPageSize:\s*(\d+) kB\n`).FindSubmatch(buf)
+ if len(m) != 2 {
+ r.Logger.Printf("error parsing /proc/self/smaps: KernelPageSize not found")
+ return
+ }
+ size, err := strconv.ParseInt(string(m[1]), 10, 64)
+ if err != nil {
+ r.Logger.Printf("error parsing /proc/self/smaps: KernelPageSize %q: %s", m[1], err)
+ return
+ }
+ r.kernelPageSize = size * 1024
+ } else if r.kernelPageSize < 0 {
+ // already failed to determine page size, don't keep
+ // trying/logging
+ return
+ }
+
+ r.reportPIDsMu.Lock()
+ defer r.reportPIDsMu.Unlock()
+ procnames := make([]string, 0, len(r.reportPIDs))
+ for name := range r.reportPIDs {
+ procnames = append(procnames, name)
+ }
+ sort.Strings(procnames)
+ procmem := ""
+ for _, procname := range procnames {
+ pid := r.reportPIDs[procname]
+ buf, err := os.ReadFile(fmt.Sprintf("/proc/%d/stat", pid))
+ if err != nil {
+ continue
+ }
+ // If the executable name contains a ')' char,
+ // /proc/$pid/stat will look like '1234 (exec name)) S
+ // 123 ...' -- the last ')' is the end of the 2nd
+ // field.
+ paren := bytes.LastIndexByte(buf, ')')
+ if paren < 0 {
+ continue
+ }
+ fields := bytes.SplitN(buf[paren:], []byte{' '}, 24)
+ if len(fields) < 24 {
+ continue
+ }
+ // rss is the 24th field in .../stat, and fields[0]
+ // here is the last char ')' of the 2nd field, so
+ // rss is fields[22]
+ rss, err := strconv.ParseInt(string(fields[22]), 10, 64)
+ if err != nil {
+ continue
+ }
+ procmem += fmt.Sprintf(" %d %s", rss*r.kernelPageSize, procname)
+ }
+ if procmem != "" {
+ r.Logger.Printf("procmem%s\n", procmem)
+ }
}
func (r *Reporter) doNetworkStats() {
package crunchstat
import (
- "bufio"
- "io"
+ "bytes"
"log"
"os"
"regexp"
+ "strconv"
"testing"
+ "time"
+
+ "github.com/sirupsen/logrus"
+ . "gopkg.in/check.v1"
)
-func bufLogger() (*log.Logger, *bufio.Reader) {
- r, w := io.Pipe()
- logger := log.New(w, "", 0)
- return logger, bufio.NewReader(r)
+func Test(t *testing.T) {
+ TestingT(t)
}
-func TestReadAllOrWarnFail(t *testing.T) {
- logger, rcv := bufLogger()
- rep := Reporter{Logger: logger}
+var _ = Suite(&suite{})
- done := make(chan bool)
- var msg []byte
- var err error
- go func() {
- msg, err = rcv.ReadBytes('\n')
- close(done)
- }()
- {
- // The special file /proc/self/mem can be opened for
- // reading, but reading from byte 0 returns an error.
- f, err := os.Open("/proc/self/mem")
- if err != nil {
- t.Fatalf("Opening /proc/self/mem: %s", err)
- }
- if x, err := rep.readAllOrWarn(f); err == nil {
- t.Fatalf("Expected error, got %v", x)
- }
- }
- <-done
- if err != nil {
- t.Fatal(err)
- } else if matched, err := regexp.MatchString("^warning: read /proc/self/mem: .*", string(msg)); err != nil || !matched {
- t.Fatalf("Expected error message about unreadable file, got \"%s\"", msg)
- }
+type suite struct{}
+
+func (s *suite) TestReadAllOrWarnFail(c *C) {
+ var logger bytes.Buffer
+ rep := Reporter{Logger: log.New(&logger, "", 0)}
+
+ // The special file /proc/self/mem can be opened for
+ // reading, but reading from byte 0 returns an error.
+ f, err := os.Open("/proc/self/mem")
+ c.Assert(err, IsNil)
+ defer f.Close()
+ _, err = rep.readAllOrWarn(f)
+ c.Check(err, NotNil)
+ c.Check(logger.String(), Matches, "^warning: read /proc/self/mem: .*\n")
}
-func TestReadAllOrWarnSuccess(t *testing.T) {
- rep := Reporter{Logger: log.New(os.Stderr, "", 0)}
+func (s *suite) TestReadAllOrWarnSuccess(c *C) {
+ var logbuf bytes.Buffer
+ rep := Reporter{Logger: log.New(&logbuf, "", 0)}
f, err := os.Open("./crunchstat_test.go")
- if err != nil {
- t.Fatalf("Opening ./crunchstat_test.go: %s", err)
- }
+ c.Assert(err, IsNil)
+ defer f.Close()
data, err := rep.readAllOrWarn(f)
- if err != nil {
- t.Fatalf("got error %s", err)
+ c.Check(err, IsNil)
+ c.Check(string(data), Matches, "(?ms).*\npackage crunchstat\n.*")
+ c.Check(logbuf.String(), Equals, "")
+}
+
+func (s *suite) TestReportPIDs(c *C) {
+ var logbuf bytes.Buffer
+ logger := logrus.New()
+ logger.Out = &logbuf
+ r := Reporter{
+ Logger: logger,
+ CgroupRoot: "/sys/fs/cgroup",
+ PollPeriod: time.Second,
}
- if matched, err := regexp.MatchString("\npackage crunchstat\n", string(data)); err != nil || !matched {
- t.Fatalf("data failed regexp: err %v, matched %v", err, matched)
+ r.Start()
+ r.ReportPID("init", 1)
+ r.ReportPID("test_process", os.Getpid())
+ r.ReportPID("nonexistent", 12345) // should be silently ignored/omitted
+ for deadline := time.Now().Add(10 * time.Second); ; time.Sleep(time.Millisecond) {
+ if time.Now().After(deadline) {
+ c.Error("timed out")
+ break
+ }
+ if m := regexp.MustCompile(`(?ms).*procmem \d+ init (\d+) test_process.*`).FindSubmatch(logbuf.Bytes()); len(m) > 0 {
+ size, err := strconv.ParseInt(string(m[1]), 10, 64)
+ c.Check(err, IsNil)
+ // Expect >1 MiB and <100 MiB -- otherwise we
+ // are probably misinterpreting /proc/N/stat
+ // or multiplying by the wrong page size.
+ c.Check(size > 1000000, Equals, true)
+ c.Check(size < 100000000, Equals, true)
+ break
+ }
}
+ c.Logf("%s", logbuf.String())
}
"sync"
"git.arvados.org/arvados.git/lib/controller/api"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"github.com/jmoiron/sqlx"
})
return txn.tx, txn.err
}
+
+var errDBConnection = errors.New("database connection error")
+
+type DBConnector struct {
+ PostgreSQL arvados.PostgreSQL
+ pgdb *sqlx.DB
+ mtx sync.Mutex
+}
+
+func (dbc *DBConnector) GetDB(ctx context.Context) (*sqlx.DB, error) {
+ dbc.mtx.Lock()
+ defer dbc.mtx.Unlock()
+ if dbc.pgdb != nil {
+ return dbc.pgdb, nil
+ }
+ db, err := sqlx.Open("postgres", dbc.PostgreSQL.Connection.String())
+ if err != nil {
+ ctxlog.FromContext(ctx).WithError(err).Error("postgresql connect failed")
+ return nil, errDBConnection
+ }
+ if p := dbc.PostgreSQL.ConnectionPool; p > 0 {
+ db.SetMaxOpenConns(p)
+ }
+ if err := db.Ping(); err != nil {
+ ctxlog.FromContext(ctx).WithError(err).Error("postgresql connect succeeded but ping failed")
+ return nil, errDBConnection
+ }
+ dbc.pgdb = db
+ return db, nil
+}
f.StringVar(&diag.dockerImage, "docker-image", "", "image to use when running a test container (default: use embedded hello-world image)")
f.BoolVar(&diag.checkInternal, "internal-client", false, "check that this host is considered an \"internal\" client")
f.BoolVar(&diag.checkExternal, "external-client", false, "check that this host is considered an \"external\" client")
+ f.BoolVar(&diag.verbose, "v", false, "verbose: include more information in report")
f.IntVar(&diag.priority, "priority", 500, "priority for test container (1..1000, or 0 to skip)")
f.DurationVar(&diag.timeout, "timeout", 10*time.Second, "timeout for http requests")
if ok, code := cmd.ParseFlags(f, prog, args, "", stderr); !ok {
}
// docker save hello-world > hello-world.tar
+//
//go:embed hello-world.tar
var HelloWorldDockerImage []byte
dockerImage string
checkInternal bool
checkExternal bool
+ verbose bool
timeout time.Duration
logger *logrus.Logger
errors []string
diag.logger.Infof(" ... "+f, args...)
}
+func (diag *diagnoser) verbosef(f string, args ...interface{}) {
+ if diag.verbose {
+ diag.logger.Infof(" ... "+f, args...)
+ }
+}
+
func (diag *diagnoser) warnf(f string, args ...interface{}) {
diag.logger.Warnf(" ... "+f, args...)
}
return
}
+ hostname, err := os.Hostname()
+ if err != nil {
+ diag.warnf("error getting hostname: %s")
+ } else {
+ diag.verbosef("hostname = %s", hostname)
+ }
+
diag.dotest(5, "running health check (same as `arvados-server check`)", func() error {
ldr := config.NewLoader(&bytes.Buffer{}, ctxlog.New(&bytes.Buffer{}, "text", "info"))
ldr.SetupFlags(flag.NewFlagSet("diagnostics", flag.ContinueOnError))
return err
}
if cluster.SystemRootToken != os.Getenv("ARVADOS_API_TOKEN") {
- diag.infof("skipping because provided token is not SystemRootToken")
+ return fmt.Errorf("diagnostics usage error: %s is readable but SystemRootToken does not match $ARVADOS_API_TOKEN (to fix, either run 'arvados-client sudo diagnostics' to load everything from config file, or set ARVADOS_CONFIG=- to load nothing from config file)", ldr.Path)
}
agg := &health.Aggregator{Cluster: cluster}
resp := agg.ClusterHealth()
for _, e := range resp.Errors {
diag.errorf("health check: %s", e)
}
- diag.infof("health check: reported clock skew %v", resp.ClockSkew)
+ if len(resp.Errors) > 0 {
+ diag.infof("consider running `arvados-server check -yaml` for a comprehensive report")
+ }
+ diag.verbosef("reported clock skew = %v", resp.ClockSkew)
+ reported := map[string]bool{}
+ for _, result := range resp.Checks {
+ version := strings.SplitN(result.Metrics.Version, " (go", 2)[0]
+ if version != "" && !reported[version] {
+ diag.verbosef("arvados version = %s", version)
+ reported[version] = true
+ }
+ }
+ reported = map[string]bool{}
+ for _, result := range resp.Checks {
+ if result.Server != "" && !reported[result.Server] {
+ diag.verbosef("http frontend version = %s", result.Server)
+ reported[result.Server] = true
+ }
+ }
+ reported = map[string]bool{}
+ for _, result := range resp.Checks {
+ if sha := result.ConfigSourceSHA256; sha != "" && !reported[sha] {
+ diag.verbosef("config file sha256 = %s", sha)
+ reported[sha] = true
+ }
+ }
return nil
})
if err != nil {
return err
}
- diag.debugf("BlobSignatureTTL = %d", dd.BlobSignatureTTL)
+ diag.verbosef("BlobSignatureTTL = %d", dd.BlobSignatureTTL)
return nil
})
if err != nil {
return err
}
- diag.debugf("Collections.BlobSigning = %v", cluster.Collections.BlobSigning)
+ diag.verbosef("Collections.BlobSigning = %v", cluster.Collections.BlobSigning)
cfgOK = true
return nil
})
if err != nil {
return err
}
- diag.debugf("user uuid = %s", user.UUID)
+ diag.verbosef("user uuid = %s", user.UUID)
return nil
})
isInternal := found["proxy"] == 0 && len(keeplist.Items) > 0
isExternal := found["proxy"] > 0 && found["proxy"] == len(keeplist.Items)
if isExternal {
- diag.debugf("controller returned only proxy services, this host is treated as \"external\"")
+ diag.verbosef("controller returned only proxy services, this host is treated as \"external\"")
} else if isInternal {
- diag.debugf("controller returned only non-proxy services, this host is treated as \"internal\"")
+ diag.verbosef("controller returned only non-proxy services, this host is treated as \"internal\"")
}
if (diag.checkInternal && !isInternal) || (diag.checkExternal && !isExternal) {
return fmt.Errorf("expecting internal=%v external=%v, but found internal=%v external=%v", diag.checkInternal, diag.checkExternal, isInternal, isExternal)
}
if len(grplist.Items) > 0 {
project = grplist.Items[0]
- diag.debugf("using existing project, uuid = %s", project.UUID)
+ diag.verbosef("using existing project, uuid = %s", project.UUID)
return nil
}
diag.debugf("list groups: ok, no results")
if err != nil {
return fmt.Errorf("create project: %s", err)
}
- diag.debugf("created project, uuid = %s", project.UUID)
+ diag.verbosef("created project, uuid = %s", project.UUID)
return nil
})
if err != nil {
return err
}
- diag.debugf("ok, uuid = %s", collection.UUID)
+ diag.verbosef("ok, uuid = %s", collection.UUID)
return nil
})
if err != nil {
return err
}
- diag.debugf("container request uuid = %s", cr.UUID)
- diag.debugf("container uuid = %s", cr.ContainerUUID)
+ diag.verbosef("container request uuid = %s", cr.UUID)
+ diag.verbosef("container uuid = %s", cr.ContainerUUID)
timeout := 10 * time.Minute
diag.infof("container request submitted, waiting up to %v for container to run", arvados.Duration(timeout))
"time"
"git.arvados.org/arvados.git/lib/cloud"
+ "git.arvados.org/arvados.git/lib/controller/dblock"
+ "git.arvados.org/arvados.git/lib/ctrlctx"
"git.arvados.org/arvados.git/lib/dispatchcloud/container"
"git.arvados.org/arvados.git/lib/dispatchcloud/scheduler"
"git.arvados.org/arvados.git/lib/dispatchcloud/sshexecutor"
Registry *prometheus.Registry
InstanceSetID cloud.InstanceSetID
+ dbConnector ctrlctx.DBConnector
logger logrus.FieldLogger
instanceSet cloud.InstanceSet
pool pool
func (disp *dispatcher) initialize() {
disp.logger = ctxlog.FromContext(disp.Context)
+ disp.dbConnector = ctrlctx.DBConnector{PostgreSQL: disp.Cluster.PostgreSQL}
disp.ArvClient.AuthToken = disp.AuthToken
if err != nil {
disp.logger.Fatalf("error initializing driver: %s", err)
}
+ dblock.Dispatch.Lock(disp.Context, disp.dbConnector.GetDB)
disp.instanceSet = instanceSet
disp.pool = worker.NewPool(disp.logger, disp.ArvClient, disp.Registry, disp.InstanceSetID, disp.instanceSet, disp.newExecutor, disp.sshKey.PublicKey(), disp.Cluster)
disp.queue = container.NewQueue(disp.logger, disp.Registry, disp.typeChooser, disp.ArvClient)
}
func (disp *dispatcher) run() {
+ defer dblock.Dispatch.Unlock()
defer close(disp.stopped)
defer disp.instanceSet.Stop()
defer disp.pool.Stop()
"sync"
"time"
+ "git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/lib/dispatchcloud/test"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadostest"
MinTimeBetweenCreateCalls: time.Millisecond,
}
+ // We need the postgresql connection info from the integration
+ // test config.
+ cfg, err := config.NewLoader(nil, ctxlog.FromContext(s.ctx)).Load()
+ c.Assert(err, check.IsNil)
+ testcluster, err := cfg.GetCluster("")
+ c.Assert(err, check.IsNil)
+
s.cluster = &arvados.Cluster{
ManagementToken: "test-management-token",
+ PostgreSQL: testcluster.PostgreSQL,
Containers: arvados.ContainersConfig{
CrunchRunCommand: "crunch-run",
CrunchRunArgumentsList: []string{"--foo", "--extra='args'"},
err := s.disp.CheckHealth()
c.Check(err, check.IsNil)
- select {
- case <-done:
- c.Logf("containers finished (%s), waiting for instances to shutdown and queue to clear", time.Since(start))
- case <-time.After(10 * time.Second):
- c.Fatalf("timed out; still waiting for %d containers: %q", len(waiting), waiting)
+ for len(waiting) > 0 {
+ waswaiting := len(waiting)
+ select {
+ case <-done:
+ // loop will end because len(waiting)==0
+ case <-time.After(3 * time.Second):
+ if len(waiting) >= waswaiting {
+ c.Fatalf("timed out; no progress in 3s while waiting for %d containers: %q", len(waiting), waiting)
+ }
+ }
}
+ c.Logf("containers finished (%s), waiting for instances to shutdown and queue to clear", time.Since(start))
deadline := time.Now().Add(5 * time.Second)
for range time.NewTicker(10 * time.Millisecond).C {
"time"
"git.arvados.org/arvados.git/lib/cmd"
+ "git.arvados.org/arvados.git/lib/controller/dblock"
+ "git.arvados.org/arvados.git/lib/ctrlctx"
"git.arvados.org/arvados.git/lib/dispatchcloud"
"git.arvados.org/arvados.git/lib/service"
"git.arvados.org/arvados.git/sdk/go/arvados"
Registry *prometheus.Registry
logger logrus.FieldLogger
+ dbConnector ctrlctx.DBConnector
lsfcli lsfcli
lsfqueue lsfqueue
arvDispatcher *dispatch.Dispatcher
func (disp *dispatcher) Start() {
disp.initOnce.Do(func() {
disp.init()
+ dblock.Dispatch.Lock(context.Background(), disp.dbConnector.GetDB)
go func() {
+ defer dblock.Dispatch.Unlock()
disp.checkLsfQueueForOrphans()
err := disp.arvDispatcher.Run(disp.Context)
if err != nil {
lsfcli: &disp.lsfcli,
}
disp.ArvClient.AuthToken = disp.AuthToken
+ disp.dbConnector = ctrlctx.DBConnector{PostgreSQL: disp.Cluster.PostgreSQL}
disp.stop = make(chan struct{}, 1)
disp.stopped = make(chan struct{})
container_request["state"] = "Committed"
container_request.setdefault("properties", {})
+ container_request["properties"]["cwl_input"] = self.joborder
+
runtime_constraints = {}
if runtimeContext.project_uuid:
if container["output"]:
outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
+
+ properties = record["properties"].copy()
+ properties["cwl_output"] = outputs
+ self.arvrunner.api.container_requests().update(
+ uuid=self.uuid,
+ body={"container_request": {"properties": properties}}
+ ).execute(num_retries=self.arvrunner.num_retries)
except WorkflowException as e:
# Only include a stack trace if in debug mode.
# A stack trace may obfuscate more useful output about the workflow.
'command': ['ls', '/var/spool/cwl'],
'cwd': '/var/spool/cwl',
'scheduling_parameters': {},
- 'properties': {},
+ 'properties': {'cwl_input': {}},
'secret_mounts': {},
'output_storage_classes': ["default"]
}))
'scheduling_parameters': {
'partitions': ['blurb']
},
- 'properties': {},
+ 'properties': {'cwl_input': {}},
'secret_mounts': {},
'output_storage_classes': ["default"]
}
'cwd': '/var/spool/cwl',
'scheduling_parameters': {
},
- 'properties': {},
+ 'properties': {'cwl_input': {}},
'secret_mounts': {},
'output_storage_classes': ["default"]
}
'command': ['ls', '/var/spool/cwl'],
'cwd': '/var/spool/cwl',
'scheduling_parameters': {},
- 'properties': {},
+ 'properties': {'cwl_input': {}},
'secret_mounts': {},
'output_storage_classes': ["default"]
}))
arvjob.successCodes = [0]
arvjob.outdir = "/var/spool/cwl"
arvjob.output_ttl = 3600
+ arvjob.uuid = "zzzzz-xvhdp-zzzzzzzzzzzzzz1"
arvjob.collect_outputs.return_value = {"out": "stuff"}
"output_uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2",
"uuid": "zzzzz-xvhdp-zzzzzzzzzzzzzzz",
"container_uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz",
- "modified_at": "2017-05-26T12:01:22Z"
+ "modified_at": "2017-05-26T12:01:22Z",
+ "properties": {}
})
self.assertFalse(api.collections().create.called)
arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
runner.add_intermediate_output.assert_called_with("zzzzz-4zz18-zzzzzzzzzzzzzz2")
+ runner.api.container_requests().update.assert_called_with(uuid="zzzzz-xvhdp-zzzzzzzzzzzzzz1",
+ body={'container_request': {'properties': {'cwl_output': {'out': 'stuff'}}}})
+
+
# Test to make sure we don't call runtime_status_update if we already
# did so somewhere higher up in the call stack
@mock.patch("arvados_cwl.util.get_current_container")
"output_uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2",
"uuid": "zzzzz-xvhdp-zzzzzzzzzzzzzzz",
"container_uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz",
- "modified_at": "2017-05-26T12:01:22Z"
+ "modified_at": "2017-05-26T12:01:22Z",
+ "properties": {}
})
rts_mock.assert_called_with(
'command': ['ls', '/var/spool/cwl'],
'cwd': '/var/spool/cwl',
'scheduling_parameters': {},
- 'properties': {},
+ 'properties': {'cwl_input': {
+ "p1": {
+ "basename": "99999999999999999999999999999994+44",
+ "class": "Directory",
+ "dirname": "/keep",
+ "http://arvados.org/cwl#collectionUUID": "zzzzz-4zz18-zzzzzzzzzzzzzzz",
+ "listing": [
+ {
+ "basename": "file1",
+ "class": "File",
+ "dirname": "/keep/99999999999999999999999999999994+44",
+ "location": "keep:99999999999999999999999999999994+44/file1",
+ "nameext": "",
+ "nameroot": "file1",
+ "path": "/keep/99999999999999999999999999999994+44/file1",
+ "size": 0
+ },
+ {
+ "basename": "file2",
+ "class": "File",
+ "dirname": "/keep/99999999999999999999999999999994+44",
+ "location": "keep:99999999999999999999999999999994+44/file2",
+ "nameext": "",
+ "nameroot": "file2",
+ "path": "/keep/99999999999999999999999999999994+44/file2",
+ "size": 0
+ }
+ ],
+ "location": "keep:99999999999999999999999999999994+44",
+ "path": "/keep/99999999999999999999999999999994+44"
+ }
+ }},
'secret_mounts': {},
'output_storage_classes': ["default"]
}))
'command': ['md5sum', 'example.conf'],
'cwd': '/var/spool/cwl',
'scheduling_parameters': {},
- 'properties': {},
+ 'properties': {'cwl_input': job_order},
"secret_mounts": {
"/var/spool/cwl/example.conf": {
"content": "username: user\npassword: blorp\n",
'command': ['ls', '/var/spool/cwl'],
'cwd': '/var/spool/cwl',
'scheduling_parameters': {},
- 'properties': {},
+ 'properties': {'cwl_input': {}},
'secret_mounts': {},
'output_storage_classes': ["foo_sc", "bar_sc"]
}))
'scheduling_parameters': {},
'properties': {
"baz": "blorp",
+ "cwl_input": {"x": "blorp"},
"foo": "bar",
"quux": {
"q1": 1,
'command': ['nvidia-smi'],
'cwd': '/var/spool/cwl',
'scheduling_parameters': {},
- 'properties': {},
+ 'properties': {'cwl_input': {}},
'secret_mounts': {},
'output_storage_classes': ["default"]
}))
'command': ['echo'],
'cwd': '/var/spool/cwl',
'scheduling_parameters': {},
- 'properties': {},
+ 'properties': {'cwl_input': {}},
'secret_mounts': {},
'output_storage_classes': ["default"]
}
'command': ['ls', '/var/spool/cwl'],
'cwd': '/var/spool/cwl',
'scheduling_parameters': sched,
- 'properties': {},
+ 'properties': {'cwl_input': {}},
'secret_mounts': {},
'output_storage_classes': ["default"]
}))
"output_path": "/var/spool/cwl",
"output_ttl": 0,
"priority": 500,
- "properties": {},
+ "properties": {'cwl_input': {
+ "fileblub": {
+ "basename": "token.txt",
+ "class": "File",
+ "dirname": "/keep/99999999999999999999999999999999+118",
+ "location": "keep:99999999999999999999999999999999+118/token.txt",
+ "nameext": ".txt",
+ "nameroot": "token",
+ "path": "/keep/99999999999999999999999999999999+118/token.txt",
+ "size": 0
+ },
+ "sleeptime": 5
+ }},
"runtime_constraints": {
"ram": 1073741824,
"vcpus": 1
'name': u'echo-subwf',
'secret_mounts': {},
'runtime_constraints': {'API': True, 'vcpus': 3, 'ram': 1073741824},
- 'properties': {},
+ 'properties': {'cwl_input': {}},
'priority': 500,
'mounts': {
'/var/spool/cwl/cwl.input.yml': {
EndpointLinkGet = APIEndpoint{"GET", "arvados/v1/links/{uuid}", ""}
EndpointLinkList = APIEndpoint{"GET", "arvados/v1/links", ""}
EndpointLinkDelete = APIEndpoint{"DELETE", "arvados/v1/links/{uuid}", ""}
+ EndpointLogCreate = APIEndpoint{"POST", "arvados/v1/logs", "log"}
+ EndpointLogUpdate = APIEndpoint{"PATCH", "arvados/v1/logs/{uuid}", "log"}
+ EndpointLogGet = APIEndpoint{"GET", "arvados/v1/logs/{uuid}", ""}
+ EndpointLogList = APIEndpoint{"GET", "arvados/v1/logs", ""}
+ EndpointLogDelete = APIEndpoint{"DELETE", "arvados/v1/logs/{uuid}", ""}
EndpointSysTrashSweep = APIEndpoint{"POST", "sys/trash_sweep", ""}
EndpointUserActivate = APIEndpoint{"POST", "arvados/v1/users/{uuid}/activate", ""}
EndpointUserCreate = APIEndpoint{"POST", "arvados/v1/users", "user"}
LinkGet(ctx context.Context, options GetOptions) (Link, error)
LinkList(ctx context.Context, options ListOptions) (LinkList, error)
LinkDelete(ctx context.Context, options DeleteOptions) (Link, error)
+ LogCreate(ctx context.Context, options CreateOptions) (Log, error)
+ LogUpdate(ctx context.Context, options UpdateOptions) (Log, error)
+ LogGet(ctx context.Context, options GetOptions) (Log, error)
+ LogList(ctx context.Context, options ListOptions) (LogList, error)
+ LogDelete(ctx context.Context, options DeleteOptions) (Log, error)
SpecimenCreate(ctx context.Context, options CreateOptions) (Specimen, error)
SpecimenUpdate(ctx context.Context, options UpdateOptions) (Specimen, error)
SpecimenGet(ctx context.Context, options GetOptions) (Specimen, error)
// Space characters are trimmed when reading the settings file, so
// these are equivalent:
//
-// ARVADOS_API_HOST=localhost\n
-// ARVADOS_API_HOST=localhost\r\n
-// ARVADOS_API_HOST = localhost \n
-// \tARVADOS_API_HOST = localhost\n
+// ARVADOS_API_HOST=localhost\n
+// ARVADOS_API_HOST=localhost\r\n
+// ARVADOS_API_HOST = localhost \n
+// \tARVADOS_API_HOST = localhost\n
func NewClientFromEnv() *Client {
vars := map[string]string{}
home := os.Getenv("HOME")
// Convert an arbitrary struct to url.Values. For example,
//
-// Foo{Bar: []int{1,2,3}, Baz: "waz"}
+// Foo{Bar: []int{1,2,3}, Baz: "waz"}
//
// becomes
//
-// url.Values{`bar`:`{"a":[1,2,3]}`,`Baz`:`waz`}
+// url.Values{`bar`:`{"a":[1,2,3]}`,`Baz`:`waz`}
//
// params itself is returned if it is already an url.Values.
func anythingToValues(params interface{}) (url.Values, error) {
SSHHelpPageHTML string
SSHHelpHostSuffix string
IdleTimeout Duration
+ BannerURL string
}
}
}
Logging struct {
MaxAge Duration
+ SweepInterval Duration
LogBytesPerEvent int
LogSecondsBetweenEvents Duration
LogThrottlePeriod Duration
var errDuplicateInstanceTypeName = errors.New("duplicate instance type name")
// UnmarshalJSON does special handling of InstanceTypes:
-// * populate computed fields (Name and Scratch)
-// * error out if InstancesTypes are populated as an array, which was
-// deprecated in Arvados 1.2.0
+//
+// - populate computed fields (Name and Scratch)
+//
+// - error out if InstancesTypes are populated as an array, which was
+// deprecated in Arvados 1.2.0
func (it *InstanceTypeMap) UnmarshalJSON(data []byte) error {
fixup := func(t InstanceType) (InstanceType, error) {
if t.ProviderType == "" {
package arvados
import (
+ "bytes"
"encoding/json"
"fmt"
"strings"
// UnmarshalJSON implements json.Unmarshaler.
func (d *Duration) UnmarshalJSON(data []byte) error {
+ if bytes.Equal(data, []byte(`"0"`)) || bytes.Equal(data, []byte(`0`)) {
+ // Unitless 0 is not accepted by ParseDuration, but we
+ // accept it as a reasonable spelling of 0
+ // nanoseconds.
+ *d = 0
+ return nil
+ }
if data[0] == '"' {
return d.Set(string(data[1 : len(data)-1]))
}
err = json.Unmarshal([]byte(`{"D":"60s"}`), &d)
c.Check(err, check.IsNil)
c.Check(d.D.Duration(), check.Equals, time.Minute)
+
+ d.D = Duration(time.Second)
+ err = json.Unmarshal([]byte(`{"D":"0"}`), &d)
+ c.Check(err, check.IsNil)
+ c.Check(d.D.Duration(), check.Equals, time.Duration(0))
+
+ d.D = Duration(time.Second)
+ err = json.Unmarshal([]byte(`{"D":0}`), &d)
+ c.Check(err, check.IsNil)
+ c.Check(d.D.Duration(), check.Equals, time.Duration(0))
}
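Outside the test suite, the practical effect is that a zero duration may now be written either quoted or bare in JSON. A small standalone sketch:

<pre>
package main

import (
	"encoding/json"
	"fmt"

	"git.arvados.org/arvados.git/sdk/go/arvados"
)

func main() {
	var v struct{ D arvados.Duration }
	for _, in := range []string{`{"D":"0"}`, `{"D":0}`, `{"D":"60s"}`} {
		if err := json.Unmarshal([]byte(in), &v); err != nil {
			fmt.Println(in, "->", err)
			continue
		}
		fmt.Println(in, "->", v.D.Duration())
	}
}
</pre>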
//
// After seeking:
//
-// ptr.segmentIdx == len(filenode.segments) // i.e., at EOF
-// ||
-// filenode.segments[ptr.segmentIdx].Len() > ptr.segmentOff
+// ptr.segmentIdx == len(filenode.segments) // i.e., at EOF
+// ||
+// filenode.segments[ptr.segmentIdx].Len() > ptr.segmentOff
func (fn *filenode) seek(startPtr filenodePtr) (ptr filenodePtr) {
ptr = startPtr
if ptr.off < 0 {
type Log struct {
ID uint64 `json:"id"`
UUID string `json:"uuid"`
+ OwnerUUID string `json:"owner_uuid"`
ObjectUUID string `json:"object_uuid"`
ObjectOwnerUUID string `json:"object_owner_uuid"`
EventType string `json:"event_type"`
- EventAt *time.Time `json:"event"`
+ EventAt time.Time `json:"event"`
+ Summary string `json:"summary"`
Properties map[string]interface{} `json:"properties"`
- CreatedAt *time.Time `json:"created_at"`
+ CreatedAt time.Time `json:"created_at"`
+ ModifiedAt time.Time `json:"modified_at"`
}
// LogList is an arvados#logList resource.
"docker-image-repo-tag": true,
"filters": true,
"container_request": true,
+ "cwl_input": true,
+ "cwl_output": true,
}
}
"docker-image-repo-tag": true,
"filters": true,
"container_request": true,
+ "cwl_input": true,
+ "cwl_output": true,
},
StrictTags: false,
Tags: map[string]VocabularyTag{
as.appendCall(ctx, as.LinkDelete, options)
return arvados.Link{}, as.Error
}
+func (as *APIStub) LogCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Log, error) {
+ as.appendCall(ctx, as.LogCreate, options)
+ return arvados.Log{}, as.Error
+}
+func (as *APIStub) LogUpdate(ctx context.Context, options arvados.UpdateOptions) (arvados.Log, error) {
+ as.appendCall(ctx, as.LogUpdate, options)
+ return arvados.Log{}, as.Error
+}
+func (as *APIStub) LogGet(ctx context.Context, options arvados.GetOptions) (arvados.Log, error) {
+ as.appendCall(ctx, as.LogGet, options)
+ return arvados.Log{}, as.Error
+}
+func (as *APIStub) LogList(ctx context.Context, options arvados.ListOptions) (arvados.LogList, error) {
+ as.appendCall(ctx, as.LogList, options)
+ return arvados.LogList{}, as.Error
+}
+func (as *APIStub) LogDelete(ctx context.Context, options arvados.DeleteOptions) (arvados.Log, error) {
+ as.appendCall(ctx, as.LogDelete, options)
+ return arvados.Log{}, as.Error
+}
func (as *APIStub) SpecimenCreate(ctx context.Context, options arvados.CreateOptions) (arvados.Specimen, error) {
as.appendCall(ctx, as.SpecimenCreate, options)
return arvados.Specimen{}, as.Error
Response map[string]interface{} `json:",omitempty"`
ResponseTime json.Number
ClockTime time.Time
+ Server string // "Server" header in http response
Metrics
respTime time.Duration
}
}
result.Health = "OK"
result.ClockTime, _ = time.Parse(time.RFC1123, resp.Header.Get("Date"))
+ result.Server = resp.Header.Get("Server")
return
}
err := ccmd.run(ctx, prog, args, stdin, stdout, stderr)
if err != nil {
if err != errSilent {
- fmt.Fprintln(stdout, err.Error())
+ fmt.Fprintln(stderr, err.Error())
}
return 1
}
loader.SetupFlags(flags)
versionFlag := flags.Bool("version", false, "Write version information to stdout and exit 0")
timeout := flags.Duration("timeout", defaultTimeout.Duration(), "Maximum time to wait for health responses")
+ quiet := flags.Bool("quiet", false, "Silent on success (suppress 'health check OK' message on stderr)")
	outputYAML := flags.Bool("yaml", false, "Output full health report in YAML format (default mode shows errors as plain text and prints 'health check OK' on stderr on success)")
if ok, _ := cmd.ParseFlags(flags, prog, args, "", stderr); !ok {
// cmd.ParseFlags already reported the error
}
if resp.Health != "OK" {
for _, msg := range resp.Errors {
- fmt.Fprintln(stdout, msg)
+ fmt.Fprintln(stderr, msg)
}
fmt.Fprintln(stderr, "health check failed")
return errSilent
}
+ if !*quiet {
+ fmt.Fprintln(stderr, "health check OK")
+ }
return nil
}
exitcode := CheckCommand.RunCommand("check", []string{"-config=" + tmpdir + "/config.yml"}, &bytes.Buffer{}, &stdout, &stderr)
c.Check(exitcode, check.Equals, 0)
+ c.Check(stderr.String(), check.Equals, "health check OK\n")
+ c.Check(stdout.String(), check.Equals, "")
+
+ stdout.Reset()
+ stderr.Reset()
+ exitcode = CheckCommand.RunCommand("check", []string{"-quiet", "-config=" + tmpdir + "/config.yml"}, &bytes.Buffer{}, &stdout, &stderr)
+ c.Check(exitcode, check.Equals, 0)
c.Check(stderr.String(), check.Equals, "")
c.Check(stdout.String(), check.Equals, "")
"GitInternalDir": os.path.join(SERVICES_SRC_DIR, 'api', 'tmp', 'internal.git'),
},
"LocalKeepBlobBuffersPerVCPU": 0,
+ "Logging": {
+ "SweepInterval": 0, # disable, otherwise test cases can't acquire dblock
+ },
"SupportedDockerImageFormats": {"v1": {}},
"ShellAccess": {
"Admin": True,
# delete oid_login_perms for this user
#
- # note: these permission links are obsolete, they have no effect
- # on anything and they are not created for new users.
+ # note: these permission links are obsolete anyway; they have no
+ # effect on anything, and they are not created for new users.
Link.where(tail_uuid: self.email,
link_class: 'permission',
name: 'can_login').destroy_all
- # delete repo_perms for this user
- Link.where(tail_uuid: self.uuid,
- link_class: 'permission',
- name: 'can_manage').destroy_all
-
- # delete vm_login_perms for this user
- Link.where(tail_uuid: self.uuid,
- link_class: 'permission',
- name: 'can_login').destroy_all
-
- # delete "All users" group read permissions for this user
+ # Delete all sharing permissions, so that (a) the user doesn't
+ # automatically regain access to anything if re-setup in the
+ # future, and (b) the user doesn't appear in "currently shared
+ # with" lists shown to other users.
+ #
+ # Notably this includes the can_read -> "all users" group
+ # permission.
Link.where(tail_uuid: self.uuid,
- head_uuid: all_users_group_uuid,
link_class: 'permission').destroy_all
# delete any signatures by this user
# from the logs table.
namespace :db do
- desc "Remove old container log entries from the logs table"
+ desc "deprecated / no-op"
task delete_old_container_logs: :environment do
- delete_sql = "DELETE FROM logs WHERE id in (SELECT logs.id FROM logs JOIN containers ON logs.object_uuid = containers.uuid WHERE event_type IN ('stdout', 'stderr', 'arv-mount', 'crunch-run', 'crunchstat') AND containers.log IS NOT NULL AND now() - containers.finished_at > interval '#{Rails.configuration.Containers.Logging.MaxAge.to_i} seconds')"
-
- ActiveRecord::Base.connection.execute(delete_sql)
+ Rails.logger.info "this db:delete_old_container_logs rake task is no longer used"
end
end
ApiClientAuthorization.create!(user: User.find_by_uuid(created['uuid']), api_client: ApiClient.all.first).api_token
end
+ # share project and collections with the new user
+ act_as_system_user do
+ Link.create!(tail_uuid: created['uuid'],
+ head_uuid: groups(:aproject).uuid,
+ link_class: 'permission',
+ name: 'can_manage')
+ Link.create!(tail_uuid: created['uuid'],
+ head_uuid: collections(:collection_owned_by_active).uuid,
+ link_class: 'permission',
+ name: 'can_read')
+ Link.create!(tail_uuid: created['uuid'],
+ head_uuid: collections(:collection_owned_by_active_with_file_stats).uuid,
+ link_class: 'permission',
+ name: 'can_write')
+ end
+
assert_equal 1, ApiClientAuthorization.where(user_id: User.find_by_uuid(created['uuid']).id).size, 'expected token not found'
post "/arvados/v1/users/#{created['uuid']}/unsetup", params: {}, headers: auth(:admin)
assert_not_nil created2['uuid'], 'expected uuid for the newly created user'
assert_equal created['uuid'], created2['uuid'], 'expected uuid not found'
assert_equal 0, ApiClientAuthorization.where(user_id: User.find_by_uuid(created['uuid']).id).size, 'token should have been deleted by user unsetup'
+ # check permissions are deleted
+ assert_empty Link.where(tail_uuid: created['uuid'])
verify_link_existence created['uuid'], created['email'], false, false, false, false, false
end
+++ /dev/null
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: AGPL-3.0
-
-require 'test_helper'
-require 'rake'
-
-Rake.application.rake_require "tasks/delete_old_container_logs"
-Rake::Task.define_task(:environment)
-
-class DeleteOldContainerLogsTaskTest < ActiveSupport::TestCase
- TASK_NAME = "db:delete_old_container_logs"
-
- def log_uuids(*fixture_names)
- fixture_names.map { |name| logs(name).uuid }
- end
-
- def run_with_expiry(clean_after)
- Rails.configuration.Containers.Logging.MaxAge = clean_after
- Rake::Task[TASK_NAME].reenable
- Rake.application.invoke_task TASK_NAME
- end
-
- def check_log_existence(test_method, fixture_uuids)
- uuids_now = Log.where("object_uuid LIKE :pattern AND event_type in ('stdout', 'stderr', 'arv-mount', 'crunch-run', 'crunchstat')", pattern: "%-dz642-%").map(&:uuid)
- fixture_uuids.each do |expect_uuid|
- send(test_method, uuids_now, expect_uuid)
- end
- end
-
- test "delete all finished logs" do
- uuids_to_keep = log_uuids(:stderr_for_running_container,
- :crunchstat_for_running_container)
- uuids_to_clean = log_uuids(:stderr_for_previous_container,
- :crunchstat_for_previous_container,
- :stderr_for_ancient_container,
- :crunchstat_for_ancient_container)
- run_with_expiry(1)
- check_log_existence(:assert_includes, uuids_to_keep)
- check_log_existence(:refute_includes, uuids_to_clean)
- end
-
- test "delete old finished logs" do
- uuids_to_keep = log_uuids(:stderr_for_running_container,
- :crunchstat_for_running_container,
- :stderr_for_previous_container,
- :crunchstat_for_previous_container)
- uuids_to_clean = log_uuids(:stderr_for_ancient_container,
- :crunchstat_for_ancient_container)
- run_with_expiry(360.days)
- check_log_existence(:assert_includes, uuids_to_keep)
- check_log_existence(:refute_includes, uuids_to_clean)
- end
-end
"time"
"git.arvados.org/arvados.git/lib/cmd"
+ "git.arvados.org/arvados.git/lib/controller/dblock"
+ "git.arvados.org/arvados.git/lib/ctrlctx"
"git.arvados.org/arvados.git/lib/dispatchcloud"
"git.arvados.org/arvados.git/lib/service"
"git.arvados.org/arvados.git/sdk/go/arvados"
type Dispatcher struct {
*dispatch.Dispatcher
- logger logrus.FieldLogger
- cluster *arvados.Cluster
- sqCheck *SqueueChecker
- slurm Slurm
+ logger logrus.FieldLogger
+ cluster *arvados.Cluster
+ sqCheck *SqueueChecker
+ slurm Slurm
+ dbConnector ctrlctx.DBConnector
done chan struct{}
err error
disp.Client.APIHost = disp.cluster.Services.Controller.ExternalURL.Host
disp.Client.AuthToken = disp.cluster.SystemRootToken
disp.Client.Insecure = disp.cluster.TLS.Insecure
+ disp.dbConnector = ctrlctx.DBConnector{PostgreSQL: disp.cluster.PostgreSQL}
if disp.Client.APIHost != "" || disp.Client.AuthToken != "" {
// Copy real configs into env vars so [a]
}
func (disp *Dispatcher) run() error {
+ dblock.Dispatch.Lock(context.Background(), disp.dbConnector.GetDB)
+ defer dblock.Dispatch.Unlock()
defer disp.sqCheck.Stop()
if disp.cluster != nil && len(disp.cluster.InstanceTypes) > 0 {
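The dblock calls added to run() above ensure only one dispatcher instance is active per cluster, using a PostgreSQL advisory lock. A minimal sketch of the pattern, assuming the lib/controller/dblock API as it appears in this diff:

<pre>
package sketch

import (
	"context"

	"git.arvados.org/arvados.git/lib/controller/dblock"
	"github.com/jmoiron/sqlx"
)

// runExclusive blocks until this process holds the cluster-wide
// dispatch lock, runs f, and releases the lock when f returns.
func runExclusive(ctx context.Context, getdb func(context.Context) (*sqlx.DB, error), f func()) {
	dblock.Dispatch.Lock(ctx, getdb)
	defer dblock.Dispatch.Unlock()
	f()
}
</pre>

keep-balance (further below) uses the same package but also checks the boolean return of Lock, which reports whether the lock was acquired before the context was canceled, and calls Check() before each sweep, bailing out cleanly if the context has been canceled in the meantime.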
version = "dev"
)
+type logger interface {
+ Printf(string, ...interface{})
+}
+
func main() {
reporter := crunchstat.Reporter{
Logger: log.New(os.Stderr, "crunchstat: ", 0),
reporter.Logger.Printf("crunchstat %s started", version)
if reporter.CgroupRoot == "" {
- reporter.Logger.Fatal("error: must provide -cgroup-root")
+ reporter.Logger.Printf("error: must provide -cgroup-root")
+ os.Exit(2)
} else if signalOnDeadPPID < 0 {
- reporter.Logger.Fatalf("-signal-on-dead-ppid=%d is invalid (use a positive signal number, or 0 to disable)", signalOnDeadPPID)
+ reporter.Logger.Printf("-signal-on-dead-ppid=%d is invalid (use a positive signal number, or 0 to disable)", signalOnDeadPPID)
+ os.Exit(2)
}
reporter.PollPeriod = time.Duration(*pollMsec) * time.Millisecond
if status, ok := err.Sys().(syscall.WaitStatus); ok {
os.Exit(status.ExitStatus())
} else {
- reporter.Logger.Fatalln("ExitError without WaitStatus:", err)
+ reporter.Logger.Printf("ExitError without WaitStatus: %v", err)
+ os.Exit(1)
}
} else if err != nil {
- reporter.Logger.Fatalln("error in cmd.Wait:", err)
+ reporter.Logger.Printf("error running command: %v", err)
+ os.Exit(1)
}
}
-func runCommand(argv []string, logger *log.Logger) error {
+func runCommand(argv []string, logger logger) error {
cmd := exec.Command(argv[0], argv[1:]...)
- logger.Println("Running", argv)
+ logger.Printf("Running %v", argv)
// Child process will use our stdin and stdout pipes
// (we close our copies below)
if cmd.Process != nil {
cmd.Process.Signal(catch)
}
- logger.Println("notice: caught signal:", catch)
+ logger.Printf("notice: caught signal: %v", catch)
}(sigChan)
signal.Notify(sigChan, syscall.SIGTERM)
signal.Notify(sigChan, syscall.SIGINT)
// Funnel stderr through our channel
stderrPipe, err := cmd.StderrPipe()
if err != nil {
- logger.Fatalln("error in StderrPipe:", err)
+ logger.Printf("error in StderrPipe: %v", err)
+ return err
}
// Run subprocess
if err := cmd.Start(); err != nil {
- logger.Fatalln("error in cmd.Start:", err)
+ logger.Printf("error in cmd.Start: %v", err)
+ return err
}
// Close stdin/stdout in this (parent) process
os.Stdin.Close()
os.Stdout.Close()
- copyPipeToChildLog(stderrPipe, log.New(os.Stderr, "", 0))
+ err = copyPipeToChildLog(stderrPipe, log.New(os.Stderr, "", 0))
+ if err != nil {
+ cmd.Process.Kill()
+ return err
+ }
return cmd.Wait()
}
-func sendSignalOnDeadPPID(intvl time.Duration, signum, ppidOrig int, cmd *exec.Cmd, logger *log.Logger) {
+func sendSignalOnDeadPPID(intvl time.Duration, signum, ppidOrig int, cmd *exec.Cmd, logger logger) {
ticker := time.NewTicker(intvl)
for range ticker.C {
ppid := os.Getppid()
}
}
-func copyPipeToChildLog(in io.ReadCloser, logger *log.Logger) {
+func copyPipeToChildLog(in io.ReadCloser, logger logger) error {
reader := bufio.NewReaderSize(in, MaxLogLine)
var prefix string
for {
if err == io.EOF {
break
} else if err != nil {
- logger.Fatal("error reading child stderr:", err)
+ return fmt.Errorf("error reading child stderr: %w", err)
}
var suffix string
if isPrefix {
suffix = "[...]"
}
- logger.Print(prefix, string(line), suffix)
+ logger.Printf("%s%s%s", prefix, string(line), suffix)
// Set up prefix for following line
if isPrefix {
prefix = "[...]"
prefix = ""
}
}
- in.Close()
+ return in.Close()
}
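Since runCommand, sendSignalOnDeadPPID, and copyPipeToChildLog now depend only on the small Printf-style logger interface rather than a concrete *log.Logger, they are straightforward to exercise in tests. A hypothetical stand-in (not part of this diff):

<pre>
package sketch

import "fmt"

// memLogger satisfies crunchstat's logger interface
// (Printf(string, ...interface{})) and records output in memory.
type memLogger struct{ lines []string }

func (m *memLogger) Printf(format string, args ...interface{}) {
	m.lines = append(m.lines, fmt.Sprintf(format, args...))
}
</pre>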
"syscall"
"time"
+ "git.arvados.org/arvados.git/lib/controller/dblock"
"git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/keepclient"
"github.com/jmoiron/sqlx"
"github.com/sirupsen/logrus"
// subsequent balance operation.
//
// Run should only be called once on a given Balancer object.
-//
-// Typical usage:
-//
-// runOptions, err = (&Balancer{}).Run(config, runOptions)
-func (bal *Balancer) Run(client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
+func (bal *Balancer) Run(ctx context.Context, client *arvados.Client, cluster *arvados.Cluster, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
nextRunOptions = runOptions
+ ctxlog.FromContext(ctx).Info("acquiring active lock")
+ if !dblock.KeepBalanceActive.Lock(ctx, func(context.Context) (*sqlx.DB, error) { return bal.DB, nil }) {
+ // context canceled
+ return
+ }
+ defer dblock.KeepBalanceActive.Unlock()
+
defer bal.time("sweep", "wall clock time to run one full sweep")()
- ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
+ ctx, cancel := context.WithDeadline(ctx, time.Now().Add(cluster.Collections.BalanceTimeout.Duration()))
defer cancel()
var lbFile *os.File
import (
"bytes"
+ "context"
"encoding/json"
"fmt"
"io"
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
srv := s.newServer(&opts)
- _, err = srv.runOnce()
+ _, err = srv.runOnce(context.Background())
c.Check(err, check.ErrorMatches, "received zero collections")
c.Check(trashReqs.Count(), check.Equals, 4)
c.Check(pullReqs.Count(), check.Equals, 0)
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
srv := s.newServer(&opts)
- _, err := srv.runOnce()
+ _, err := srv.runOnce(context.Background())
c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
c.Check(trashReqs.Count(), check.Equals, 0)
c.Check(pullReqs.Count(), check.Equals, 0)
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
srv := s.newServer(&opts)
- _, err := srv.runOnce()
+ _, err := srv.runOnce(context.Background())
c.Check(err, check.ErrorMatches, "cannot continue with config errors.*")
c.Check(trashReqs.Count(), check.Equals, 0)
c.Check(pullReqs.Count(), check.Equals, 0)
s.stub.serveKeepstorePull()
srv := s.newServer(&opts)
c.Assert(err, check.IsNil)
- _, err = srv.runOnce()
+ _, err = srv.runOnce(context.Background())
c.Check(err, check.IsNil)
lost, err := ioutil.ReadFile(lostf.Name())
c.Assert(err, check.IsNil)
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
srv := s.newServer(&opts)
- bal, err := srv.runOnce()
+ bal, err := srv.runOnce(context.Background())
c.Check(err, check.IsNil)
for _, req := range collReqs.reqs {
c.Check(req.Form.Get("include_trash"), check.Equals, "true")
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
srv := s.newServer(&opts)
- bal, err := srv.runOnce()
+ bal, err := srv.runOnce(context.Background())
c.Check(err, check.IsNil)
c.Check(trashReqs.Count(), check.Equals, 8)
c.Check(pullReqs.Count(), check.Equals, 4)
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
- stop := make(chan interface{})
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
s.config.Collections.BalancePeriod = arvados.Duration(time.Millisecond)
srv := s.newServer(&opts)
done := make(chan bool)
go func() {
- srv.runForever(stop)
+ srv.runForever(ctx)
close(done)
}()
for t0 := time.Now(); pullReqs.Count() < 16 && time.Since(t0) < 10*time.Second; {
time.Sleep(time.Millisecond)
}
- stop <- true
+ cancel()
<-done
c.Check(pullReqs.Count() >= 16, check.Equals, true)
c.Check(trashReqs.Count(), check.Equals, pullReqs.Count()+4)
import (
"bytes"
+ "context"
"io"
"os"
"strings"
Logger: logger,
Metrics: newMetrics(prometheus.NewRegistry()),
}
- nextOpts, err := bal.Run(s.client, s.config, opts)
+ nextOpts, err := bal.Run(context.Background(), s.client, s.config, opts)
c.Check(err, check.IsNil)
c.Check(nextOpts.SafeRendezvousState, check.Not(check.Equals), "")
c.Check(nextOpts.CommitPulls, check.Equals, true)
Routes: health.Routes{"ping": srv.CheckHealth},
}
- go srv.run()
+ go srv.run(ctx)
return srv
}).RunCommand(prog, args, stdin, stdout, stderr)
}
package keepbalance
import (
+ "context"
"net/http"
"os"
"os/signal"
"syscall"
"time"
+ "git.arvados.org/arvados.git/lib/controller/dblock"
"git.arvados.org/arvados.git/sdk/go/arvados"
"github.com/jmoiron/sqlx"
"github.com/sirupsen/logrus"
return nil
}
-func (srv *Server) run() {
+func (srv *Server) run(ctx context.Context) {
var err error
if srv.RunOptions.Once {
- _, err = srv.runOnce()
+ _, err = srv.runOnce(ctx)
} else {
- err = srv.runForever(nil)
+ err = srv.runForever(ctx)
}
if err != nil {
srv.Logger.Error(err)
}
}
-func (srv *Server) runOnce() (*Balancer, error) {
+func (srv *Server) runOnce(ctx context.Context) (*Balancer, error) {
bal := &Balancer{
DB: srv.DB,
Logger: srv.Logger,
LostBlocksFile: srv.Cluster.Collections.BlobMissingReport,
}
var err error
- srv.RunOptions, err = bal.Run(srv.ArvClient, srv.Cluster, srv.RunOptions)
+ srv.RunOptions, err = bal.Run(ctx, srv.ArvClient, srv.Cluster, srv.RunOptions)
return bal, err
}
-// RunForever runs forever, or (for testing purposes) until the given
-// stop channel is ready to receive.
-func (srv *Server) runForever(stop <-chan interface{}) error {
+// RunForever runs forever, or until ctx is cancelled.
+func (srv *Server) runForever(ctx context.Context) error {
logger := srv.Logger
ticker := time.NewTicker(time.Duration(srv.Cluster.Collections.BalancePeriod))
sigUSR1 := make(chan os.Signal)
signal.Notify(sigUSR1, syscall.SIGUSR1)
+ logger.Info("acquiring service lock")
+ dblock.KeepBalanceService.Lock(ctx, func(context.Context) (*sqlx.DB, error) { return srv.DB, nil })
+ defer dblock.KeepBalanceService.Unlock()
+
logger.Printf("starting up: will scan every %v and on SIGUSR1", srv.Cluster.Collections.BalancePeriod)
for {
logger.Print("======= Consider using -commit-pulls and -commit-trash flags.")
}
- _, err := srv.runOnce()
+ if !dblock.KeepBalanceService.Check() {
+ // context canceled
+ return nil
+ }
+ _, err := srv.runOnce(ctx)
if err != nil {
logger.Print("run failed: ", err)
} else {
}
select {
- case <-stop:
+ case <-ctx.Done():
signal.Stop(sigUSR1)
return nil
case <-ticker.C:
- proxy_set_header: 'X-Real-IP $remote_addr'
- proxy_set_header: 'X-Forwarded-For $proxy_add_x_forwarded_for'
- proxy_set_header: 'X-External-Client $external_client'
+ - proxy_set_header: 'Upgrade $http_upgrade'
+ - proxy_set_header: 'Connection "upgrade"'
- proxy_max_temp_file_size: 0
- proxy_request_buffering: 'off'
- proxy_buffering: 'off'
- proxy_set_header: 'X-Real-IP $remote_addr'
- proxy_set_header: 'X-Forwarded-For $proxy_add_x_forwarded_for'
- proxy_set_header: 'X-External-Client $external_client'
+ - proxy_set_header: 'Upgrade $http_upgrade'
+ - proxy_set_header: 'Connection "upgrade"'
- proxy_max_temp_file_size: 0
- proxy_request_buffering: 'off'
- proxy_buffering: 'off'
- proxy_set_header: 'X-Real-IP $remote_addr'
- proxy_set_header: 'X-Forwarded-For $proxy_add_x_forwarded_for'
- proxy_set_header: 'X-External-Client $external_client'
+ - proxy_set_header: 'Upgrade $http_upgrade'
+ - proxy_set_header: 'Connection "upgrade"'
- proxy_max_temp_file_size: 0
- proxy_request_buffering: 'off'
- proxy_buffering: 'off'
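These pillar entries add @proxy_set_header Upgrade $http_upgrade;@ and @proxy_set_header Connection "upgrade";@ to the rendered nginx configuration, enabling the HTTP Upgrade handshake that WebSocket connections require.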