Veritas Genetics, Inc. <*@veritasgenetics.com>
Curii Corporation, Inc. <*@curii.com>
Dante Tsang <dante@dantetsang.com>
-Codex Genetics Ltd <info@codexgenetics.com>
\ No newline at end of file
+Codex Genetics Ltd <info@codexgenetics.com>
+Bruno P. Kinoshita <brunodepaulak@yahoo.com.br>
Those interested in contributing should begin by joining the [Arvados community
channel](https://gitter.im/arvados/community) and telling us about your interest.
-Contributers should also create an account at https://dev.arvados.org
+Contributors should also create an account at https://dev.arvados.org
to be able to create and comment on bug tracker issues. The
Arvados public bug tracker is located at
https://dev.arvados.org/projects/arvados/issues .
-Contributers may also be interested in the [development road map](https://dev.arvados.org/issues/gantt?utf8=%E2%9C%93&set_filter=1&gantt=1&f%5B%5D=project_id&op%5Bproject_id%5D=%3D&v%5Bproject_id%5D%5B%5D=49&f%5B%5D=&zoom=1).
+Contributors may also be interested in the [development road map](https://dev.arvados.org/issues/gantt?utf8=%E2%9C%93&set_filter=1&gantt=1&f%5B%5D=project_id&op%5Bproject_id%5D=%3D&v%5Bproject_id%5D%5B%5D=49&f%5B%5D=&zoom=1).
# Development
Git repositories for primary development are located at
https://git.arvados.org/ and can also be browsed at
https://dev.arvados.org/projects/arvados/repository . Every push to
-the master branch is also mirrored to Github at
+the main branch is also mirrored to Github at
https://github.com/arvados/arvados .
Visit [Hacking Arvados](https://dev.arvados.org/projects/arvados/wiki/Hacking) for
2. Clone your fork, make your changes, commit to your fork.
3. Every commit message must have a DCO sign-off and every file must have a SPDX license (see below).
4. Add yourself to the [AUTHORS](AUTHORS) file
-5. When your fork is ready, through Github, Create a Pull Request against `arvados:master`
+5. When your fork is ready, through Github, Create a Pull Request against `arvados:main`
6. Notify the core team about your pull request through the [Arvados development
channel](https://gitter.im/arvados/development) or by other means.
7. A member of the core team will review the pull request. They may have questions or comments, or request changes.
8. When the contribution is ready, a member of the core team will
-merge the pull request into the master branch, which will
+merge the pull request into the main branch, which will
automatically resolve the pull request.
The Arvados project does not require a contributor agreement in advance, but does require each commit message include a [Developer Certificate of Origin](https://dev.arvados.org/projects/arvados/wiki/Developer_Certificate_Of_Origin). Please ensure *every git commit message* includes `Arvados-DCO-1.1-Signed-off-by`. If you have already made commits without it, fix them with `git commit --amend` or `git rebase`.
Continuous integration is hosted at https://ci.arvados.org/
-Currently, external contributers cannot trigger builds. We are investigating integration with Github pull requests for the future.
+Currently, external contributors cannot trigger builds. We are investigating integration with Github pull requests for the future.
[![Build Status](https://ci.arvados.org/buildStatus/icon?job=run-tests)](https://ci.arvados.org/job/run-tests/)
"strings"
"syscall"
+ "git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/lib/controller/rpc"
"git.arvados.org/arvados.git/sdk/go/arvados"
)
func (shellCommand) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
f := flag.NewFlagSet(prog, flag.ContinueOnError)
- f.SetOutput(stderr)
- f.Usage = func() {
- _, prog := filepath.Split(prog)
- fmt.Fprint(stderr, prog+`: open an interactive shell on a running container.
-
-Usage: `+prog+` [options] [username@]container-uuid [ssh-options] [remote-command [args...]]
-
-Options:
-`)
- f.PrintDefaults()
- }
detachKeys := f.String("detach-keys", "ctrl-],ctrl-]", "set detach key sequence, as in docker-attach(1)")
- err := f.Parse(args)
- if err != nil {
- fmt.Fprintln(stderr, err)
- return 2
- }
-
- if f.NArg() < 1 {
- f.Usage()
+ if ok, code := cmd.ParseFlags(f, prog, args, "[username@]container-uuid [ssh-options] [remote-command [args...]]", stderr); !ok {
+ return code
+ } else if f.NArg() < 1 {
+ fmt.Fprintf(stderr, "missing required argument: container-uuid (try -help)\n")
return 2
}
target := f.Args()[0]
}
probeOnly := f.Bool("probe-only", false, "do not transfer IO, just setup tunnel, print target UUID, and exit")
detachKeys := f.String("detach-keys", "", "set detach key sequence, as in docker-attach(1)")
- if err := f.Parse(args); err != nil {
- fmt.Fprintln(stderr, err)
- return 2
+ if ok, code := cmd.ParseFlags(f, prog, args, "[username@]container-uuid", stderr); !ok {
+ return code
} else if f.NArg() != 1 {
- f.Usage()
+ fmt.Fprintf(stderr, "missing required argument: [username@]container-uuid\n")
return 2
}
targetUUID := f.Args()[0]
func main() {
if len(os.Args) < 2 || strings.HasPrefix(os.Args[1], "-") {
- parseFlags([]string{"-help"})
+ parseFlags(os.Args[0], []string{"-help"}, os.Stderr)
os.Exit(2)
}
os.Exit(handler.RunCommand(os.Args[0], os.Args[1:], os.Stdin, os.Stdout, os.Stderr))
func (cf cmdFunc) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
logger := ctxlog.New(stderr, "text", "info")
ctx := ctxlog.Context(context.Background(), logger)
- opts, err := parseFlags(args)
- if err != nil {
- logger.WithError(err).Error("error parsing command line flags")
- return 1
+ opts, ok, code := parseFlags(prog, args, stderr)
+ if !ok {
+ return code
}
- err = cf(ctx, opts, stdin, stdout, stderr)
+ err := cf(ctx, opts, stdin, stdout, stderr)
if err != nil {
logger.WithError(err).Error("failed")
return 1
Vendor string
}
-func parseFlags(args []string) (opts, error) {
+func parseFlags(prog string, args []string, stderr io.Writer) (_ opts, ok bool, exitCode int) {
opts := opts{
SourceDir: ".",
TargetOS: "debian:10",
`)
flags.PrintDefaults()
}
- err := flags.Parse(args)
- if err != nil {
- return opts, err
- }
- if len(flags.Args()) > 0 {
- return opts, fmt.Errorf("unrecognized command line arguments: %v", flags.Args())
+ if ok, code := cmd.ParseFlags(flags, prog, args, "", stderr); !ok {
+ return opts, false, code
}
if opts.SourceDir == "" {
d, err := os.Getwd()
if err != nil {
- return opts, fmt.Errorf("Getwd: %w", err)
+ fmt.Fprintf(stderr, "error getting current working directory: %s\n", err)
+ return opts, false, 1
}
opts.SourceDir = d
}
opts.PackageDir = filepath.Clean(opts.PackageDir)
- opts.SourceDir, err = filepath.Abs(opts.SourceDir)
+ abs, err := filepath.Abs(opts.SourceDir)
if err != nil {
- return opts, err
+ fmt.Fprintf(stderr, "error resolving source dir %q: %s\n", opts.SourceDir, err)
+ return opts, false, 1
}
- return opts, nil
+ opts.SourceDir = abs
+ return opts, true, 0
}
{% endcomment %}
<notextile>
-<pre><code># <span class="userinput">apt-get --no-install-recommends install curl gnupg2</span>
-# <span class="userinput">curl -s https://apt.arvados.org/pubkey.gpg -o /etc/apt/trusted.gpg.d/arvados.asc</span>
+<pre><code># <span class="userinput">apt-get --no-install-recommends install curl gnupg2 ca-certificates</span>
+# <span class="userinput">curl https://apt.arvados.org/pubkey.gpg -o /etc/apt/trusted.gpg.d/arvados.asc</span>
</code></pre>
</notextile>
If there's a need to prevent a non-admin user from modifying a specific property, even by its owner, the @Protected@ attribute can be set to @true@, like so:
+<pre>
+Collections:
+ ManagedProperties:
+ sample_id: {Protected: true}
+</pre>
+
+This configuration won't assign a @sample_id@ property on collection creation, but if the user adds it to any collection, its value is protected from that point on.
+
+Another use case would be to protect properties that were automatically assigned by the system:
+
<pre>
Collections:
ManagedProperties:
responsible_person_uuid: {Function: original_owner, Protected: true}
</pre>
-This property can be applied to any of the defined managed properties. If missing, it's assumed as being @false@ by default.
+If missing, the @Protected@ attribute is assumed to be @false@ by default.
h3. Supporting example scripts
"previous: Upgrading from 2.3.0":#v2_3_0
+h3. Previously trashed role groups will be deleted
+
+Due to a bug in previous versions, the @DELETE@ operation on a role group caused the group to be flagged as trash in the database, but continue to grant permissions regardless. After upgrading, any role groups that had been trashed this way will be deleted. This might surprise some users if they were relying on permissions that were still in effect due to this bug. Future @DELETE@ operations on a role group will immediately delete the group and revoke the associated permissions.
+
h3. Users are visible to other users by default
When a new user is set up (either via @AutoSetupNewUsers@ config or via Workbench admin interface) the user immediately becomes visible to other users. To revert to the previous behavior, where the administrator must add two users to the same group using the Workbench admin interface in order for the users to see each other, change the new @Users.ActivatedUsersAreVisibleToOthers@ config to @false@.
One is by "configuring (system-wide) the collection's idle time":{{site.baseurl}}/admin/collection-versioning.html. This idle time is checked against the @modified_at@ attribute so that the version is saved when one or more of the previously enumerated attributes get updated and the @modified_at@ is at least at the configured idle time in the past. This way, a frequently updated collection won't create lots of version records that may not be useful.
-The other way to trigger a version save, is by setting @preserve_version@ to @true@ on the current version collection record: this ensures that the current state will be preserved as a version the next time it gets updated.
+The other way to trigger a version save is by setting @preserve_version@ to @true@ on the current version collection record: this ensures that the current state will be preserved as a version the next time it gets updated. This includes either creating a new collection or updating a preexisting one. In the case of using @preserve_version = true@ on a collection's create call, the new record state will be preserved as a snapshot on the next update.
h3. Collection's past versions behavior & limitations
}
var errNeedConfigReload = errors.New("config changed, restart needed")
+var errParseFlags = errors.New("error parsing command line arguments")
type bootCommand struct{}
err := bcmd.run(ctx, prog, args, stdin, stdout, stderr)
if err == errNeedConfigReload {
continue
+ } else if err == errParseFlags {
+ return 2
} else if err != nil {
logger.WithError(err).Info("exiting")
return 1
}
flags := flag.NewFlagSet(prog, flag.ContinueOnError)
- flags.SetOutput(stderr)
loader := config.NewLoader(stdin, super.logger)
loader.SetupFlags(flags)
versionFlag := flags.Bool("version", false, "Write version information to stdout and exit 0")
flags.BoolVar(&super.OwnTemporaryDatabase, "own-temporary-database", false, "bring up a postgres server and create a temporary database")
timeout := flags.Duration("timeout", 0, "maximum time to wait for cluster to be ready")
shutdown := flags.Bool("shutdown", false, "shut down when the cluster becomes ready")
- err := flags.Parse(args)
- if err == flag.ErrHelp {
- return nil
- } else if err != nil {
- return err
+ if ok, code := cmd.ParseFlags(flags, prog, args, "", stderr); !ok {
+ if code == 0 {
+ return nil
+ } else {
+ return errParseFlags
+ }
} else if *versionFlag {
cmd.Version.RunCommand(prog, args, stdin, stdout, stderr)
return nil
"os"
"git.arvados.org/arvados.git/lib/cloud"
+ "git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/lib/dispatchcloud"
"git.arvados.org/arvados.git/sdk/go/arvados"
destroyExisting := flags.Bool("destroy-existing", false, "Destroy any existing instances tagged with our InstanceSetID, instead of erroring out")
shellCommand := flags.String("command", "", "Run an interactive shell command on the test instance when it boots")
pauseBeforeDestroy := flags.Bool("pause-before-destroy", false, "Prompt and wait before destroying the test instance")
- err = flags.Parse(args)
- if err == flag.ErrHelp {
- err = nil
- return 0
- } else if err != nil {
- return 2
- }
-
- if len(flags.Args()) != 0 {
- flags.Usage()
- return 2
+ if ok, code := cmd.ParseFlags(flags, prog, args, "", stderr); !ok {
+ return code
}
logger := ctxlog.New(stderr, "text", "info")
defer func() {
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package cmd
+
+import (
+ "flag"
+ "fmt"
+ "io"
+)
+
+// ParseFlags calls f.Parse(args) and prints appropriate error/help
+// messages to stderr.
+//
+// The positional argument is "" if no positional arguments are
+// accepted, otherwise a string to print with the usage message,
+// "Usage: {prog} [options] {positional}".
+//
+// The first return value, ok, is true if the program should continue
+// running normally, or false if it should exit now.
+//
+// If ok is false, the second return value is an appropriate exit
+// code: 0 if "-help" was given, 2 if there was a usage error.
+func ParseFlags(f FlagSet, prog string, args []string, positional string, stderr io.Writer) (ok bool, exitCode int) {
+	f.Init(prog, flag.ContinueOnError)
+	// Discard the FlagSet's automatic error/usage output; all
+	// messages are written to stderr explicitly below so formatting
+	// is consistent across commands.
+	f.SetOutput(io.Discard)
+	err := f.Parse(args)
+	switch err {
+	case nil:
+		// Parse succeeded; reject stray positional arguments if
+		// the command declares that it accepts none.
+		if f.NArg() > 0 && positional == "" {
+			fmt.Fprintf(stderr, "unrecognized command line arguments: %v (try -help)\n", f.Args())
+			return false, 2
+		}
+		return true, 0
+	case flag.ErrHelp:
+		// -help was given: use the command's own Usage func if it
+		// installed one, otherwise synthesize a usage line from
+		// prog/positional plus the flag defaults.
+		if f, ok := f.(*flag.FlagSet); ok && f.Usage != nil {
+			f.SetOutput(stderr)
+			f.Usage()
+		} else {
+			fmt.Fprintf(stderr, "Usage: %s [options] %s\n", prog, positional)
+			f.SetOutput(stderr)
+			f.PrintDefaults()
+		}
+		return false, 0
+	default:
+		// Flag parse error (unknown flag, bad value, etc.).
+		fmt.Fprintf(stderr, "error parsing command line arguments: %s (try -help)\n", err)
+		return false, 2
+	}
+}
"os"
"os/exec"
+ "git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"github.com/ghodss/yaml"
"github.com/sirupsen/logrus"
}
flags := flag.NewFlagSet("", flag.ContinueOnError)
- flags.SetOutput(stderr)
loader.SetupFlags(flags)
- err = flags.Parse(args)
- if err == flag.ErrHelp {
- err = nil
- return 0
- } else if err != nil {
- return 2
- }
-
- if len(flags.Args()) != 0 {
- flags.Usage()
- return 2
+ if ok, code := cmd.ParseFlags(flags, prog, args, "", stderr); !ok {
+ return code
}
-
cfg, err := loader.Load()
if err != nil {
return 1
Logger: logger,
}
- flags := flag.NewFlagSet("", flag.ContinueOnError)
- flags.SetOutput(stderr)
+ flags := flag.NewFlagSet(prog, flag.ContinueOnError)
loader.SetupFlags(flags)
strict := flags.Bool("strict", true, "Strict validation of configuration file (warnings result in non-zero exit code)")
-
- err = flags.Parse(args)
- if err == flag.ErrHelp {
- err = nil
- return 0
- } else if err != nil {
- return 2
- }
-
- if len(flags.Args()) != 0 {
- flags.Usage()
- return 2
+ if ok, code := cmd.ParseFlags(flags, prog, args, "", stderr); !ok {
+ return code
}
// Load the config twice -- once without loading deprecated
var stderr bytes.Buffer
code := DumpCommand.RunCommand("arvados config-dump", []string{"-badarg"}, bytes.NewBuffer(nil), bytes.NewBuffer(nil), &stderr)
c.Check(code, check.Equals, 2)
- c.Check(stderr.String(), check.Matches, `(?ms)flag provided but not defined: -badarg\nUsage:\n.*`)
+ c.Check(stderr.String(), check.Equals, "error parsing command line arguments: flag provided but not defined: -badarg (try -help)\n")
}
func (s *CommandSuite) TestDump_EmptyInput(c *check.C) {
# in /tmp on the compute node each time an Arvados container
# runs. Ensure you have something in place to delete old files
# from /tmp, or adjust the "-o" and "-e" arguments accordingly.
- BsubArgumentsList: ["-o", "/tmp/crunch-run.%%J.out", "-e", "/tmp/crunch-run.%%J.err", "-J", "%U", "-n", "%C", "-D", "%MMB", "-R", "rusage[mem=%MMB:tmp=%TMB] span[hosts=1]"]
+ BsubArgumentsList: ["-o", "/tmp/crunch-run.%%J.out", "-e", "/tmp/crunch-run.%%J.err", "-J", "%U", "-n", "%C", "-D", "%MMB", "-R", "rusage[mem=%MMB:tmp=%TMB] span[hosts=1]", "-R", "select[mem>=%MMB]", "-R", "select[tmp>=%TMB]", "-R", "select[ncpus>=%C]"]
# Use sudo to switch to this user account when submitting LSF
# jobs.
ldr := testLoader(c, "Clusters: {zzzzz: {}}", nil)
ldr.SetupFlags(flags)
args := ldr.MungeLegacyConfigArgs(ldr.Logger, []string{"-config", tmpfile.Name()}, mungeFlag)
- flags.Parse(args)
+ err = flags.Parse(args)
+ c.Assert(err, check.IsNil)
+ c.Assert(flags.NArg(), check.Equals, 0)
cfg, err := ldr.Load()
if err != nil {
return nil, err
"Collections.BlobTrashCheckInterval": false,
"Collections.BlobTrashConcurrency": false,
"Collections.BlobTrashLifetime": false,
- "Collections.CollectionVersioning": false,
+ "Collections.CollectionVersioning": true,
"Collections.DefaultReplication": true,
"Collections.DefaultTrashLifetime": true,
"Collections.ForwardSlashNameSubstitution": true,
# in /tmp on the compute node each time an Arvados container
# runs. Ensure you have something in place to delete old files
# from /tmp, or adjust the "-o" and "-e" arguments accordingly.
- BsubArgumentsList: ["-o", "/tmp/crunch-run.%%J.out", "-e", "/tmp/crunch-run.%%J.err", "-J", "%U", "-n", "%C", "-D", "%MMB", "-R", "rusage[mem=%MMB:tmp=%TMB] span[hosts=1]"]
+ BsubArgumentsList: ["-o", "/tmp/crunch-run.%%J.out", "-e", "/tmp/crunch-run.%%J.err", "-J", "%U", "-n", "%C", "-D", "%MMB", "-R", "rusage[mem=%MMB:tmp=%TMB] span[hosts=1]", "-R", "select[mem>=%MMB]", "-R", "select[tmp>=%TMB]", "-R", "select[ncpus>=%C]"]
# Use sudo to switch to this user account when submitting LSF
# jobs.
cluster.Login.OpenIDConnect.AcceptAccessToken = true
cluster.Login.OpenIDConnect.AcceptAccessTokenScope = ""
- s.testHandler = &Handler{Cluster: cluster}
+ s.testHandler = &Handler{Cluster: cluster, BackgroundContext: ctxlog.Context(context.Background(), s.log)}
s.testServer = newServerFromIntegrationTestEnv(c)
s.testServer.Server.BaseContext = func(net.Listener) context.Context {
return ctxlog.Context(context.Background(), s.log)
// Command starts a controller service. See cmd/arvados-server/cmd.go
var Command cmd.Handler = service.Command(arvados.ServiceNameController, newHandler)
-func newHandler(_ context.Context, cluster *arvados.Cluster, _ string, _ *prometheus.Registry) service.Handler {
- return &Handler{Cluster: cluster}
+// newHandler returns a service.Handler for the controller service.
+// ctx is retained as the handler's BackgroundContext so long-running
+// background workers can use it.
+func newHandler(ctx context.Context, cluster *arvados.Cluster, _ string, _ *prometheus.Registry) service.Handler {
+	return &Handler{Cluster: cluster, BackgroundContext: ctx}
}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dblock
+
+import (
+ "context"
+ "database/sql"
+ "sync"
+ "time"
+
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "github.com/jmoiron/sqlx"
+)
+
+var (
+ TrashSweep = &DBLocker{key: 10001}
+ retryDelay = 5 * time.Second
+)
+
+// DBLocker uses pg_advisory_lock to maintain a cluster-wide lock for
+// a long-running task like "do X every N seconds".
+type DBLocker struct {
+ key int
+ mtx sync.Mutex
+ ctx context.Context
+ getdb func(context.Context) (*sqlx.DB, error)
+ conn *sql.Conn // != nil if advisory lock has been acquired
+}
+
+// Lock acquires the advisory lock, waiting/reconnecting if needed.
+func (dbl *DBLocker) Lock(ctx context.Context, getdb func(context.Context) (*sqlx.DB, error)) {
+	logger := ctxlog.FromContext(ctx)
+	// Retry forever, sleeping retryDelay between attempts, until
+	// the advisory lock has been acquired.
+	for ; ; time.Sleep(retryDelay) {
+		dbl.mtx.Lock()
+		if dbl.conn != nil {
+			// Already locked by another caller in this
+			// process. Wait for them to release.
+			dbl.mtx.Unlock()
+			continue
+		}
+		db, err := getdb(ctx)
+		if err != nil {
+			logger.WithError(err).Infof("error getting database pool")
+			dbl.mtx.Unlock()
+			continue
+		}
+		// Use a dedicated connection: pg_advisory_lock is
+		// session-scoped, so the lock lives exactly as long as
+		// this conn.
+		conn, err := db.Conn(ctx)
+		if err != nil {
+			logger.WithError(err).Info("error getting database connection")
+			dbl.mtx.Unlock()
+			continue
+		}
+		// Blocks server-side until the cluster-wide lock is free.
+		_, err = conn.ExecContext(ctx, `SELECT pg_advisory_lock($1)`, dbl.key)
+		if err != nil {
+			logger.WithError(err).Infof("error getting pg_advisory_lock %d", dbl.key)
+			conn.Close()
+			dbl.mtx.Unlock()
+			continue
+		}
+		logger.Debugf("acquired pg_advisory_lock %d", dbl.key)
+		// Save ctx/getdb so Check() can re-acquire after a
+		// dropped connection.
+		dbl.ctx, dbl.getdb, dbl.conn = ctx, getdb, conn
+		dbl.mtx.Unlock()
+		return
+	}
+}
+
+// Check confirms that the lock is still active (i.e., the session is
+// still alive), and re-acquires if needed. Panics if Lock is not
+// acquired first.
+func (dbl *DBLocker) Check() {
+	dbl.mtx.Lock()
+	// Ping the connection holding the advisory lock. If the
+	// session has died, the server has already released the lock.
+	// (Panics if dbl.conn is nil, i.e., Lock was never called.)
+	err := dbl.conn.PingContext(dbl.ctx)
+	if err == nil {
+		ctxlog.FromContext(dbl.ctx).Debugf("pg_advisory_lock %d connection still alive", dbl.key)
+		dbl.mtx.Unlock()
+		return
+	}
+	ctxlog.FromContext(dbl.ctx).WithError(err).Info("database connection ping failed")
+	dbl.conn.Close()
+	dbl.conn = nil
+	// Copy ctx/getdb before releasing the mutex, then re-acquire
+	// the advisory lock from scratch.
+	ctx, getdb := dbl.ctx, dbl.getdb
+	dbl.mtx.Unlock()
+	dbl.Lock(ctx, getdb)
+}
+
+// Unlock releases the advisory lock, if held, and closes the
+// underlying database connection. It is a no-op if the lock is not
+// currently held.
+func (dbl *DBLocker) Unlock() {
+	dbl.mtx.Lock()
+	defer dbl.mtx.Unlock()
+	if dbl.conn != nil {
+		// Use context.Background() here: the context passed to
+		// Lock may already be cancelled, but we still want the
+		// release to go through.
+		_, err := dbl.conn.ExecContext(context.Background(), `SELECT pg_advisory_unlock($1)`, dbl.key)
+		if err != nil {
+			ctxlog.FromContext(dbl.ctx).WithError(err).Infof("error releasing pg_advisory_lock %d", dbl.key)
+		} else {
+			ctxlog.FromContext(dbl.ctx).Debugf("released pg_advisory_lock %d", dbl.key)
+		}
+		// Closing the conn ends the session, which also releases
+		// the lock server-side even if the unlock query failed.
+		dbl.conn.Close()
+		dbl.conn = nil
+	}
+}
return conn.chooseBackend(options.UUID).SpecimenDelete(ctx, options)
}
+// SysTrashSweep is always dispatched to the local cluster backend,
+// never to a remote federation peer.
+func (conn *Conn) SysTrashSweep(ctx context.Context, options struct{}) (struct{}, error) {
+	return conn.local.SysTrashSweep(ctx, options)
+}
+
var userAttrsCachedFromLoginCluster = map[string]bool{
"created_at": true,
"email": true,
cluster.Collections.BlobSigningTTL = arvados.Duration(time.Hour * 24 * 14)
arvadostest.SetServiceURL(&cluster.Services.RailsAPI, "http://localhost:1/")
arvadostest.SetServiceURL(&cluster.Services.Controller, "http://localhost:/")
- s.testHandler = &Handler{Cluster: cluster}
+ s.testHandler = &Handler{Cluster: cluster, BackgroundContext: ctxlog.Context(context.Background(), s.log)}
s.testServer = newServerFromIntegrationTestEnv(c)
s.testServer.Server.BaseContext = func(net.Listener) context.Context {
return ctxlog.Context(context.Background(), s.log)
)
type Handler struct {
- Cluster *arvados.Cluster
+ Cluster *arvados.Cluster
+ BackgroundContext context.Context
setupOnce sync.Once
+ federation *federation.Conn
handlerStack http.Handler
proxy *proxy
secureClient *http.Client
healthFuncs := make(map[string]health.Func)
oidcAuthorizer := localdb.OIDCAccessTokenAuthorizer(h.Cluster, h.db)
- rtr := router.New(federation.New(h.Cluster, &healthFuncs), router.Config{
+ h.federation = federation.New(h.Cluster, &healthFuncs)
+ rtr := router.New(h.federation, router.Config{
MaxRequestSize: h.Cluster.API.MaxRequestSize,
WrapCalls: api.ComposeWrappers(ctrlctx.WrapCallsInTransactions(h.db), oidcAuthorizer.WrapCalls),
})
h.proxy = &proxy{
Name: "arvados-controller",
}
+
+ go h.trashSweepWorker()
}
var errDBConnection = errors.New("database connection error")
type HandlerSuite struct {
cluster *arvados.Cluster
- handler http.Handler
+ handler *Handler
ctx context.Context
cancel context.CancelFunc
}
s.cluster.TLS.Insecure = true
arvadostest.SetServiceURL(&s.cluster.Services.RailsAPI, "https://"+os.Getenv("ARVADOS_TEST_API_HOST"))
arvadostest.SetServiceURL(&s.cluster.Services.Controller, "http://localhost:/")
- s.handler = newHandler(s.ctx, s.cluster, "", prometheus.NewRegistry())
+ s.handler = newHandler(s.ctx, s.cluster, "", prometheus.NewRegistry()).(*Handler)
}
func (s *HandlerSuite) TearDownTest(c *check.C) {
func (s *HandlerSuite) TestValidateV1APIToken(c *check.C) {
req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil)
- user, ok, err := s.handler.(*Handler).validateAPItoken(req, arvadostest.ActiveToken)
+ user, ok, err := s.handler.validateAPItoken(req, arvadostest.ActiveToken)
c.Assert(err, check.IsNil)
c.Check(ok, check.Equals, true)
c.Check(user.Authorization.UUID, check.Equals, arvadostest.ActiveTokenUUID)
func (s *HandlerSuite) TestValidateV2APIToken(c *check.C) {
req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil)
- user, ok, err := s.handler.(*Handler).validateAPItoken(req, arvadostest.ActiveTokenV2)
+ user, ok, err := s.handler.validateAPItoken(req, arvadostest.ActiveTokenV2)
c.Assert(err, check.IsNil)
c.Check(ok, check.Equals, true)
c.Check(user.Authorization.UUID, check.Equals, arvadostest.ActiveTokenUUID)
func (s *HandlerSuite) TestCreateAPIToken(c *check.C) {
req := httptest.NewRequest("GET", "/arvados/v1/users/current", nil)
- auth, err := s.handler.(*Handler).createAPItoken(req, arvadostest.ActiveUserUUID, nil)
+ auth, err := s.handler.createAPItoken(req, arvadostest.ActiveUserUUID, nil)
c.Assert(err, check.IsNil)
c.Check(auth.Scopes, check.DeepEquals, []string{"all"})
- user, ok, err := s.handler.(*Handler).validateAPItoken(req, auth.TokenV2())
+ user, ok, err := s.handler.validateAPItoken(req, auth.TokenV2())
c.Assert(err, check.IsNil)
c.Check(ok, check.Equals, true)
c.Check(user.Authorization.UUID, check.Equals, auth.UUID)
c.Check(jresp.Errors[0], check.Matches, `.*//railsapi\.internal/arvados/v1/collections/.*: 404 Not Found.*`)
c.Check(jresp.Errors[0], check.Not(check.Matches), `(?ms).*127.0.0.1.*`)
}
+
+// TestTrashSweep verifies that a collection whose trash_at time has
+// passed gets flagged as trashed by the background sweep worker.
+func (s *HandlerSuite) TestTrashSweep(c *check.C) {
+	s.cluster.SystemRootToken = arvadostest.SystemRootToken
+	// Use a very short sweep interval so the test completes quickly.
+	s.cluster.Collections.TrashSweepInterval = arvados.Duration(time.Second / 10)
+	// Trigger handler setup (which starts background workers).
+	s.handler.CheckHealth()
+	ctx := auth.NewContext(s.ctx, &auth.Credentials{Tokens: []string{arvadostest.ActiveTokenV2}})
+	coll, err := s.handler.federation.CollectionCreate(ctx, arvados.CreateOptions{Attrs: map[string]interface{}{"name": "test trash sweep"}, EnsureUniqueName: true})
+	c.Assert(err, check.IsNil)
+	defer s.handler.federation.CollectionDelete(ctx, arvados.DeleteOptions{UUID: coll.UUID})
+	// Backdate trash_at directly in the database so the next sweep
+	// should trash the collection (delete_at stays in the future).
+	db, err := s.handler.db(s.ctx)
+	c.Assert(err, check.IsNil)
+	_, err = db.ExecContext(s.ctx, `update collections set trash_at = $1, delete_at = $2 where uuid = $3`, time.Now().UTC().Add(time.Second/10), time.Now().UTC().Add(time.Hour), coll.UUID)
+	c.Assert(err, check.IsNil)
+	// Poll (up to 5s) until the sweep worker marks it trashed.
+	deadline := time.Now().Add(5 * time.Second)
+	for {
+		if time.Now().After(deadline) {
+			c.Log("timed out")
+			c.FailNow()
+		}
+		updated, err := s.handler.federation.CollectionGet(ctx, arvados.GetOptions{UUID: coll.UUID, IncludeTrash: true})
+		c.Assert(err, check.IsNil)
+		if updated.IsTrashed {
+			break
+		}
+		time.Sleep(time.Second / 10)
+	}
+}
func (rtr *router) responseOptions(opts interface{}) (responseOptions, error) {
var rOpts responseOptions
switch opts := opts.(type) {
+ case *arvados.CreateOptions:
+ rOpts.Select = opts.Select
+ case *arvados.UpdateOptions:
+ rOpts.Select = opts.Select
case *arvados.GetOptions:
rOpts.Select = opts.Select
case *arvados.ListOptions:
func (s *RouterIntegrationSuite) TestSelectParam(c *check.C) {
uuid := arvadostest.QueuedContainerUUID
token := arvadostest.ActiveTokenV2
+ // GET
for _, sel := range [][]string{
{"uuid", "command"},
{"uuid", "command", "uuid"},
_, hasMounts := resp["mounts"]
c.Check(hasMounts, check.Equals, false)
}
+ // POST & PUT
+ uuid = arvadostest.FooCollection
+ j, err := json.Marshal([]string{"uuid", "description"})
+ c.Assert(err, check.IsNil)
+ for _, method := range []string{"PUT", "POST"} {
+ desc := "Today is " + time.Now().String()
+ reqBody := "{\"description\":\"" + desc + "\"}"
+ var resp map[string]interface{}
+ var rr *httptest.ResponseRecorder
+ if method == "PUT" {
+ _, rr, resp = doRequest(c, s.rtr, token, method, "/arvados/v1/collections/"+uuid+"?select="+string(j), nil, bytes.NewReader([]byte(reqBody)))
+ } else {
+ _, rr, resp = doRequest(c, s.rtr, token, method, "/arvados/v1/collections?select="+string(j), nil, bytes.NewReader([]byte(reqBody)))
+ }
+ c.Check(rr.Code, check.Equals, http.StatusOK)
+ c.Check(resp["kind"], check.Equals, "arvados#collection")
+ c.Check(resp["uuid"], check.HasLen, 27)
+ c.Check(resp["description"], check.Equals, desc)
+ c.Check(resp["manifest_text"], check.IsNil)
+ }
}
func (s *RouterIntegrationSuite) TestHEAD(c *check.C) {
return resp, err
}
+// SysTrashSweep calls the EndpointSysTrashSweep API endpoint. It takes
+// no options and returns no payload.
+func (conn *Conn) SysTrashSweep(ctx context.Context, options struct{}) (struct{}, error) {
+	ep := arvados.EndpointSysTrashSweep
+	var resp struct{}
+	err := conn.requestAndDecode(ctx, &resp, ep, nil, options)
+	return resp, err
+}
+
func (conn *Conn) UserCreate(ctx context.Context, options arvados.CreateOptions) (arvados.User, error) {
ep := arvados.EndpointUserCreate
var resp arvados.User
// provided by the integration-testing environment.
func newServerFromIntegrationTestEnv(c *check.C) *httpserver.Server {
log := ctxlog.TestLogger(c)
-
- handler := &Handler{Cluster: &arvados.Cluster{
- ClusterID: "zzzzz",
- PostgreSQL: integrationTestCluster().PostgreSQL,
- }}
+ ctx := ctxlog.Context(context.Background(), log)
+ handler := &Handler{
+ Cluster: &arvados.Cluster{
+ ClusterID: "zzzzz",
+ PostgreSQL: integrationTestCluster().PostgreSQL,
+ },
+ BackgroundContext: ctx,
+ }
handler.Cluster.TLS.Insecure = true
handler.Cluster.Collections.BlobSigning = true
handler.Cluster.Collections.BlobSigningKey = arvadostest.BlobSigningKey
srv := &httpserver.Server{
Server: http.Server{
- BaseContext: func(net.Listener) context.Context {
- return ctxlog.Context(context.Background(), log)
- },
- Handler: httpserver.AddRequestIDs(httpserver.LogRequests(handler)),
+ BaseContext: func(net.Listener) context.Context { return ctx },
+ Handler: httpserver.AddRequestIDs(httpserver.LogRequests(handler)),
},
Addr: ":",
}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package controller
+
+import (
+ "time"
+
+ "git.arvados.org/arvados.git/lib/controller/dblock"
+ "git.arvados.org/arvados.git/sdk/go/auth"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+)
+
+// trashSweepWorker periodically invokes SysTrashSweep, holding a
+// cluster-wide advisory lock (dblock.TrashSweep) so that only one
+// controller process runs the sweep at a time.
+func (h *Handler) trashSweepWorker() {
+	sleep := h.Cluster.Collections.TrashSweepInterval.Duration()
+	logger := ctxlog.FromContext(h.BackgroundContext).WithField("worker", "trash sweep")
+	ctx := ctxlog.Context(h.BackgroundContext, logger)
+	if sleep <= 0 {
+		// A non-positive interval disables the worker entirely.
+		logger.Debugf("Collections.TrashSweepInterval is %v, not running worker", sleep)
+		return
+	}
+	dblock.TrashSweep.Lock(ctx, h.db)
+	defer dblock.TrashSweep.Unlock()
+	// Sleep one interval before the first sweep, then loop until the
+	// background context is cancelled.
+	for time.Sleep(sleep); ctx.Err() == nil; time.Sleep(sleep) {
+		// Verify (and if needed re-acquire) the advisory lock
+		// before each sweep.
+		dblock.TrashSweep.Check()
+		// Sweep with the system root token, in a child context so
+		// the credentials don't leak into subsequent iterations.
+		ctx := auth.NewContext(ctx, &auth.Credentials{Tokens: []string{h.Cluster.SystemRootToken}})
+		_, err := h.federation.SysTrashSweep(ctx, struct{}{})
+		if err != nil {
+			logger.WithError(err).Info("trash sweep failed")
+		}
+	}
+}
var err error
logger := ctxlog.New(stderr, "text", "info")
logger.SetFormatter(cmd.NoPrefixFormatter{})
- defer func() {
- if err != nil {
- logger.Error("\n" + err.Error())
- }
- }()
exitcode, err := c.costAnalyzer(prog, args, logger, stdout, stderr)
-
+ if err != nil {
+ logger.Error("\n" + err.Error())
+ }
return exitcode
}
"strings"
"time"
+ "git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
"git.arvados.org/arvados.git/sdk/go/keepclient"
return nil
}
-func (c *command) parseFlags(prog string, args []string, logger *logrus.Logger, stderr io.Writer) (exitCode int, err error) {
+func (c *command) parseFlags(prog string, args []string, logger *logrus.Logger, stderr io.Writer) (ok bool, exitCode int) {
var beginStr, endStr string
flags := flag.NewFlagSet("", flag.ContinueOnError)
- flags.SetOutput(stderr)
flags.Usage = func() {
fmt.Fprintf(flags.Output(), `
Usage:
flags.StringVar(&beginStr, "begin", "", fmt.Sprintf("timestamp `begin` for date range operation (format: %s)", timestampFormat))
flags.StringVar(&endStr, "end", "", fmt.Sprintf("timestamp `end` for date range operation (format: %s)", timestampFormat))
flags.BoolVar(&c.cache, "cache", true, "create and use a local disk cache of Arvados objects")
- err = flags.Parse(args)
- if err == flag.ErrHelp {
- err = nil
- exitCode = 1
- return
- } else if err != nil {
- exitCode = 2
- return
+ if ok, code := cmd.ParseFlags(flags, prog, args, "[uuid ...]", stderr); !ok {
+ return false, code
}
c.uuids = flags.Args()
if (len(beginStr) != 0 && len(endStr) == 0) || (len(beginStr) == 0 && len(endStr) != 0) {
- flags.Usage()
- err = fmt.Errorf("When specifying a date range, both begin and end must be specified")
- exitCode = 2
- return
+ fmt.Fprintf(stderr, "When specifying a date range, both begin and end must be specified (try -help)\n")
+ return false, 2
}
if len(beginStr) != 0 {
c.begin, errB = time.Parse(timestampFormat, beginStr)
c.end, errE = time.Parse(timestampFormat, endStr)
if (errB != nil) || (errE != nil) {
- flags.Usage()
- err = fmt.Errorf("When specifying a date range, both begin and end must be of the format %s %+v, %+v", timestampFormat, errB, errE)
- exitCode = 2
- return
+ fmt.Fprintf(stderr, "When specifying a date range, both begin and end must be of the format %s %+v, %+v\n", timestampFormat, errB, errE)
+ return false, 2
}
}
if (len(c.uuids) < 1) && (len(beginStr) == 0) {
- flags.Usage()
- err = fmt.Errorf("error: no uuid(s) provided")
- exitCode = 2
- return
+ fmt.Fprintf(stderr, "error: no uuid(s) provided (try -help)\n")
+ return false, 2
}
lvl, err := logrus.ParseLevel(*loglevel)
if err != nil {
- exitCode = 2
- return
+ fmt.Fprintf(stderr, "invalid argument to -log-level: %s\n", err)
+ return false, 2
}
logger.SetLevel(lvl)
if !c.cache {
logger.Debug("Caching disabled")
}
- return
+ return true, 0
}
func ensureDirectory(logger *logrus.Logger, dir string) (err error) {
}
func (c *command) costAnalyzer(prog string, args []string, logger *logrus.Logger, stdout, stderr io.Writer) (exitcode int, err error) {
- exitcode, err = c.parseFlags(prog, args, logger, stderr)
-
- if exitcode != 0 {
+ var ok bool
+ ok, exitcode = c.parseFlags(prog, args, logger, stderr)
+ if !ok {
return
}
if c.resultsDir != "" {
cost[k] = v
}
} else if strings.Contains(uuid, "-xvhdp-") || strings.Contains(uuid, "-4zz18-") {
- // This is a container request
+ // This is a container request or collection
var crInfo map[string]consumption
crInfo, err = generateCrInfo(logger, uuid, arv, ac, kc, c.resultsDir, c.cache)
if err != nil {
func (*Suite) TestUsage(c *check.C) {
var stdout, stderr bytes.Buffer
exitcode := Command.RunCommand("costanalyzer.test", []string{"-help", "-log-level=debug"}, &bytes.Buffer{}, &stdout, &stderr)
- c.Check(exitcode, check.Equals, 1)
+ c.Check(exitcode, check.Equals, 0)
c.Check(stdout.String(), check.Equals, "")
c.Check(stderr.String(), check.Matches, `(?ms).*Usage:.*`)
}
func (*Suite) TestCollectionUUID(c *check.C) {
var stdout, stderr bytes.Buffer
-
resultsDir := c.MkDir()
- // Run costanalyzer with 1 collection uuid, without 'container_request' property
- exitcode := Command.RunCommand("costanalyzer.test", []string{"-output", resultsDir, arvadostest.FooCollection}, &bytes.Buffer{}, &stdout, &stderr)
- c.Check(exitcode, check.Equals, 2)
- c.Assert(stderr.String(), check.Matches, "(?ms).*does not have a 'container_request' property.*")
- // Update the collection, attach a 'container_request' property
+ // Create a collection with no container_request property
ac := arvados.NewClientFromEnv()
var coll arvados.Collection
+ err := ac.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, nil)
+ c.Assert(err, check.IsNil)
- // Update collection record
- err := ac.RequestAndDecode(&coll, "PUT", "arvados/v1/collections/"+arvadostest.FooCollection, nil, map[string]interface{}{
+ exitcode := Command.RunCommand("costanalyzer.test", []string{"-output", resultsDir, coll.UUID}, &bytes.Buffer{}, &stdout, &stderr)
+ c.Check(exitcode, check.Equals, 2)
+ c.Assert(stderr.String(), check.Matches, "(?ms).*does not have a 'container_request' property.*")
+
+ stdout.Truncate(0)
+ stderr.Truncate(0)
+
+ // Add a container_request property
+ err = ac.RequestAndDecode(&coll, "PATCH", "arvados/v1/collections/"+coll.UUID, nil, map[string]interface{}{
"collection": map[string]interface{}{
"properties": map[string]interface{}{
"container_request": arvadostest.CompletedContainerRequestUUID,
})
c.Assert(err, check.IsNil)
- stdout.Truncate(0)
- stderr.Truncate(0)
-
- // Run costanalyzer with 1 collection uuid
+ // Re-run costanalyzer on the updated collection
resultsDir = c.MkDir()
- exitcode = Command.RunCommand("costanalyzer.test", []string{"-output", resultsDir, arvadostest.FooCollection}, &bytes.Buffer{}, &stdout, &stderr)
+ exitcode = Command.RunCommand("costanalyzer.test", []string{"-output", resultsDir, coll.UUID}, &bytes.Buffer{}, &stdout, &stderr)
c.Check(exitcode, check.Equals, 0)
c.Assert(stderr.String(), check.Matches, "(?ms).*supplied uuids in .*")
ignoreDetachFlag = true
}
- if err := flags.Parse(args); err == flag.ErrHelp {
- return 0
- } else if err != nil {
- log.Print(err)
- return 1
+ if ok, code := cmd.ParseFlags(flags, prog, args, "container-uuid", stderr); !ok {
+ return code
+ } else if flags.NArg() != 1 {
+ fmt.Fprintf(stderr, "missing required argument: container-uuid (try -help)\n")
+ return 2
}
containerUUID := flags.Arg(0)
"io"
"strings"
+ "git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
"git.arvados.org/arvados.git/sdk/go/manifest"
return
}
-func parseFlags(prog string, args []string, logger *logrus.Logger, stderr io.Writer) ([]string, error) {
- flags := flag.NewFlagSet("", flag.ContinueOnError)
- flags.SetOutput(stderr)
+// parseFlags returns either some inputs to process, or (if there are
+// no inputs to process) a nil slice and a suitable exit code.
+func parseFlags(prog string, args []string, logger *logrus.Logger, stderr io.Writer) (inputs []string, exitcode int) {
+ flags := flag.NewFlagSet(prog, flag.ContinueOnError)
flags.Usage = func() {
fmt.Fprintf(flags.Output(), `
Usage:
%s [options ...] <collection-uuid> <collection-uuid> ...
- %s [options ...] <collection-pdh>,<collection_uuid> \
- <collection-pdh>,<collection_uuid> ...
+ %s [options ...] <collection-pdh>,<collection-uuid> \
+ <collection-pdh>,<collection-uuid> ...
This program analyzes the overlap in blocks used by 2 or more collections. It
prints a deduplication report that shows the nominal space used by the
flags.PrintDefaults()
}
loglevel := flags.String("log-level", "info", "logging level (debug, info, ...)")
- err := flags.Parse(args)
- if err == flag.ErrHelp {
- return nil, err
- } else if err != nil {
- return nil, err
+ if ok, code := cmd.ParseFlags(flags, prog, args, "collection-uuid [...]", stderr); !ok {
+ return nil, code
}
- inputs := flags.Args()
-
- inputs = deDuplicate(inputs)
+ inputs = deDuplicate(flags.Args())
if len(inputs) < 1 {
- err = fmt.Errorf("Error: no collections provided")
- return inputs, err
+ fmt.Fprintf(stderr, "Error: no collections provided\n")
+ return nil, 2
}
lvl, err := logrus.ParseLevel(*loglevel)
if err != nil {
- return inputs, err
+ fmt.Fprintf(stderr, "Error: cannot parse log level: %s\n", err)
+ return nil, 2
}
logger.SetLevel(lvl)
- return inputs, err
+ return inputs, 0
}
func blockList(collection arvados.Collection) (blocks map[string]int) {
func report(prog string, args []string, logger *logrus.Logger, stdout, stderr io.Writer) (exitcode int) {
var inputs []string
- var err error
-
- inputs, err = parseFlags(prog, args, logger, stderr)
- if err == flag.ErrHelp {
- return 0
- } else if err != nil {
- logger.Error(err.Error())
- return 2
+
+ inputs, exitcode = parseFlags(prog, args, logger, stderr)
+ if inputs == nil {
+ return
}
// Arvados Client setup
"strings"
"time"
+ "git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"github.com/sirupsen/logrus"
type Command struct{}
-func (cmd Command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+func (Command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
var diag diagnoser
f := flag.NewFlagSet(prog, flag.ContinueOnError)
f.StringVar(&diag.projectName, "project-name", "scratch area for diagnostics", "name of project to find/create in home project and use for temporary/test objects")
f.BoolVar(&diag.checkExternal, "external-client", false, "check that this host is considered an \"external\" client")
f.IntVar(&diag.priority, "priority", 500, "priority for test container (1..1000, or 0 to skip)")
f.DurationVar(&diag.timeout, "timeout", 10*time.Second, "timeout for http requests")
- err := f.Parse(args)
- if err == flag.ErrHelp {
- return 0
- } else if err != nil {
- fmt.Fprintln(stderr, err)
- return 2
+ if ok, code := cmd.ParseFlags(f, prog, args, "", stderr); !ok {
+ return code
}
diag.logger = ctxlog.New(stdout, "text", diag.logLevel)
diag.logger.SetFormatter(&logrus.TextFormatter{DisableTimestamp: true, DisableLevelTruncation: true, PadLevelText: true})
flags.StringVar(&inst.SourcePath, "source", "/arvados", "source tree location (required for -type=package)")
flags.StringVar(&inst.PackageVersion, "package-version", "0.0.0", "version string to embed in executable files")
flags.BoolVar(&inst.EatMyData, "eatmydata", false, "use eatmydata to speed up install")
- err = flags.Parse(args)
- if err == flag.ErrHelp {
- err = nil
- return 0
- } else if err != nil {
- return 2
+
+ if ok, code := cmd.ParseFlags(flags, prog, args, "", stderr); !ok {
+ return code
} else if *versionFlag {
return cmd.Version.RunCommand(prog, args, stdin, stdout, stderr)
- } else if len(flags.Args()) > 0 {
- err = fmt.Errorf("unrecognized command line arguments: %v", flags.Args())
- return 2
}
var dev, test, prod, pkg bool
versionFlag := flags.Bool("version", false, "Write version information to stdout and exit 0")
flags.StringVar(&initcmd.ClusterID, "cluster-id", "", "cluster `id`, like x1234 for a dev cluster")
flags.StringVar(&initcmd.Domain, "domain", hostname, "cluster public DNS `name`, like x1234.arvadosapi.com")
- err = flags.Parse(args)
- if err == flag.ErrHelp {
- err = nil
- return 0
- } else if err != nil {
- return 2
+ if ok, code := cmd.ParseFlags(flags, prog, args, "", stderr); !ok {
+ return code
} else if *versionFlag {
return cmd.Version.RunCommand(prog, args, stdin, stdout, stderr)
- } else if len(flags.Args()) > 0 {
- err = fmt.Errorf("unrecognized command line arguments: %v", flags.Args())
- return 2
} else if !regexp.MustCompile(`^[a-z][a-z0-9]{4}`).MatchString(initcmd.ClusterID) {
err = fmt.Errorf("cluster ID %q is invalid; must be an ASCII letter followed by 4 alphanumerics (try -help)", initcmd.ClusterID)
return 1
if ctr.State != dispatch.Locked {
// already started by prior invocation
- } else if _, ok := disp.lsfqueue.JobID(ctr.UUID); !ok {
+ } else if _, ok := disp.lsfqueue.Lookup(ctr.UUID); !ok {
disp.logger.Printf("Submitting container %s to LSF", ctr.UUID)
cmd := []string{disp.Cluster.Containers.CrunchRunCommand}
cmd = append(cmd, "--runtime-engine="+disp.Cluster.Containers.RuntimeEngine)
disp.logger.Printf("Start monitoring container %v in state %q", ctr.UUID, ctr.State)
defer disp.logger.Printf("Done monitoring container %s", ctr.UUID)
- // If the container disappears from the lsf queue, there is
- // no point in waiting for further dispatch updates: just
- // clean up and return.
go func(uuid string) {
+ cancelled := false
for ctx.Err() == nil {
- if _, ok := disp.lsfqueue.JobID(uuid); !ok {
+ qent, ok := disp.lsfqueue.Lookup(uuid)
+ if !ok {
+ // If the container disappears from
+ // the lsf queue, there is no point in
+ // waiting for further dispatch
+ // updates: just clean up and return.
disp.logger.Printf("container %s job disappeared from LSF queue", uuid)
cancel()
return
}
+ if !cancelled && qent.Stat == "PEND" && strings.Contains(qent.PendReason, "There are no suitable hosts for the job") {
+ disp.logger.Printf("container %s: %s", uuid, qent.PendReason)
+ err := disp.arvDispatcher.Arv.Update("containers", uuid, arvadosclient.Dict{
+ "container": map[string]interface{}{
+ "runtime_status": map[string]string{
+ "error": qent.PendReason,
+ },
+ },
+ }, nil)
+ if err != nil {
+ disp.logger.Printf("error setting runtime_status on %s: %s", uuid, err)
+ continue // retry
+ }
+ err = disp.arvDispatcher.UpdateState(uuid, dispatch.Cancelled)
+ if err != nil {
+ continue // retry (UpdateState() already logged the error)
+ }
+ cancelled = true
+ }
}
}(ctr.UUID)
// from the queue.
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
- for jobid, ok := disp.lsfqueue.JobID(ctr.UUID); ok; _, ok = disp.lsfqueue.JobID(ctr.UUID) {
- err := disp.lsfcli.Bkill(jobid)
+ for qent, ok := disp.lsfqueue.Lookup(ctr.UUID); ok; _, ok = disp.lsfqueue.Lookup(ctr.UUID) {
+ err := disp.lsfcli.Bkill(qent.ID)
if err != nil {
- disp.logger.Warnf("%s: bkill(%d): %s", ctr.UUID, jobid, err)
+ disp.logger.Warnf("%s: bkill(%s): %s", ctr.UUID, qent.ID, err)
}
<-ticker.C
}
}
func (disp *dispatcher) bkill(ctr arvados.Container) {
- if jobid, ok := disp.lsfqueue.JobID(ctr.UUID); !ok {
+ if qent, ok := disp.lsfqueue.Lookup(ctr.UUID); !ok {
disp.logger.Debugf("bkill(%s): redundant, job not in queue", ctr.UUID)
- } else if err := disp.lsfcli.Bkill(jobid); err != nil {
- disp.logger.Warnf("%s: bkill(%d): %s", ctr.UUID, jobid, err)
+ } else if err := disp.lsfcli.Bkill(qent.ID); err != nil {
+ disp.logger.Warnf("%s: bkill(%s): %s", ctr.UUID, qent.ID, err)
}
}
import (
"context"
+ "encoding/json"
"fmt"
"math/rand"
"os/exec"
var _ = check.Suite(&suite{})
type suite struct {
- disp *dispatcher
+ disp *dispatcher
+ crTooBig arvados.ContainerRequest
}
func (s *suite) TearDownTest(c *check.C) {
s.disp.lsfcli.stubCommand = func(string, ...string) *exec.Cmd {
return exec.Command("bash", "-c", "echo >&2 unimplemented stub; false")
}
+ err = arvados.NewClientFromEnv().RequestAndDecode(&s.crTooBig, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
+ "container_request": map[string]interface{}{
+ "runtime_constraints": arvados.RuntimeConstraints{
+ RAM: 1000000000000,
+ VCPUs: 1,
+ },
+ "container_image": arvadostest.DockerImage112PDH,
+ "command": []string{"sleep", "1"},
+ "mounts": map[string]arvados.Mount{"/mnt/out": {Kind: "tmp", Capacity: 1000}},
+ "output_path": "/mnt/out",
+ "state": arvados.ContainerRequestStateCommitted,
+ "priority": 1,
+ "container_count_max": 1,
+ },
+ })
+ c.Assert(err, check.IsNil)
}
type lsfstub struct {
"-J", arvadostest.LockedContainerUUID,
"-n", "4",
"-D", "11701MB",
- "-R", "rusage[mem=11701MB:tmp=0MB] span[hosts=1]"})
+ "-R", "rusage[mem=11701MB:tmp=0MB] span[hosts=1]",
+ "-R", "select[mem>=11701MB]",
+ "-R", "select[tmp>=0MB]",
+ "-R", "select[ncpus>=4]"})
mtx.Lock()
fakejobq[nextjobid] = args[1]
nextjobid++
"-J", arvadostest.QueuedContainerUUID,
"-n", "4",
"-D", "11701MB",
- "-R", "rusage[mem=11701MB:tmp=45777MB] span[hosts=1]"})
+ "-R", "rusage[mem=11701MB:tmp=45777MB] span[hosts=1]",
+ "-R", "select[mem>=11701MB]",
+ "-R", "select[tmp>=45777MB]",
+ "-R", "select[ncpus>=4]"})
+ mtx.Lock()
+ fakejobq[nextjobid] = args[1]
+ nextjobid++
+ mtx.Unlock()
+ case s.crTooBig.ContainerUUID:
+ c.Check(args, check.DeepEquals, []string{
+ "-J", s.crTooBig.ContainerUUID,
+ "-n", "1",
+ "-D", "954187MB",
+ "-R", "rusage[mem=954187MB:tmp=256MB] span[hosts=1]",
+ "-R", "select[mem>=954187MB]",
+ "-R", "select[tmp>=256MB]",
+ "-R", "select[ncpus>=1]"})
mtx.Lock()
fakejobq[nextjobid] = args[1]
nextjobid++
}
return exec.Command("echo", "submitted job")
case "bjobs":
- c.Check(args, check.DeepEquals, []string{"-u", "all", "-noheader", "-o", "jobid stat job_name:30"})
- out := ""
+ c.Check(args, check.DeepEquals, []string{"-u", "all", "-o", "jobid stat job_name pend_reason", "-json"})
+ var records []map[string]interface{}
for jobid, uuid := range fakejobq {
- out += fmt.Sprintf(`%d %s %s\n`, jobid, "RUN", uuid)
+ stat, reason := "RUN", ""
+ if uuid == s.crTooBig.ContainerUUID {
+ // The real bjobs output includes a trailing ';' here:
+ stat, reason = "PEND", "There are no suitable hosts for the job;"
+ }
+ records = append(records, map[string]interface{}{
+ "JOBID": fmt.Sprintf("%d", jobid),
+ "STAT": stat,
+ "JOB_NAME": uuid,
+ "PEND_REASON": reason,
+ })
}
- c.Logf("bjobs out: %q", out)
- return exec.Command("printf", out)
+ out, err := json.Marshal(map[string]interface{}{
+ "COMMAND": "bjobs",
+ "JOBS": len(fakejobq),
+ "RECORDS": records,
+ })
+ if err != nil {
+ panic(err)
+ }
+ c.Logf("bjobs out: %s", out)
+ return exec.Command("printf", string(out))
case "bkill":
killid, _ := strconv.Atoi(args[0])
if uuid, ok := fakejobq[killid]; !ok {
sudoUser: s.disp.Cluster.Containers.LSF.BsubSudoUser,
}.stubCommand(s, c)
s.disp.Start()
+
deadline := time.Now().Add(20 * time.Second)
for range time.NewTicker(time.Second).C {
if time.Now().After(deadline) {
break
}
// "queuedcontainer" should be running
- if _, ok := s.disp.lsfqueue.JobID(arvadostest.QueuedContainerUUID); !ok {
+ if _, ok := s.disp.lsfqueue.Lookup(arvadostest.QueuedContainerUUID); !ok {
continue
}
// "lockedcontainer" should be cancelled because it
// has priority 0 (no matching container requests)
- if _, ok := s.disp.lsfqueue.JobID(arvadostest.LockedContainerUUID); ok {
+ if _, ok := s.disp.lsfqueue.Lookup(arvadostest.LockedContainerUUID); ok {
+ continue
+ }
+ // "crTooBig" should be cancelled because lsf stub
+ // reports there is no suitable instance type
+ if _, ok := s.disp.lsfqueue.Lookup(s.crTooBig.ContainerUUID); ok {
continue
}
var ctr arvados.Container
if err := s.disp.arvDispatcher.Arv.Get("containers", arvadostest.LockedContainerUUID, nil, &ctr); err != nil {
c.Logf("error getting container state for %s: %s", arvadostest.LockedContainerUUID, err)
continue
- }
- if ctr.State != arvados.ContainerStateQueued {
+ } else if ctr.State != arvados.ContainerStateQueued {
c.Logf("LockedContainer is not in the LSF queue but its arvados record has not been updated to state==Queued (state is %q)", ctr.State)
continue
}
+
+ if err := s.disp.arvDispatcher.Arv.Get("containers", s.crTooBig.ContainerUUID, nil, &ctr); err != nil {
+ c.Logf("error getting container state for %s: %s", s.crTooBig.ContainerUUID, err)
+ continue
+ } else if ctr.State != arvados.ContainerStateCancelled {
+ c.Logf("container %s is not in the LSF queue but its arvados record has not been updated to state==Cancelled (state is %q)", s.crTooBig.ContainerUUID, ctr.State)
+ continue
+ } else {
+ c.Check(ctr.RuntimeStatus["error"], check.Equals, "There are no suitable hosts for the job;")
+ }
c.Log("reached desired state")
break
}
import (
"bytes"
+ "encoding/json"
"fmt"
"os"
"os/exec"
)
type bjobsEntry struct {
- id int
- name string
- stat string
+ ID string `json:"JOBID"`
+ Name string `json:"JOB_NAME"`
+ Stat string `json:"STAT"`
+ PendReason string `json:"PEND_REASON"`
}
type lsfcli struct {
func (cli lsfcli) Bjobs() ([]bjobsEntry, error) {
cli.logger.Debugf("Bjobs()")
- cmd := cli.command("bjobs", "-u", "all", "-noheader", "-o", "jobid stat job_name:30")
+ cmd := cli.command("bjobs", "-u", "all", "-o", "jobid stat job_name pend_reason", "-json")
buf, err := cmd.Output()
if err != nil {
return nil, errWithStderr(err)
}
- var bjobs []bjobsEntry
- for _, line := range strings.Split(string(buf), "\n") {
- if line == "" {
- continue
- }
- var ent bjobsEntry
- if _, err := fmt.Sscan(line, &ent.id, &ent.stat, &ent.name); err != nil {
- cli.logger.Warnf("ignoring unparsed line in bjobs output: %q", line)
- continue
- }
- bjobs = append(bjobs, ent)
+ var resp struct {
+ Records []bjobsEntry `json:"RECORDS"`
}
- return bjobs, nil
+ err = json.Unmarshal(buf, &resp)
+ return resp.Records, err
}
-func (cli lsfcli) Bkill(id int) error {
- cli.logger.Infof("Bkill(%d)", id)
- cmd := cli.command("bkill", fmt.Sprintf("%d", id))
+func (cli lsfcli) Bkill(id string) error {
+ cli.logger.Infof("Bkill(%s)", id)
+ cmd := cli.command("bkill", id)
buf, err := cmd.CombinedOutput()
if err == nil || strings.Index(string(buf), "already finished") >= 0 {
return nil
latest map[string]bjobsEntry
}
-// JobID waits for the next queue update (so even a job that was only
+// Lookup waits for the next queue update (so even a job that was only
// submitted a nanosecond ago will show up) and then returns the LSF
-// job ID corresponding to the given container UUID.
-func (q *lsfqueue) JobID(uuid string) (int, bool) {
+// queue information corresponding to the given container UUID.
+func (q *lsfqueue) Lookup(uuid string) (bjobsEntry, bool) {
ent, ok := q.getNext()[uuid]
- return ent.id, ok
+ return ent, ok
}
// All waits for the next queue update, then returns the names of all
}
next := make(map[string]bjobsEntry, len(ents))
for _, ent := range ents {
- next[ent.name] = ent
+ next[ent.Name] = ent
}
// Replace q.latest and notify all the
// goroutines that the "next update" they
"io"
"log"
"net/http"
+
// pprof is only imported to register its HTTP handlers
_ "net/http/pprof"
"os"
+ "git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
"git.arvados.org/arvados.git/sdk/go/keepclient"
"github.com/arvados/cgofuse/fuse"
)
-var Command = &cmd{}
+var Command = &mountCommand{}
-type cmd struct {
+type mountCommand struct {
// ready, if non-nil, will be closed when the mount is
// initialized. If ready is non-nil, it RunCommand() should
// not be called more than once, or when ready is already
//
// The "-d" fuse option (and perhaps other features) ignores the
// stderr argument and prints to os.Stderr instead.
-func (c *cmd) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+func (c *mountCommand) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
logger := log.New(stderr, prog+" ", 0)
flags := flag.NewFlagSet(prog, flag.ContinueOnError)
ro := flags.Bool("ro", false, "read-only")
experimental := flags.Bool("experimental", false, "acknowledge this is an experimental command, and should not be used in production (required)")
blockCache := flags.Int("block-cache", 4, "read cache size (number of 64MiB blocks)")
pprof := flags.String("pprof", "", "serve Go profile data at `[addr]:port`")
- err := flags.Parse(args)
- if err != nil {
- logger.Print(err)
- return 2
+ if ok, code := cmd.ParseFlags(flags, prog, args, "[FUSE mount options]", stderr); !ok {
+ return code
}
if !*experimental {
logger.Printf("error: experimental command %q used without --experimental flag", prog)
stdin := bytes.NewBufferString("stdin")
stdout := bytes.NewBuffer(nil)
stderr := bytes.NewBuffer(nil)
- mountCmd := cmd{ready: make(chan struct{})}
+ mountCmd := mountCommand{ready: make(chan struct{})}
ready := false
go func() {
exited <- mountCmd.RunCommand("test mount", []string{"--experimental", s.mnt}, stdin, stdout, stderr)
"sync"
"time"
+ "git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
loader := config.NewLoader(stdin, logger)
loader.SkipLegacy = true
- flags := flag.NewFlagSet("", flag.ContinueOnError)
- flags.SetOutput(stderr)
+ flags := flag.NewFlagSet(prog, flag.ContinueOnError)
flags.Usage = func() {
fmt.Fprintf(flags.Output(), `Usage:
%s [options ...] { /path/to/manifest.txt | log-or-collection-uuid } [...]
}
loader.SetupFlags(flags)
loglevel := flags.String("log-level", "info", "logging level (debug, info, ...)")
- err = flags.Parse(args)
- if err == flag.ErrHelp {
- err = nil
- return 0
- } else if err != nil {
- return 2
- }
-
- if len(flags.Args()) == 0 {
- flags.Usage()
+ if ok, code := cmd.ParseFlags(flags, prog, args, "source [...]", stderr); !ok {
+ return code
+ } else if flags.NArg() == 0 {
+ fmt.Fprintf(stderr, "missing required arguments (try -help)\n")
return 2
}
loader.SetupFlags(flags)
versionFlag := flags.Bool("version", false, "Write version information to stdout and exit 0")
pprofAddr := flags.String("pprof", "", "Serve Go profile data at `[addr]:port`")
- err = flags.Parse(args)
- if err == flag.ErrHelp {
- err = nil
- return 0
- } else if err != nil {
- return 2
+ if ok, code := cmd.ParseFlags(flags, prog, args, "", stderr); !ok {
+ return code
} else if *versionFlag {
return cmd.Version.RunCommand(prog, args, stdin, stdout, stderr)
}
# file to determine what version of cwltool and schema-salad to
# build.
install_requires=[
- 'cwltool==3.1.20211020155521',
- 'schema-salad==8.2.20211020114435',
+ 'cwltool==3.1.20211107152837',
+ 'schema-salad==8.2.20211116214159',
'arvados-python-client{}'.format(pysdk_dep),
'setuptools',
'ciso8601 >= 2.0.0',
EndpointLinkGet = APIEndpoint{"GET", "arvados/v1/links/{uuid}", ""}
EndpointLinkList = APIEndpoint{"GET", "arvados/v1/links", ""}
EndpointLinkDelete = APIEndpoint{"DELETE", "arvados/v1/links/{uuid}", ""}
+ EndpointSysTrashSweep = APIEndpoint{"POST", "sys/trash_sweep", ""}
EndpointUserActivate = APIEndpoint{"POST", "arvados/v1/users/{uuid}/activate", ""}
EndpointUserCreate = APIEndpoint{"POST", "arvados/v1/users", "user"}
EndpointUserCurrent = APIEndpoint{"GET", "arvados/v1/users/current", ""}
SpecimenGet(ctx context.Context, options GetOptions) (Specimen, error)
SpecimenList(ctx context.Context, options ListOptions) (SpecimenList, error)
SpecimenDelete(ctx context.Context, options DeleteOptions) (Specimen, error)
+ SysTrashSweep(ctx context.Context, options struct{}) (struct{}, error)
UserCreate(ctx context.Context, options CreateOptions) (User, error)
UserUpdate(ctx context.Context, options UpdateOptions) (User, error)
UserMerge(ctx context.Context, options UserMergeOptions) (User, error)
return err
}
switch {
+ case resp.StatusCode == http.StatusNoContent:
+ return nil
case resp.StatusCode == http.StatusOK && dst == nil:
return nil
case resp.StatusCode == http.StatusOK:
as.appendCall(ctx, as.SpecimenDelete, options)
return arvados.Specimen{}, as.Error
}
+func (as *APIStub) SysTrashSweep(ctx context.Context, options struct{}) (struct{}, error) {
+ as.appendCall(ctx, as.SysTrashSweep, options)
+ return struct{}{}, as.Error
+}
func (as *APIStub) UserCreate(ctx context.Context, options arvados.CreateOptions) (arvados.User, error) {
as.appendCall(ctx, as.UserCreate, options)
return arvados.User{}, as.Error
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package arvadostest
+
+import "git.arvados.org/arvados.git/sdk/go/arvados"
+
+// Test that *APIStub implements arvados.API
+var _ arvados.API = &APIStub{}
storage_classes=None,
trash_at=None,
merge=True,
- num_retries=None):
+ num_retries=None,
+ preserve_version=False):
"""Save collection to an existing collection record.
Commit pending buffer blocks to Keep, merge with remote record (if
:num_retries:
Retry count on API calls (if None, use the collection default)
+ :preserve_version:
+ If True, indicate that the collection content being saved right now
+ should be preserved in a version snapshot if the collection record is
+ updated in the future. Requires that the API server has
+ Collections.CollectionVersioning enabled; if not, setting this will
+ raise an exception.
+
"""
if properties and type(properties) is not dict:
raise errors.ArgumentError("properties must be dictionary type.")
if trash_at and type(trash_at) is not datetime.datetime:
raise errors.ArgumentError("trash_at must be datetime type.")
+ if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
+ raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
+
body={}
if properties:
body["properties"] = properties
if trash_at:
t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
body["trash_at"] = t
+ if preserve_version:
+ body["preserve_version"] = preserve_version
if not self.committed():
if self._has_remote_blocks:
storage_classes=None,
trash_at=None,
ensure_unique_name=False,
- num_retries=None):
+ num_retries=None,
+ preserve_version=False):
"""Save collection to a new collection record.
Commit pending buffer blocks to Keep and, when create_collection_record
:num_retries:
Retry count on API calls (if None, use the collection default)
+ :preserve_version:
+ If True, indicate that the collection content being saved right now
+ should be preserved in a version snapshot if the collection record is
+ updated in the future. Requires that the API server has
+ Collections.CollectionVersioning enabled; if not, setting this will
+ raise an exception.
+
"""
if properties and type(properties) is not dict:
raise errors.ArgumentError("properties must be dictionary type.")
if trash_at and type(trash_at) is not datetime.datetime:
raise errors.ArgumentError("trash_at must be datetime type.")
+ if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
+ raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
+
if self._has_remote_blocks:
# Copy any remote blocks to the local cluster.
self._copy_remote_blocks(remote_blocks={})
if trash_at:
t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
body["trash_at"] = t
+ if preserve_version:
+ body["preserve_version"] = preserve_version
self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
text = self._api_response["manifest_text"]
copy_opts.set_defaults(recursive=True)
parser = argparse.ArgumentParser(
- description='Copy a workflow, collection or project from one Arvados instance to another.',
+ description='Copy a workflow, collection or project from one Arvados instance to another. On success, the uuid of the copied object is printed to stdout.',
parents=[copy_opts, arv_cmd.retry_opt])
args = parser.parse_args()
logger.error("API server returned an error result: {}".format(result))
exit(1)
- logger.info("")
+ print(result['uuid'])
+
+ if result.get('partial_error'):
+ logger.warning("Warning: created copy with uuid {} but failed to copy some items: {}".format(result['uuid'], result['partial_error']))
+ exit(1)
+
logger.info("Success: created copy with uuid {}".format(result['uuid']))
exit(0)
# fetch the workflow from the source instance
wf = src.workflows().get(uuid=wf_uuid).execute(num_retries=args.retries)
+ if not wf["definition"]:
+ logger.warning("Workflow object {} has an empty or null definition, it won't do anything.".format(wf_uuid))
+
# copy collections and docker images
- if args.recursive:
+ if args.recursive and wf["definition"]:
wf_def = yaml.safe_load(wf["definition"])
if wf_def is not None:
locations = []
logger.debug('Copying %s to %s', obj_uuid, project_record["uuid"])
+
+ partial_error = ""
+
# Copy collections
- copy_collections([col["uuid"] for col in arvados.util.list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
- src, dst, args)
+ try:
+ copy_collections([col["uuid"] for col in arvados.util.list_all(src.collections().list, filters=[["owner_uuid", "=", obj_uuid]])],
+ src, dst, args)
+ except Exception as e:
+ partial_error += "\n" + str(e)
# Copy workflows
for w in arvados.util.list_all(src.workflows().list, filters=[["owner_uuid", "=", obj_uuid]]):
- copy_workflow(w["uuid"], src, dst, args)
+ try:
+ copy_workflow(w["uuid"], src, dst, args)
+ except Exception as e:
+ partial_error += "\n" + "Error while copying %s: %s" % (w["uuid"], e)
if args.recursive:
for g in arvados.util.list_all(src.groups().list, filters=[["owner_uuid", "=", obj_uuid]]):
- copy_project(g["uuid"], src, dst, project_record["uuid"], args)
+ try:
+ copy_project(g["uuid"], src, dst, project_record["uuid"], args)
+ except Exception as e:
+ partial_error += "\n" + "Error while copying %s: %s" % (g["uuid"], e)
+
+ project_record["partial_error"] = partial_error
return project_record
'future',
'google-api-python-client >=1.6.2, <2',
'google-auth<2',
- 'httplib2 >=0.9.2',
+ 'httplib2 >=0.9.2, <0.20.2',
'pycurl >=7.19.5.1',
'ruamel.yaml >=0.15.54, <0.17.11',
'setuptools',
"UserProfileNotificationAddress": "arvados@example.com",
},
"Collections": {
+ "CollectionVersioning": True,
"BlobSigningKey": "zfhgfenhffzltr9dixws36j1yhksjoll2grmku38mi7yxd66h5j4q9w4jzanezacp8s6q0ro3hxakfye02152hncy6zml2ed0uc",
"TrustAllContent": False,
"ForwardSlashNameSubstitution": "/",
contents = api.groups().list(filters=[["owner_uuid", "=", dest_proj]]).execute()
assert len(contents["items"]) == 0
- try:
- self.run_copy(["--project-uuid", dest_proj, "--storage-classes", "foo", src_proj])
- except SystemExit as e:
- assert e.code == 0
+ with tutil.redirected_streams(
+ stdout=tutil.StringIO, stderr=tutil.StringIO) as (out, err):
+ try:
+ self.run_copy(["--project-uuid", dest_proj, "--storage-classes", "foo", src_proj])
+ except SystemExit as e:
+ assert e.code == 0
+ copy_uuid_from_stdout = out.getvalue().strip()
contents = api.groups().list(filters=[["owner_uuid", "=", dest_proj]]).execute()
assert len(contents["items"]) == 1
assert contents["items"][0]["name"] == "arv-copy project"
copied_project = contents["items"][0]["uuid"]
+ assert copied_project == copy_uuid_from_stdout
+
contents = api.collections().list(filters=[["owner_uuid", "=", copied_project]]).execute()
assert len(contents["items"]) == 1
class NewCollectionTestCaseWithServers(run_test_server.TestCaseWithServers):
+ def test_preserve_version_on_save(self):
+ c = Collection()
+ c.save_new(preserve_version=True)
+ coll_record = arvados.api().collections().get(uuid=c.manifest_locator()).execute()
+ self.assertEqual(coll_record['version'], 1)
+ self.assertEqual(coll_record['preserve_version'], True)
+ with c.open("foo.txt", "wb") as foo:
+ foo.write(b"foo")
+ c.save(preserve_version=True)
+ coll_record = arvados.api().collections().get(uuid=c.manifest_locator()).execute()
+ self.assertEqual(coll_record['version'], 2)
+ self.assertEqual(coll_record['preserve_version'], True)
+ with c.open("bar.txt", "wb") as foo:
+ foo.write(b"bar")
+ c.save(preserve_version=False)
+ coll_record = arvados.api().collections().get(uuid=c.manifest_locator()).execute()
+ self.assertEqual(coll_record['version'], 3)
+ self.assertEqual(coll_record['preserve_version'], False)
+
def test_get_manifest_text_only_committed(self):
c = Collection()
with c.open("count.txt", "wb") as f:
skip_before_action :find_object_by_uuid, only: :shared
skip_before_action :render_404_if_no_object, only: :shared
+ TRASHABLE_CLASSES = ['project']
+
def self._index_requires_parameters
(super rescue {}).
merge({
end
end
+ def destroy
+ if !TRASHABLE_CLASSES.include?(@object.group_class)
+ return @object.destroy
+ show
+ else
+ super # Calls destroy from TrashableController module
+ end
+ end
+
def render_404_if_no_object
if params[:action] == 'contents'
if !params[:uuid]
@offset = offset_all
end
- protected
-
def exclude_home objectlist, klass
# select records that are readable by current user AND
# the owner_uuid is a user (but not the current user) OR
}
}
+ discovery[:resources]['sys'] = {
+ methods: {
+ get: {
+ id: "arvados.sys.trash_sweep",
+ path: "sys/trash_sweep",
+ httpMethod: "POST",
+ description: "apply scheduled trash and delete operations",
+ parameters: {
+ },
+ parameterOrder: [
+ ],
+ response: {
+ },
+ scopes: [
+ "https://api.arvados.org/auth/arvados",
+ "https://api.arvados.org/auth/arvados.readonly"
+ ]
+ },
+ }
+ }
+
Rails.configuration.API.DisabledAPIs.each do |method, _|
ctrl, action = method.to_s.split('.', 2)
discovery[:resources][ctrl][:methods].delete(action.to_sym)
#
# SPDX-License-Identifier: AGPL-3.0
-require 'current_api_client'
+class SysController < ApplicationController
+ skip_before_action :find_object_by_uuid
+ skip_before_action :render_404_if_no_object
+ before_action :admin_required
-module SweepTrashedObjects
- extend CurrentApiClient
-
- def self.delete_project_and_contents(p_uuid)
- p = Group.find_by_uuid(p_uuid)
- if !p || p.group_class != 'project'
- raise "can't sweep group '#{p_uuid}', it may not exist or not be a project"
- end
- # First delete sub projects
- Group.where({group_class: 'project', owner_uuid: p_uuid}).each do |sub_project|
- delete_project_and_contents(sub_project.uuid)
- end
- # Next, iterate over all tables which have owner_uuid fields, with some
- # exceptions, and delete records owned by this project
- skipped_classes = ['Group', 'User']
- ActiveRecord::Base.descendants.reject(&:abstract_class?).each do |klass|
- if !skipped_classes.include?(klass.name) && klass.columns.collect(&:name).include?('owner_uuid')
- klass.where({owner_uuid: p_uuid}).destroy_all
- end
- end
- # Finally delete the project itself
- p.destroy
- end
-
- def self.sweep_now
+ def trash_sweep
act_as_system_user do
# Sweep trashed collections
Collection.
where('is_trashed = false and trash_at < statement_timestamp()').
update_all('is_trashed = true')
- # Sweep trashed projects and their contents
+ # Sweep trashed projects and their contents (as well as role
+ # groups that were trashed before #18340 when that was
+ # disallowed)
Group.
- where({group_class: 'project'}).
where('delete_at is not null and delete_at < statement_timestamp()').each do |project|
delete_project_and_contents(project.uuid)
end
Group.
- where({group_class: 'project'}).
where('is_trashed = false and trash_at < statement_timestamp()').
update_all('is_trashed = true')
# Sweep expired tokens
ActiveRecord::Base.connection.execute("DELETE from api_client_authorizations where expires_at <= statement_timestamp()")
end
+ head :no_content
end
- def self.sweep_if_stale
- return if Rails.configuration.Collections.TrashSweepInterval <= 0
- exp = Rails.configuration.Collections.TrashSweepInterval.seconds
- need = false
- Rails.cache.fetch('SweepTrashedObjects', expires_in: exp) do
- need = true
+ protected
+
+ def delete_project_and_contents(p_uuid)
+ p = Group.find_by_uuid(p_uuid)
+ if !p
+ raise "can't sweep group '#{p_uuid}', it may not exist"
+ end
+    # First, recursively delete any subprojects
+ Group.where({group_class: 'project', owner_uuid: p_uuid}).each do |sub_project|
+ delete_project_and_contents(sub_project.uuid)
end
- if need
- Thread.new do
- Thread.current.abort_on_exception = false
- begin
- sweep_now
- rescue => e
- Rails.logger.error "#{e.class}: #{e}\n#{e.backtrace.join("\n\t")}"
- ensure
- # Rails 5.1+ makes test threads share a database connection, so we can't
- # close a connection shared with other threads.
- # https://github.com/rails/rails/commit/deba47799ff905f778e0c98a015789a1327d5087
- if Rails.env != "test"
- ActiveRecord::Base.connection.close
- end
- end
+ # Next, iterate over all tables which have owner_uuid fields, with some
+ # exceptions, and delete records owned by this project
+ skipped_classes = ['Group', 'User']
+ ActiveRecord::Base.descendants.reject(&:abstract_class?).each do |klass|
+ if !skipped_classes.include?(klass.name) && klass.columns.collect(&:name).include?('owner_uuid')
+ klass.where({owner_uuid: p_uuid}).destroy_all
end
end
+ # Finally delete the project itself
+ p.destroy
end
end
# SPDX-License-Identifier: AGPL-3.0
require 'arvados/keep'
-require 'sweep_trashed_objects'
require 'trashable'
class Collection < ArvadosModel
super - ["manifest_text", "storage_classes_desired", "storage_classes_confirmed", "current_version_uuid"]
end
- def self.where *args
- SweepTrashedObjects.sweep_if_stale
- super
- end
-
protected
# Although the defaults for these columns is already set up on the schema,
end
end
+ post '/sys/trash_sweep', to: 'sys#trash_sweep'
+
if Rails.env == 'test'
post '/database/reset', to: 'database#reset'
end
assert_includes(owners, groups(:asubproject).uuid)
end
+ [:afiltergroup, :private_role].each do |grp|
+ test "delete non-project group #{grp}" do
+ authorize_with :admin
+ assert_not_nil Group.find_by_uuid(groups(grp).uuid)
+ assert !Group.find_by_uuid(groups(grp).uuid).is_trashed
+ post :destroy, params: {
+ id: groups(grp).uuid,
+ format: :json,
+ }
+ assert_response :success
+      # Should be deleted outright, not merely trashed
+ assert_nil Group.find_by_uuid(groups(grp).uuid)
+ end
+ end
+
### trashed project tests ###
#
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+require 'test_helper'
+
+class SysControllerTest < ActionController::TestCase
+ include CurrentApiClient
+ include DbCurrentTime
+
+ test "trash_sweep - delete expired tokens" do
+ assert_not_empty ApiClientAuthorization.where(uuid: api_client_authorizations(:expired).uuid)
+ authorize_with :admin
+ post :trash_sweep
+ assert_response :success
+ assert_empty ApiClientAuthorization.where(uuid: api_client_authorizations(:expired).uuid)
+ end
+
+ test "trash_sweep - fail with non-admin token" do
+ authorize_with :active
+ post :trash_sweep
+ assert_response 403
+ end
+
+ test "trash_sweep - move collections to trash" do
+ c = collections(:trashed_on_next_sweep)
+ refute_empty Collection.where('uuid=? and is_trashed=false', c.uuid)
+ assert_raises(ActiveRecord::RecordNotUnique) do
+ act_as_user users(:active) do
+ Collection.create!(owner_uuid: c.owner_uuid,
+ name: c.name)
+ end
+ end
+ authorize_with :admin
+ post :trash_sweep
+ assert_response :success
+ c = Collection.where('uuid=? and is_trashed=true', c.uuid).first
+ assert c
+ act_as_user users(:active) do
+ assert Collection.create!(owner_uuid: c.owner_uuid,
+ name: c.name)
+ end
+ end
+
+ test "trash_sweep - delete collections" do
+ uuid = 'zzzzz-4zz18-3u1p5umicfpqszp' # deleted_on_next_sweep
+ assert_not_empty Collection.where(uuid: uuid)
+ authorize_with :admin
+ post :trash_sweep
+ assert_response :success
+ assert_empty Collection.where(uuid: uuid)
+ end
+
+ test "trash_sweep - delete referring links" do
+ uuid = collections(:trashed_on_next_sweep).uuid
+ act_as_system_user do
+ assert_raises ActiveRecord::RecordInvalid do
+ # Cannot create because :trashed_on_next_sweep is already trashed
+ Link.create!(head_uuid: uuid,
+ tail_uuid: system_user_uuid,
+ link_class: 'whatever',
+ name: 'something')
+ end
+
+ # Bump trash_at to now + 1 minute
+ Collection.where(uuid: uuid).
+ update(trash_at: db_current_time + (1).minute)
+
+ # Not considered trashed now
+ Link.create!(head_uuid: uuid,
+ tail_uuid: system_user_uuid,
+ link_class: 'whatever',
+ name: 'something')
+ end
+ past = db_current_time
+ Collection.where(uuid: uuid).
+ update_all(is_trashed: true, trash_at: past, delete_at: past)
+ assert_not_empty Collection.where(uuid: uuid)
+ authorize_with :admin
+ post :trash_sweep
+ assert_response :success
+ assert_empty Collection.where(uuid: uuid)
+ end
+
+ test "trash_sweep - move projects to trash" do
+ p = groups(:trashed_on_next_sweep)
+ assert_empty Group.where('uuid=? and is_trashed=true', p.uuid)
+ authorize_with :admin
+ post :trash_sweep
+ assert_response :success
+ assert_not_empty Group.where('uuid=? and is_trashed=true', p.uuid)
+ end
+
+ test "trash_sweep - delete projects and their contents" do
+ g_foo = groups(:trashed_project)
+ g_bar = groups(:trashed_subproject)
+ g_baz = groups(:trashed_subproject3)
+ col = collections(:collection_in_trashed_subproject)
+ job = jobs(:job_in_trashed_project)
+ cr = container_requests(:cr_in_trashed_project)
+    # Record how many objects there were before the sweep
+ user_nr_was = User.all.length
+ coll_nr_was = Collection.all.length
+ group_nr_was = Group.where('group_class<>?', 'project').length
+ project_nr_was = Group.where(group_class: 'project').length
+ cr_nr_was = ContainerRequest.all.length
+ job_nr_was = Job.all.length
+ assert_not_empty Group.where(uuid: g_foo.uuid)
+ assert_not_empty Group.where(uuid: g_bar.uuid)
+ assert_not_empty Group.where(uuid: g_baz.uuid)
+ assert_not_empty Collection.where(uuid: col.uuid)
+ assert_not_empty Job.where(uuid: job.uuid)
+ assert_not_empty ContainerRequest.where(uuid: cr.uuid)
+
+ authorize_with :admin
+ post :trash_sweep
+ assert_response :success
+
+ assert_empty Group.where(uuid: g_foo.uuid)
+ assert_empty Group.where(uuid: g_bar.uuid)
+ assert_empty Group.where(uuid: g_baz.uuid)
+ assert_empty Collection.where(uuid: col.uuid)
+ assert_empty Job.where(uuid: job.uuid)
+ assert_empty ContainerRequest.where(uuid: cr.uuid)
+ # No unwanted deletions should have happened
+ assert_equal user_nr_was, User.all.length
+ assert_equal coll_nr_was-2, # collection_in_trashed_subproject
+ Collection.all.length # & deleted_on_next_sweep collections
+ assert_equal group_nr_was, Group.where('group_class<>?', 'project').length
+ assert_equal project_nr_was-3, Group.where(group_class: 'project').length
+ assert_equal cr_nr_was-1, ContainerRequest.all.length
+ assert_equal job_nr_was-1, Job.all.length
+ end
+
+end
# Generally, new routes should appear under /arvados/v1/. If
# they appear elsewhere, that might have been caused by default
# rails generator behavior that we don't want.
- assert_match(/^\/(|\*a|arvados\/v1\/.*|auth\/.*|login|logout|database\/reset|discovery\/.*|static\/.*|themes\/.*|assets|_health\/.*)(\(\.:format\))?$/,
+ assert_match(/^\/(|\*a|arvados\/v1\/.*|auth\/.*|login|logout|database\/reset|discovery\/.*|static\/.*|sys\/trash_sweep|themes\/.*|assets|_health\/.*)(\(\.:format\))?$/,
route.path.spec.to_s,
"Unexpected new route: #{route.path.spec}")
end
# SPDX-License-Identifier: AGPL-3.0
require 'test_helper'
-require 'sweep_trashed_objects'
class ApiClientAuthorizationTest < ActiveSupport::TestCase
include CurrentApiClient
end
end
- test "delete expired in SweepTrashedObjects" do
- assert_not_empty ApiClientAuthorization.where(uuid: api_client_authorizations(:expired).uuid)
- SweepTrashedObjects.sweep_now
- assert_empty ApiClientAuthorization.where(uuid: api_client_authorizations(:expired).uuid)
- end
-
test "accepts SystemRootToken" do
assert_nil ApiClientAuthorization.validate(token: "xxxSystemRootTokenxxx")
# SPDX-License-Identifier: AGPL-3.0
require 'test_helper'
-require 'sweep_trashed_objects'
require 'fix_collection_versions_timestamps'
class CollectionTest < ActiveSupport::TestCase
assert_includes(coll_uuids, collections(:docker_image).uuid)
end
- test "move collections to trash in SweepTrashedObjects" do
- c = collections(:trashed_on_next_sweep)
- refute_empty Collection.where('uuid=? and is_trashed=false', c.uuid)
- assert_raises(ActiveRecord::RecordNotUnique) do
- act_as_user users(:active) do
- Collection.create!(owner_uuid: c.owner_uuid,
- name: c.name)
- end
- end
- SweepTrashedObjects.sweep_now
- c = Collection.where('uuid=? and is_trashed=true', c.uuid).first
- assert c
- act_as_user users(:active) do
- assert Collection.create!(owner_uuid: c.owner_uuid,
- name: c.name)
- end
- end
-
- test "delete collections in SweepTrashedObjects" do
- uuid = 'zzzzz-4zz18-3u1p5umicfpqszp' # deleted_on_next_sweep
- assert_not_empty Collection.where(uuid: uuid)
- SweepTrashedObjects.sweep_now
- assert_empty Collection.where(uuid: uuid)
- end
-
- test "delete referring links in SweepTrashedObjects" do
- uuid = collections(:trashed_on_next_sweep).uuid
- act_as_system_user do
- assert_raises ActiveRecord::RecordInvalid do
- # Cannot create because :trashed_on_next_sweep is already trashed
- Link.create!(head_uuid: uuid,
- tail_uuid: system_user_uuid,
- link_class: 'whatever',
- name: 'something')
- end
-
- # Bump trash_at to now + 1 minute
- Collection.where(uuid: uuid).
- update(trash_at: db_current_time + (1).minute)
-
- # Not considered trashed now
- Link.create!(head_uuid: uuid,
- tail_uuid: system_user_uuid,
- link_class: 'whatever',
- name: 'something')
- end
- past = db_current_time
- Collection.where(uuid: uuid).
- update_all(is_trashed: true, trash_at: past, delete_at: past)
- assert_not_empty Collection.where(uuid: uuid)
- SweepTrashedObjects.sweep_now
- assert_empty Collection.where(uuid: uuid)
- end
-
test "empty names are exempt from name uniqueness" do
act_as_user users(:active) do
c1 = Collection.new(name: nil, manifest_text: '', owner_uuid: groups(:aproject).uuid)
assert User.readable_by(users(:admin)).where(uuid: u_bar.uuid).any?
end
- test "move projects to trash in SweepTrashedObjects" do
- p = groups(:trashed_on_next_sweep)
- assert_empty Group.where('uuid=? and is_trashed=true', p.uuid)
- SweepTrashedObjects.sweep_now
- assert_not_empty Group.where('uuid=? and is_trashed=true', p.uuid)
- end
-
- test "delete projects and their contents in SweepTrashedObjects" do
- g_foo = groups(:trashed_project)
- g_bar = groups(:trashed_subproject)
- g_baz = groups(:trashed_subproject3)
- col = collections(:collection_in_trashed_subproject)
- job = jobs(:job_in_trashed_project)
- cr = container_requests(:cr_in_trashed_project)
- # Save how many objects were before the sweep
- user_nr_was = User.all.length
- coll_nr_was = Collection.all.length
- group_nr_was = Group.where('group_class<>?', 'project').length
- project_nr_was = Group.where(group_class: 'project').length
- cr_nr_was = ContainerRequest.all.length
- job_nr_was = Job.all.length
- assert_not_empty Group.where(uuid: g_foo.uuid)
- assert_not_empty Group.where(uuid: g_bar.uuid)
- assert_not_empty Group.where(uuid: g_baz.uuid)
- assert_not_empty Collection.where(uuid: col.uuid)
- assert_not_empty Job.where(uuid: job.uuid)
- assert_not_empty ContainerRequest.where(uuid: cr.uuid)
- SweepTrashedObjects.sweep_now
- assert_empty Group.where(uuid: g_foo.uuid)
- assert_empty Group.where(uuid: g_bar.uuid)
- assert_empty Group.where(uuid: g_baz.uuid)
- assert_empty Collection.where(uuid: col.uuid)
- assert_empty Job.where(uuid: job.uuid)
- assert_empty ContainerRequest.where(uuid: cr.uuid)
- # No unwanted deletions should have happened
- assert_equal user_nr_was, User.all.length
- assert_equal coll_nr_was-2, # collection_in_trashed_subproject
- Collection.all.length # & deleted_on_next_sweep collections
- assert_equal group_nr_was, Group.where('group_class<>?', 'project').length
- assert_equal project_nr_was-3, Group.where(group_class: 'project').length
- assert_equal cr_nr_was-1, ContainerRequest.all.length
- assert_equal job_nr_was-1, Job.all.length
- end
-
test "project names must be displayable in a filesystem" do
set_user_from_auth :active
["", "{SOLIDUS}"].each do |subst|
"fmt"
"os"
+ "git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/lib/config"
"github.com/coreos/go-systemd/daemon"
"github.com/ghodss/yaml"
TimestampFormat: "2006-01-02T15:04:05.000000000Z07:00",
})
- flags := flag.NewFlagSet(os.Args[0], flag.ExitOnError)
+ flags := flag.NewFlagSet(os.Args[0], flag.ContinueOnError)
loader := config.NewLoader(os.Stdin, logger)
loader.SetupFlags(flags)
getVersion := flags.Bool("version", false, "print version information and exit.")
args := loader.MungeLegacyConfigArgs(logger, os.Args[1:], "-legacy-git-httpd-config")
- flags.Parse(args)
-
- if *getVersion {
+ if ok, code := cmd.ParseFlags(flags, os.Args[0], args, "", os.Stderr); !ok {
+ os.Exit(code)
+ } else if *getVersion {
fmt.Printf("arv-git-httpd %s\n", version)
return
}
"syscall"
"time"
+ "git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
var version = "dev"
-func main() {
- err := doMain()
- if err != nil {
- logrus.Fatalf("%q", err)
- }
-}
-
var (
runningCmds map[string]*exec.Cmd
runningCmdsMutex sync.Mutex
crunchRunCommand *string
)
-func doMain() error {
+func main() {
logger := logrus.StandardLogger()
if os.Getenv("DEBUG") != "" {
logger.SetLevel(logrus.DebugLevel)
false,
"Print version information and exit.")
- // Parse args; omit the first arg which is the command name
- flags.Parse(os.Args[1:])
+ if ok, code := cmd.ParseFlags(flags, os.Args[0], os.Args[1:], "", os.Stderr); !ok {
+ os.Exit(code)
+ }
// Print version information if requested
if *getVersion {
fmt.Printf("crunch-dispatch-local %s\n", version)
- return nil
+ return
}
loader := config.NewLoader(nil, logger)
cfg, err := loader.Load()
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "error loading config: %s\n", err)
+ os.Exit(1)
+ }
cluster, err := cfg.GetCluster("")
if err != nil {
- return fmt.Errorf("config error: %s", err)
+ fmt.Fprintf(os.Stderr, "config error: %s\n", err)
+ os.Exit(1)
}
logger.Printf("crunch-dispatch-local %s started", version)
arv, err := arvadosclient.MakeArvadosClient()
if err != nil {
logger.Errorf("error making Arvados client: %v", err)
- return err
+ os.Exit(1)
}
arv.Retries = 25
err = dispatcher.Run(ctx)
if err != nil {
- return err
+ logger.Error(err)
+ return
}
c := make(chan os.Signal, 1)
// Wait for all running crunch jobs to complete / terminate
waitGroup.Wait()
-
- return nil
}
func startFunc(container arvados.Container, cmd *exec.Cmd) error {
"strings"
"time"
+ "git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/lib/dispatchcloud"
"git.arvados.org/arvados.git/sdk/go/arvados"
if disp.logger == nil {
disp.logger = logrus.StandardLogger()
}
- flags := flag.NewFlagSet(prog, flag.ExitOnError)
+ flags := flag.NewFlagSet(prog, flag.ContinueOnError)
flags.Usage = func() { usage(flags) }
loader := config.NewLoader(nil, disp.logger)
false,
"Print version information and exit.")
- args = loader.MungeLegacyConfigArgs(logrus.StandardLogger(), args, "-legacy-crunch-dispatch-slurm-config")
-
- // Parse args; omit the first arg which is the command name
- err := flags.Parse(args)
-
- if err == flag.ErrHelp {
- return nil
+ args = loader.MungeLegacyConfigArgs(disp.logger, args, "-legacy-crunch-dispatch-slurm-config")
+ if ok, code := cmd.ParseFlags(flags, prog, args, "", os.Stderr); !ok {
+ os.Exit(code)
}
// Print version information if requested
import (
"flag"
"fmt"
- "os"
)
func usage(fs *flag.FlagSet) {
- fmt.Fprintf(os.Stderr, `
+ fmt.Fprintf(fs.Output(), `
crunch-dispatch-slurm runs queued Arvados containers by submitting
SLURM batch jobs.
Options:
`)
fs.PrintDefaults()
- fmt.Fprintf(os.Stderr, `
+ fmt.Fprintf(fs.Output(), `
For configuration instructions see https://doc.arvados.org/install/crunch2-slurm/install-dispatch.html
`)
"syscall"
"time"
+ "git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/lib/crunchstat"
)
Logger: log.New(os.Stderr, "crunchstat: ", 0),
}
- flag.StringVar(&reporter.CgroupRoot, "cgroup-root", "", "Root of cgroup tree")
- flag.StringVar(&reporter.CgroupParent, "cgroup-parent", "", "Name of container parent under cgroup")
- flag.StringVar(&reporter.CIDFile, "cgroup-cid", "", "Path to container id file")
- flag.IntVar(&signalOnDeadPPID, "signal-on-dead-ppid", signalOnDeadPPID, "Signal to send child if crunchstat's parent process disappears (0 to disable)")
- flag.DurationVar(&ppidCheckInterval, "ppid-check-interval", ppidCheckInterval, "Time between checks for parent process disappearance")
- pollMsec := flag.Int64("poll", 1000, "Reporting interval, in milliseconds")
- getVersion := flag.Bool("version", false, "Print version information and exit.")
-
- flag.Parse()
-
- // Print version information if requested
- if *getVersion {
+ flags := flag.NewFlagSet(os.Args[0], flag.ExitOnError)
+ flags.StringVar(&reporter.CgroupRoot, "cgroup-root", "", "Root of cgroup tree")
+ flags.StringVar(&reporter.CgroupParent, "cgroup-parent", "", "Name of container parent under cgroup")
+ flags.StringVar(&reporter.CIDFile, "cgroup-cid", "", "Path to container id file")
+ flags.IntVar(&signalOnDeadPPID, "signal-on-dead-ppid", signalOnDeadPPID, "Signal to send child if crunchstat's parent process disappears (0 to disable)")
+ flags.DurationVar(&ppidCheckInterval, "ppid-check-interval", ppidCheckInterval, "Time between checks for parent process disappearance")
+ pollMsec := flags.Int64("poll", 1000, "Reporting interval, in milliseconds")
+ getVersion := flags.Bool("version", false, "Print version information and exit.")
+
+ if ok, code := cmd.ParseFlags(flags, os.Args[0], os.Args[1:], "program [args ...]", os.Stderr); !ok {
+ os.Exit(code)
+ } else if *getVersion {
fmt.Printf("crunchstat %s\n", version)
return
+ } else if flags.NArg() == 0 {
+ fmt.Fprintf(os.Stderr, "missing required argument: program (try -help)\n")
+ os.Exit(2)
}
reporter.Logger.Printf("crunchstat %s started", version)
reporter.PollPeriod = time.Duration(*pollMsec) * time.Millisecond
reporter.Start()
- err := runCommand(flag.Args(), reporter.Logger)
+ err := runCommand(flags.Args(), reporter.Logger)
reporter.Stop()
if err, ok := err.(*exec.ExitError); ok {
usr = self.api.users().current().execute(num_retries=self.args.retries)
now = time.time()
dir_class = None
- dir_args = [llfuse.ROOT_INODE, self.operations.inodes, self.api, self.args.retries]
+ dir_args = [llfuse.ROOT_INODE, self.operations.inodes, self.api, self.args.retries, self.args.enable_write]
mount_readme = False
storage_classes = None
return
e = self.operations.inodes.add_entry(Directory(
- llfuse.ROOT_INODE, self.operations.inodes, self.api.config))
+ llfuse.ROOT_INODE, self.operations.inodes, self.api.config, self.args.enable_write))
dir_args[0] = e.inode
for name in self.args.mount_by_id:
and the value referencing a File or Directory object.
"""
- def __init__(self, parent_inode, inodes, apiconfig):
+ def __init__(self, parent_inode, inodes, apiconfig, enable_write):
"""parent_inode is the integer inode number"""
super(Directory, self).__init__()
self.apiconfig = apiconfig
self._entries = {}
self._mtime = time.time()
+ self._enable_write = enable_write
def forward_slash_subst(self):
if not hasattr(self, '_fsns'):
"""
- def __init__(self, parent_inode, inodes, apiconfig, collection):
- super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, apiconfig)
+ def __init__(self, parent_inode, inodes, apiconfig, enable_write, collection):
+ super(CollectionDirectoryBase, self).__init__(parent_inode, inodes, apiconfig, enable_write)
self.apiconfig = apiconfig
self.collection = collection
item.fuse_entry.dead = False
self._entries[name] = item.fuse_entry
elif isinstance(item, arvados.collection.RichCollectionBase):
- self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, self.apiconfig, item))
+ self._entries[name] = self.inodes.add_entry(CollectionDirectoryBase(self.inode, self.inodes, self.apiconfig, self._enable_write, item))
self._entries[name].populate(mtime)
else:
- self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime))
+ self._entries[name] = self.inodes.add_entry(FuseArvadosFile(self.inode, item, mtime, self._enable_write))
item.fuse_entry = self._entries[name]
def on_event(self, event, collection, name, item):
self.new_entry(entry, item, self.mtime())
def writable(self):
- return self.collection.writable()
+ return self._enable_write and self.collection.writable()
@use_counter
def flush(self):
+ if not self.writable():
+ return
with llfuse.lock_released:
self.collection.root_collection().save()
@use_counter
@check_update
def create(self, name):
+ if not self.writable():
+ raise llfuse.FUSEError(errno.EROFS)
with llfuse.lock_released:
self.collection.open(name, "w").close()
@use_counter
@check_update
def mkdir(self, name):
+ if not self.writable():
+ raise llfuse.FUSEError(errno.EROFS)
with llfuse.lock_released:
self.collection.mkdirs(name)
@use_counter
@check_update
def unlink(self, name):
+ if not self.writable():
+ raise llfuse.FUSEError(errno.EROFS)
with llfuse.lock_released:
self.collection.remove(name)
self.flush()
@use_counter
@check_update
def rmdir(self, name):
+ if not self.writable():
+ raise llfuse.FUSEError(errno.EROFS)
with llfuse.lock_released:
self.collection.remove(name)
self.flush()
@use_counter
@check_update
def rename(self, name_old, name_new, src):
+ if not self.writable():
+ raise llfuse.FUSEError(errno.EROFS)
+
if not isinstance(src, CollectionDirectoryBase):
raise llfuse.FUSEError(errno.EPERM)
class CollectionDirectory(CollectionDirectoryBase):
"""Represents the root of a directory tree representing a collection."""
- def __init__(self, parent_inode, inodes, api, num_retries, collection_record=None, explicit_collection=None):
- super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, None)
+ def __init__(self, parent_inode, inodes, api, num_retries, enable_write, collection_record=None, explicit_collection=None):
+ super(CollectionDirectory, self).__init__(parent_inode, inodes, api.config, enable_write, None)
self.api = api
self.num_retries = num_retries
self.collection_record_file = None
self._mtime = 0
self._manifest_size = 0
if self.collection_locator:
- self._writable = (uuid_pattern.match(self.collection_locator) is not None)
+ self._writable = (uuid_pattern.match(self.collection_locator) is not None) and enable_write
self._updating_lock = threading.Lock()
def same(self, i):
return i['uuid'] == self.collection_locator or i['portable_data_hash'] == self.collection_locator
def writable(self):
- return self.collection.writable() if self.collection is not None else self._writable
+ return self._enable_write and (self.collection.writable() if self.collection is not None else self._writable)
def want_event_subscribe(self):
return (uuid_pattern.match(self.collection_locator) is not None)
def save_new(self):
pass
- def __init__(self, parent_inode, inodes, api_client, num_retries, storage_classes=None):
+ def __init__(self, parent_inode, inodes, api_client, num_retries, enable_write, storage_classes=None):
collection = self.UnsaveableCollection(
api_client=api_client,
keep_client=api_client.keep,
num_retries=num_retries,
storage_classes_desired=storage_classes)
+ # This is always enable_write=True because it never tries to
+ # save to the backend
super(TmpCollectionDirectory, self).__init__(
- parent_inode, inodes, api_client.config, collection)
+ parent_inode, inodes, api_client.config, True, collection)
self.collection_record_file = None
self.populate(self.mtime())
""".lstrip()
- def __init__(self, parent_inode, inodes, api, num_retries, pdh_only=False, storage_classes=None):
- super(MagicDirectory, self).__init__(parent_inode, inodes, api.config)
+ def __init__(self, parent_inode, inodes, api, num_retries, enable_write, pdh_only=False, storage_classes=None):
+ super(MagicDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
self.api = api
self.num_retries = num_retries
self.pdh_only = pdh_only
# If we're the root directory, add an identical by_id subdirectory.
if self.inode == llfuse.ROOT_INODE:
self._entries['by_id'] = self.inodes.add_entry(MagicDirectory(
- self.inode, self.inodes, self.api, self.num_retries, self.pdh_only))
+ self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
+ self.pdh_only))
def __contains__(self, k):
if k in self._entries:
if project[u'items_available'] == 0:
return False
e = self.inodes.add_entry(ProjectDirectory(
- self.inode, self.inodes, self.api, self.num_retries,
+ self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
project[u'items'][0], storage_classes=self.storage_classes))
else:
e = self.inodes.add_entry(CollectionDirectory(
- self.inode, self.inodes, self.api, self.num_retries, k))
+ self.inode, self.inodes, self.api, self.num_retries, self._enable_write, k))
if e.update():
if k not in self._entries:
class TagsDirectory(Directory):
"""A special directory that contains as subdirectories all tags visible to the user."""
- def __init__(self, parent_inode, inodes, api, num_retries, poll_time=60):
- super(TagsDirectory, self).__init__(parent_inode, inodes, api.config)
+ def __init__(self, parent_inode, inodes, api, num_retries, enable_write, poll_time=60):
+ super(TagsDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
self.api = api
self.num_retries = num_retries
self._poll = True
self.merge(tags['items']+[{"name": n} for n in self._extra],
lambda i: i['name'],
lambda a, i: a.tag == i['name'],
- lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, i['name'], poll=self._poll, poll_time=self._poll_time))
+ lambda i: TagDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
+ i['name'], poll=self._poll, poll_time=self._poll_time))
@use_counter
@check_update
to the user that are tagged with a particular tag.
"""
- def __init__(self, parent_inode, inodes, api, num_retries, tag,
+ def __init__(self, parent_inode, inodes, api, num_retries, enable_write, tag,
poll=False, poll_time=60):
- super(TagDirectory, self).__init__(parent_inode, inodes, api.config)
+ super(TagDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
self.api = api
self.num_retries = num_retries
self.tag = tag
self.merge(taggedcollections['items'],
lambda i: i['head_uuid'],
lambda a, i: a.collection_locator == i['head_uuid'],
- lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid']))
+ lambda i: CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, i['head_uuid']))
class ProjectDirectory(Directory):
"""A special directory that contains the contents of a project."""
- def __init__(self, parent_inode, inodes, api, num_retries, project_object,
+ def __init__(self, parent_inode, inodes, api, num_retries, enable_write, project_object,
poll=True, poll_time=3, storage_classes=None):
- super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config)
+ super(ProjectDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
self.api = api
self.num_retries = num_retries
self.project_object = project_object
def createDirectory(self, i):
if collection_uuid_pattern.match(i['uuid']):
- return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i)
+ return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, i)
elif group_uuid_pattern.match(i['uuid']):
- return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i, self._poll, self._poll_time, self.storage_classes)
+ return ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
+ i, self._poll, self._poll_time, self.storage_classes)
elif link_uuid_pattern.match(i['uuid']):
if i['head_kind'] == 'arvados#collection' or portable_data_hash_pattern.match(i['head_uuid']):
- return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, i['head_uuid'])
+ return CollectionDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write, i['head_uuid'])
else:
return None
elif uuid_pattern.match(i['uuid']):
@use_counter
@check_update
def writable(self):
+ if not self._enable_write:
+ return False
with llfuse.lock_released:
if not self._current_user:
self._current_user = self.api.users().current().execute(num_retries=self.num_retries)
@use_counter
@check_update
def mkdir(self, name):
+ if not self.writable():
+ raise llfuse.FUSEError(errno.EROFS)
+
try:
with llfuse.lock_released:
c = {
@use_counter
@check_update
def rmdir(self, name):
+ if not self.writable():
+ raise llfuse.FUSEError(errno.EROFS)
+
if name not in self:
raise llfuse.FUSEError(errno.ENOENT)
if not isinstance(self[name], CollectionDirectory):
@use_counter
@check_update
def rename(self, name_old, name_new, src):
+ if not self.writable():
+ raise llfuse.FUSEError(errno.EROFS)
+
if not isinstance(src, ProjectDirectory):
raise llfuse.FUSEError(errno.EPERM)
class SharedDirectory(Directory):
"""A special directory that represents users or groups who have shared projects with me."""
- def __init__(self, parent_inode, inodes, api, num_retries, exclude,
+ def __init__(self, parent_inode, inodes, api, num_retries, enable_write, exclude,
poll=False, poll_time=60, storage_classes=None):
- super(SharedDirectory, self).__init__(parent_inode, inodes, api.config)
+ super(SharedDirectory, self).__init__(parent_inode, inodes, api.config, enable_write)
self.api = api
self.num_retries = num_retries
self.current_user = api.users().current().execute(num_retries=num_retries)
self.merge(contents.items(),
lambda i: i[0],
lambda a, i: a.uuid() == i[1]['uuid'],
- lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time, storage_classes=self.storage_classes))
+ lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, self._enable_write,
+ i[1], poll=self._poll, poll_time=self._poll_time, storage_classes=self.storage_classes))
except Exception:
_logger.exception("arv-mount shared dir error")
finally:
class FuseArvadosFile(File):
"""Wraps a ArvadosFile."""
- __slots__ = ('arvfile',)
+ __slots__ = ('arvfile', '_enable_write')
- def __init__(self, parent_inode, arvfile, _mtime):
+ def __init__(self, parent_inode, arvfile, _mtime, enable_write):
super(FuseArvadosFile, self).__init__(parent_inode, _mtime)
self.arvfile = arvfile
+ self._enable_write = enable_write
def size(self):
with llfuse.lock_released:
return False
def writable(self):
- return self.arvfile.writable()
+ return self._enable_write and self.arvfile.writable()
def flush(self):
with llfuse.lock_released:
llfuse.close()
def make_mount(self, root_class, **root_kwargs):
+ enable_write = True
+ if 'enable_write' in root_kwargs:
+ enable_write = root_kwargs.pop('enable_write')
self.operations = fuse.Operations(
os.getuid(), os.getgid(),
api_client=self.api,
- enable_write=True)
+ enable_write=enable_write)
self.operations.inodes.add_entry(root_class(
- llfuse.ROOT_INODE, self.operations.inodes, self.api, 0, **root_kwargs))
+ llfuse.ROOT_INODE, self.operations.inodes, self.api, 0, enable_write, **root_kwargs))
llfuse.init(self.operations, self.mounttmp, [])
self.llfuse_thread = threading.Thread(None, lambda: self._llfuse_main())
self.llfuse_thread.daemon = True
class SanitizeFilenameTest(MountTestBase):
def test_sanitize_filename(self):
- pdir = fuse.ProjectDirectory(1, {}, self.api, 0, project_object=self.api.users().current().execute())
+ pdir = fuse.ProjectDirectory(1, {}, self.api, 0, False, project_object=self.api.users().current().execute())
acceptable = [
"foo.txt",
".foo",
@staticmethod
def _test_collection_custom_storage_classes(self, coll):
self.assertEqual(storage_classes_desired(coll), ['foo'])
+
+def _readonlyCollectionTestHelper(mounttmp):
+ f = open(os.path.join(mounttmp, 'thing1.txt'), 'rt')
+ # Testing that close() doesn't raise an error.
+ f.close()
+
+class ReadonlyCollectionTest(MountTestBase):
+ def setUp(self):
+ super(ReadonlyCollectionTest, self).setUp()
+ cw = arvados.collection.Collection()
+ with cw.open('thing1.txt', 'wt') as f:
+ f.write("data 1")
+ cw.save_new(owner_uuid=run_test_server.fixture("groups")["aproject"]["uuid"])
+ self.testcollection = cw.api_response()
+
+ def runTest(self):
+ settings = arvados.config.settings().copy()
+ settings["ARVADOS_API_TOKEN"] = run_test_server.fixture("api_client_authorizations")["project_viewer"]["api_token"]
+ self.api = arvados.safeapi.ThreadSafeApiCache(settings)
+ self.make_mount(fuse.CollectionDirectory, collection_record=self.testcollection, enable_write=False)
+
+ self.pool.apply(_readonlyCollectionTestHelper, (self.mounttmp,))
_ "net/http/pprof"
"os"
+ "git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/lib/service"
"git.arvados.org/arvados.git/sdk/go/arvados"
logger := ctxlog.FromContext(context.Background())
var options RunOptions
- flags := flag.NewFlagSet(prog, flag.ExitOnError)
+ flags := flag.NewFlagSet(prog, flag.ContinueOnError)
flags.BoolVar(&options.Once, "once", false,
"balance once and then exit")
flags.BoolVar(&options.CommitPulls, "commit-pulls", false,
"send trash requests (delete unreferenced old blocks, and excess replicas of overreplicated blocks)")
flags.BoolVar(&options.CommitConfirmedFields, "commit-confirmed-fields", true,
"update collection fields (replicas_confirmed, storage_classes_confirmed, etc.)")
- flags.Bool("version", false, "Write version information to stdout and exit 0")
dumpFlag := flags.Bool("dump", false, "dump details for each block to stdout")
pprofAddr := flags.String("pprof", "", "serve Go profile data at `[addr]:port`")
+ // "show version" is implemented by service.Command, so we
+ // don't need the var here -- we just need the -version flag
+ // to pass flags.Parse().
+ flags.Bool("version", false, "Write version information to stdout and exit 0")
if *pprofAddr != "" {
go func() {
loader.SetupFlags(flags)
munged := loader.MungeLegacyConfigArgs(logger, args, "-legacy-keepbalance-config")
- flags.Parse(munged)
+ if ok, code := cmd.ParseFlags(flags, prog, munged, "", stderr); !ok {
+ return code
+ }
if *dumpFlag {
dumper := logrus.New()
}
flags.Visit(func(f *flag.Flag) {
if !dropFlag[f.Name] {
- args = append(args, "-"+f.Name, f.Value.String())
+ args = append(args, "-"+f.Name+"="+f.Value.String())
}
})
runCommand("keep-balance", []string{"-version"}, nil, &stdout, &stderr)
c.Check(stderr.String(), check.Equals, "")
c.Log(stdout.String())
+ c.Check(stdout.String(), check.Matches, `keep-balance.*\(go1.*\)\n`)
}
func (s *mainSuite) TestHTTPServer(c *check.C) {
"mime"
"os"
+ "git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/sdk/go/arvados"
"github.com/coreos/go-systemd/daemon"
})
}
-func configure(logger log.FieldLogger, args []string) *Config {
- flags := flag.NewFlagSet(args[0], flag.ExitOnError)
+func configure(logger log.FieldLogger, args []string) (*Config, error) {
+ flags := flag.NewFlagSet(args[0], flag.ContinueOnError)
loader := config.NewLoader(os.Stdin, logger)
loader.SetupFlags(flags)
getVersion := flags.Bool("version", false,
"print version information and exit.")
+ prog := args[0]
args = loader.MungeLegacyConfigArgs(logger, args[1:], "-legacy-keepweb-config")
- flags.Parse(args)
-
- // Print version information if requested
- if *getVersion {
- fmt.Printf("keep-web %s\n", version)
- return nil
+ if ok, code := cmd.ParseFlags(flags, prog, args, "", os.Stderr); !ok {
+ os.Exit(code)
+ } else if *getVersion {
+		fmt.Printf("%s %s\n", prog, version)
+ return nil, nil
}
arvCfg, err := loader.Load()
if err != nil {
- log.Fatal(err)
+ return nil, err
}
cfg := newConfig(logger, arvCfg)
if *dumpConfig {
out, err := yaml.Marshal(cfg)
if err != nil {
- log.Fatal(err)
+ return nil, err
}
_, err = os.Stdout.Write(out)
- if err != nil {
- log.Fatal(err)
- }
- return nil
+ return nil, err
}
- return cfg
+ return cfg, nil
}
func main() {
logger := log.New()
- cfg := configure(logger, os.Args)
- if cfg == nil {
+ cfg, err := configure(logger, os.Args)
+ if err != nil {
+ logger.Fatal(err)
+ } else if cfg == nil {
return
}
"syscall"
"time"
+ "git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
func configure(logger log.FieldLogger, args []string) (*arvados.Cluster, error) {
- flags := flag.NewFlagSet(args[0], flag.ExitOnError)
+ prog := args[0]
+ flags := flag.NewFlagSet(prog, flag.ContinueOnError)
dumpConfig := flags.Bool("dump-config", false, "write current configuration to stdout and exit")
getVersion := flags.Bool("version", false, "Print version information and exit.")
loader := config.NewLoader(os.Stdin, logger)
loader.SetupFlags(flags)
-
args = loader.MungeLegacyConfigArgs(logger, args[1:], "-legacy-keepproxy-config")
- flags.Parse(args)
- // Print version information if requested
- if *getVersion {
+ if ok, code := cmd.ParseFlags(flags, prog, args, "", os.Stderr); !ok {
+ os.Exit(code)
+ } else if *getVersion {
fmt.Printf("keepproxy %s\n", version)
return nil, nil
}
"os"
"sync"
+ "git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/lib/service"
"git.arvados.org/arvados.git/sdk/go/arvados"
)
var (
- version = "dev"
Command = service.Command(arvados.ServiceNameKeepstore, newHandlerOrErrorHandler)
)
func runCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
- args, ok := convertKeepstoreFlagsToServiceFlags(args, ctxlog.FromContext(context.Background()))
+ args, ok, code := convertKeepstoreFlagsToServiceFlags(prog, args, ctxlog.FromContext(context.Background()), stderr)
if !ok {
- return 2
+ return code
}
return Command.RunCommand(prog, args, stdin, stdout, stderr)
}
// Parse keepstore command line flags, and return equivalent
-// service.Command flags. The second return value ("ok") is true if
-// all provided flags were successfully converted.
-func convertKeepstoreFlagsToServiceFlags(args []string, lgr logrus.FieldLogger) ([]string, bool) {
+// service.Command flags. If the second return value ("ok") is false,
+// the program should exit, and the third return value is a suitable
+// exit code.
+func convertKeepstoreFlagsToServiceFlags(prog string, args []string, lgr logrus.FieldLogger, stderr io.Writer) ([]string, bool, int) {
flags := flag.NewFlagSet("", flag.ContinueOnError)
flags.String("listen", "", "Services.Keepstore.InternalURLs")
flags.Int("max-buffers", 0, "API.MaxKeepBlobBuffers")
flags.String("config", "", "")
flags.String("legacy-keepstore-config", "", "")
- err := flags.Parse(args)
- if err == flag.ErrHelp {
- return []string{"-help"}, true
- } else if err != nil {
- return nil, false
+ if ok, code := cmd.ParseFlags(flags, prog, args, "", stderr); !ok {
+ return nil, false, code
}
args = nil
}
})
if !ok {
- return nil, false
+ return nil, false, 2
}
- flags = flag.NewFlagSet("", flag.ExitOnError)
+ flags = flag.NewFlagSet("", flag.ContinueOnError)
loader := config.NewLoader(nil, lgr)
loader.SetupFlags(flags)
- return loader.MungeLegacyConfigArgs(lgr, args, "-legacy-keepstore-config"), true
+ return loader.MungeLegacyConfigArgs(lgr, args, "-legacy-keepstore-config"), true, 0
}
type handler struct {
return errors.New("no volumes configured")
}
- h.Logger.Printf("keepstore %s starting, pid %d", version, os.Getpid())
+ h.Logger.Printf("keepstore %s starting, pid %d", cmd.Version.String(), os.Getpid())
// Start a round-robin VolumeManager with the configured volumes.
vm, err := makeRRVolumeManager(h.Logger, h.Cluster, serviceURL, newVolumeMetricsVecs(reg))
"sync/atomic"
"time"
+ "git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/health"
// populate the given NodeStatus struct with current values.
func (rtr *router) readNodeStatus(st *NodeStatus) {
- st.Version = version
+ st.Version = strings.SplitN(cmd.Version.String(), " ", 2)[0]
vols := rtr.volmgr.AllReadable()
if cap(st.Volumes) < len(vols) {
st.Volumes = make([]*volumeStatusEnt, len(vols))
// e4de7a2810f5554cd39b36d8ddb132ff+67108864 1388701136
//
func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
- var lastErr error
rootdir, err := v.os.Open(v.Root)
if err != nil {
return err
}
- defer rootdir.Close()
v.os.stats.TickOps("readdir")
v.os.stats.Tick(&v.os.stats.ReaddirOps)
- for {
- names, err := rootdir.Readdirnames(1)
- if err == io.EOF {
- return lastErr
- } else if err != nil {
- return err
- }
- if !strings.HasPrefix(names[0], prefix) && !strings.HasPrefix(prefix, names[0]) {
+ subdirs, err := rootdir.Readdirnames(-1)
+ rootdir.Close()
+ if err != nil {
+ return err
+ }
+ for _, subdir := range subdirs {
+ if !strings.HasPrefix(subdir, prefix) && !strings.HasPrefix(prefix, subdir) {
// prefix excludes all blocks stored in this dir
continue
}
- if !blockDirRe.MatchString(names[0]) {
+ if !blockDirRe.MatchString(subdir) {
continue
}
- blockdirpath := filepath.Join(v.Root, names[0])
+ blockdirpath := filepath.Join(v.Root, subdir)
blockdir, err := v.os.Open(blockdirpath)
if err != nil {
v.logger.WithError(err).Errorf("error reading %q", blockdirpath)
- lastErr = fmt.Errorf("error reading %q: %s", blockdirpath, err)
- continue
+ return fmt.Errorf("error reading %q: %s", blockdirpath, err)
}
v.os.stats.TickOps("readdir")
v.os.stats.Tick(&v.os.stats.ReaddirOps)
- for {
- fileInfo, err := blockdir.Readdir(1)
- if err == io.EOF {
- break
+ // ReadDir() (compared to Readdir(), which returns
+ // FileInfo structs) helps complete the sequence of
+ // readdirent calls as quickly as possible, reducing
+ // the likelihood of NFS EBADCOOKIE (523) errors.
+ dirents, err := blockdir.ReadDir(-1)
+ blockdir.Close()
+ if err != nil {
+ v.logger.WithError(err).Errorf("error reading %q", blockdirpath)
+ return fmt.Errorf("error reading %q: %s", blockdirpath, err)
+ }
+ for _, dirent := range dirents {
+ fileInfo, err := dirent.Info()
+ if os.IsNotExist(err) {
+ // File disappeared between ReadDir() and now
+ continue
} else if err != nil {
- v.logger.WithError(err).Errorf("error reading %q", blockdirpath)
- lastErr = fmt.Errorf("error reading %q: %s", blockdirpath, err)
- break
+ v.logger.WithError(err).Errorf("error getting FileInfo for %q in %q", dirent.Name(), blockdirpath)
+ return err
}
- name := fileInfo[0].Name()
+ name := fileInfo.Name()
if !strings.HasPrefix(name, prefix) {
continue
}
}
_, err = fmt.Fprint(w,
name,
- "+", fileInfo[0].Size(),
- " ", fileInfo[0].ModTime().UnixNano(),
+ "+", fileInfo.Size(),
+ " ", fileInfo.ModTime().UnixNano(),
"\n")
if err != nil {
- blockdir.Close()
return fmt.Errorf("error writing: %s", err)
}
}
- blockdir.Close()
}
+ return nil
}
// Trash trashes the block data from the unix storage
"errors"
"flag"
"fmt"
+ "io"
"io/ioutil"
"log"
"net/http"
"strings"
"time"
+ "git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
"git.arvados.org/arvados.git/sdk/go/keepclient"
)
var version = "dev"
func main() {
- err := doMain(os.Args[1:])
- if err != nil {
- log.Fatalf("%v", err)
- }
+ os.Exit(doMain(os.Args[1:], os.Stderr))
}
-func doMain(args []string) error {
+func doMain(args []string, stderr io.Writer) int {
flags := flag.NewFlagSet("keep-block-check", flag.ExitOnError)
configFile := flags.String(
false,
"Print version information and exit.")
- // Parse args; omit the first arg which is the command name
- flags.Parse(args)
-
- // Print version information if requested
- if *getVersion {
- fmt.Printf("keep-block-check %s\n", version)
- os.Exit(0)
+ if ok, code := cmd.ParseFlags(flags, os.Args[0], args, "", stderr); !ok {
+ return code
+ } else if *getVersion {
+ fmt.Printf("%s %s\n", os.Args[0], version)
+ return 0
}
config, blobSigningKey, err := loadConfig(*configFile)
if err != nil {
- return fmt.Errorf("Error loading configuration from file: %s", err.Error())
+ fmt.Fprintf(stderr, "Error loading configuration from file: %s\n", err)
+ return 1
}
// get list of block locators to be checked
blockLocators, err := getBlockLocators(*locatorFile, *prefix)
if err != nil {
- return fmt.Errorf("Error reading block hashes to be checked from file: %s", err.Error())
+ fmt.Fprintf(stderr, "Error reading block hashes to be checked from file: %s\n", err)
+ return 1
}
// setup keepclient
kc, blobSignatureTTL, err := setupKeepClient(config, *keepServicesJSON, *blobSignatureTTLFlag)
if err != nil {
- return fmt.Errorf("Error configuring keepclient: %s", err.Error())
+ fmt.Fprintf(stderr, "Error configuring keepclient: %s\n", err)
+ return 1
+ }
+
+ err = performKeepBlockCheck(kc, blobSignatureTTL, blobSigningKey, blockLocators, *verbose)
+ if err != nil {
+ fmt.Fprintln(stderr, err)
+ return 1
}
- return performKeepBlockCheck(kc, blobSignatureTTL, blobSigningKey, blockLocators, *verbose)
+ return 0
}
type apiConfig struct {
func (s *DoMainTestSuite) Test_doMain_WithNoConfig(c *C) {
args := []string{"-prefix", "a"}
- err := doMain(args)
- c.Check(err, NotNil)
- c.Assert(strings.Contains(err.Error(), "config file not specified"), Equals, true)
+ var stderr bytes.Buffer
+ code := doMain(args, &stderr)
+ c.Check(code, Equals, 1)
+ c.Check(stderr.String(), Matches, ".*config file not specified\n")
}
func (s *DoMainTestSuite) Test_doMain_WithNoSuchConfigFile(c *C) {
args := []string{"-config", "no-such-file"}
- err := doMain(args)
- c.Check(err, NotNil)
- c.Assert(strings.Contains(err.Error(), "no such file or directory"), Equals, true)
+ var stderr bytes.Buffer
+ code := doMain(args, &stderr)
+ c.Check(code, Equals, 1)
+ c.Check(stderr.String(), Matches, ".*no such file or directory\n")
}
func (s *DoMainTestSuite) Test_doMain_WithNoBlockHashFile(c *C) {
defer arvadostest.StopKeep(2)
args := []string{"-config", config}
- err := doMain(args)
- c.Assert(strings.Contains(err.Error(), "block-hash-file not specified"), Equals, true)
+ var stderr bytes.Buffer
+ code := doMain(args, &stderr)
+ c.Check(code, Equals, 1)
+ c.Check(stderr.String(), Matches, ".*block-hash-file not specified\n")
}
func (s *DoMainTestSuite) Test_doMain_WithNoSuchBlockHashFile(c *C) {
defer arvadostest.StopKeep(2)
args := []string{"-config", config, "-block-hash-file", "no-such-file"}
- err := doMain(args)
- c.Assert(strings.Contains(err.Error(), "no such file or directory"), Equals, true)
+ var stderr bytes.Buffer
+ code := doMain(args, &stderr)
+ c.Check(code, Equals, 1)
+ c.Check(stderr.String(), Matches, ".*no such file or directory\n")
}
func (s *DoMainTestSuite) Test_doMain(c *C) {
defer os.Remove(locatorFile)
args := []string{"-config", config, "-block-hash-file", locatorFile, "-v"}
- err := doMain(args)
- c.Check(err, NotNil)
- c.Assert(err.Error(), Equals, "Block verification failed for 2 out of 2 blocks with matching prefix")
+ var stderr bytes.Buffer
+ code := doMain(args, &stderr)
+ c.Check(code, Equals, 1)
+ c.Assert(stderr.String(), Matches, "Block verification failed for 2 out of 2 blocks with matching prefix\n")
checkErrorLog(c, []string{TestHash, TestHash2}, "Error verifying block", "Block not found")
c.Assert(strings.Contains(logBuffer.String(), "Verifying block 1 of 2"), Equals, true)
}
"syscall"
"time"
+ "git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
}
func main() {
- flag.Parse()
-
- // Print version information if requested
- if *getVersion {
- fmt.Printf("keep-exercise %s\n", version)
- os.Exit(0)
+ if ok, code := cmd.ParseFlags(flag.CommandLine, os.Args[0], os.Args[1:], "", os.Stderr); !ok {
+ os.Exit(code)
+ } else if *getVersion {
+ fmt.Printf("%s %s\n", os.Args[0], version)
+ return
}
lgr := log.New(os.Stderr, "", log.LstdFlags)
"strings"
"time"
+ "git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
"git.arvados.org/arvados.git/sdk/go/keepclient"
)
false,
"Print version information and exit.")
- // Parse args; omit the first arg which is the command name
- flags.Parse(os.Args[1:])
-
- // Print version information if requested
- if *getVersion {
- fmt.Printf("keep-rsync %s\n", version)
+ if ok, code := cmd.ParseFlags(flags, os.Args[0], os.Args[1:], "", os.Stderr); !ok {
+ os.Exit(code)
+ } else if *getVersion {
+ fmt.Printf("%s %s\n", os.Args[0], version)
os.Exit(0)
}
"os"
"strings"
+ "git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/sdk/go/arvados"
)
* 1st: Group name
* 2nd: User identifier
* 3rd (Optional): User permission on the group: can_read, can_write or can_manage. (Default: can_write)`
- fmt.Fprintf(os.Stderr, "%s\n\n", usageStr)
- fmt.Fprintf(os.Stderr, "Usage:\n%s [OPTIONS] <input-file.csv>\n\n", os.Args[0])
- fmt.Fprintf(os.Stderr, "Options:\n")
+ fmt.Fprintf(flags.Output(), "%s\n\n", usageStr)
+ fmt.Fprintf(flags.Output(), "Usage:\n%s [OPTIONS] <input-file.csv>\n\n", os.Args[0])
+ fmt.Fprintf(flags.Output(), "Options:\n")
flags.PrintDefaults()
}
"",
"Use given group UUID as a parent for the remote groups. Should be owned by the system user. If not specified, a group named '"+config.ParentGroupName+"' will be used (and created if nonexistant).")
- // Parse args; omit the first arg which is the command name
- flags.Parse(os.Args[1:])
-
- // Print version information if requested
- if *getVersion {
+ if ok, code := cmd.ParseFlags(flags, os.Args[0], os.Args[1:], "input-file.csv", os.Stderr); !ok {
+ os.Exit(code)
+ } else if *getVersion {
fmt.Printf("%s %s\n", os.Args[0], version)
os.Exit(0)
}