<div class="releasenotes">
</notextile>
-h2(#main). development main (as of 2021-09-07)
+h2(#main). development main (as of 2021-10-27)
-"Upgrading from 2.2.0":#v2_2_0
+"previous: Upgrading from 2.3.0":#v2_3_0
+
+h3. Dedicated keepstore process for each container
+
+When Arvados runs a container via @arvados-dispatch-cloud@, the @crunch-run@ supervisor process now brings up its own keepstore server to handle I/O for mounted collections, outputs, and logs. With the default configuration, the keepstore process allocates one 64 MiB block buffer per VCPU requested by the container. For most workloads this will increase throughput, reduce total network traffic, and make it possible to run more containers at once without provisioning additional keepstore nodes to handle the I/O load.
+* If you have containers that can effectively handle multiple I/O threads per VCPU, consider increasing the @Containers.LocalKeepBlobBuffersPerVCPU@ value.
+* If you already have a robust permanent keepstore infrastructure, you can set @Containers.LocalKeepBlobBuffersPerVCPU@ to 0 to disable this feature and preserve the previous behavior of sending container I/O traffic to your separately provisioned keepstore servers.
+* This feature is enabled only if no volumes use @AccessViaHosts@, and no writable volumes have underlying @Replication@ less than @Collections.DefaultReplication@. If the feature is configured but cannot be enabled due to an incompatible volume configuration, this will be noted in the @crunch-run.txt@ file in the container log.
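+
+For example, with the default of one buffer per VCPU, a container that requests 4 VCPUs reserves an extra 4 × 64 MiB = 256 MiB of RAM on its cloud instance for the keepstore process. @arvados-dispatch-cloud@ accounts for this overhead when selecting an instance type to run a container.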
+
+h2(#v2_3_0). v2.3.0 (2021-10-27)
+
+"previous: Upgrading to 2.2.0":#v2_2_0
h3. Ubuntu 18.04 packages for arvados-api-server and arvados-workbench now conflict with ruby-bundler
h2(#v2_2_0). v2.2.0 (2021-06-03)
-"Upgrading from 2.1.0":#v2_1_0
+"previous: Upgrading to 2.1.0":#v2_1_0
h3. New spelling of S3 credential configs
h2(#v2_1_0). v2.1.0 (2020-10-13)
-"Upgrading from 2.0.0":#v2_0_0
+"previous: Upgrading to 2.0.0":#v2_0_0
h3. LoginCluster conflicts with other Login providers
h2(#v2_0_0). v2.0.0 (2020-02-07)
-"Upgrading from 1.4":#v1_4_1
+"previous: Upgrading to 1.4.1":#v1_4_1
Arvados 2.0 is a major upgrade, with many changes. Please read these upgrade notes carefully before you begin.
h2(#v1_4_1). v1.4.1 (2019-09-20)
-"Upgrading from 1.4.0":#v1_4_0
+"previous: Upgrading to 1.4.0":#v1_4_0
h3. Centos7 Python 3 dependency upgraded to rh-python36
h2(#v1_4_0). v1.4.0 (2019-06-05)
-"Upgrading from 1.3.3":#v1_3_3
+"previous: Upgrading to 1.3.3":#v1_3_3
h3. Populating the new file_count and file_size_total columns on the collections table
h2(#v1_3_3). v1.3.3 (2019-05-14)
-"Upgrading from 1.3.0":#v1_3_0
+"previous: Upgrading to 1.3.0":#v1_3_0
This release corrects a potential data loss issue. If you are running Arvados 1.3.0 or 1.3.1, we strongly recommend disabling @keep-balance@ until you can upgrade to 1.3.3 or 1.4.0. With keep-balance disabled, there is no chance of data loss.
h2(#v1_3_0). v1.3.0 (2018-12-05)
-"Upgrading from 1.2":#v1_2_0
+"previous: Upgrading to 1.2":#v1_2_0
This release includes several database migrations, which will be executed automatically as part of the API server upgrade. On large Arvados installations, these migrations will take a while. We've seen the upgrade take 30 minutes or more on installations with a lot of collections.
h2(#v1_2_0). v1.2.0 (2018-09-05)
-"Upgrading from 1.1.2 or 1.1.3":#v1_1_2
+"previous: Upgrading to 1.1.2 or 1.1.3":#v1_1_2
h3. Regenerate Postgres table statistics
h2(#v1_1_4). v1.1.4 (2018-04-10)
-"Upgrading from 1.1.3":#v1_1_3
+"previous: Upgrading to 1.1.3":#v1_1_3
h3. arvados-cwl-runner regressions (2018-04-05)
h2(#v1_1_2). v1.1.2 (2017-12-22)
-"Upgrading from 1.1.0 or 1.1.1":#v1_1_0
+"previous: Upgrading to 1.1.0 or 1.1.1":#v1_1_0
h3. The minimum version for Postgres is now 9.4 (2017-12-08)
# Container runtime: "docker" (default) or "singularity"
RuntimeEngine: docker
+ # When running a container, run a dedicated keepstore process,
+ # using the specified number of 64 MiB memory buffers per
+ # allocated CPU core (VCPUs in the container's runtime
+ # constraints). The dedicated keepstore handles I/O for
+ # collections mounted in the container, as well as saving
+ # container logs.
+ #
+ # A zero value disables this feature.
+ #
+ # In order for this feature to be activated, no volume may use
+ # AccessViaHosts, and no writable volume may have Replication
+ # lower than Collections.DefaultReplication. If these requirements
+ # are not satisfied, the feature is disabled automatically
+ # regardless of the value given here.
+ #
+ # Note that when this configuration is enabled, the entire
+ # cluster configuration file, including the system root token,
+ # is copied to the worker node and held in memory for the
+ # duration of the container.
+ LocalKeepBlobBuffersPerVCPU: 1
+
+ # When running a dedicated keepstore process for a container
+ # (see LocalKeepBlobBuffersPerVCPU), write keepstore log
+ # messages to keepstore.txt in the container's log collection.
+ #
+ # These log messages can reveal some volume configuration
+ # details, error messages from the cloud storage provider, etc.,
+ # which are not otherwise visible to users.
+ #
+ # Accepted values:
+ # * "none" -- no keepstore.txt file
+ # * "all" -- all logs, including request and response lines
+ # * "errors" -- all logs except "response" logs with 2xx
+ # response codes and "request" logs
+ LocalKeepLogsToContainerLog: none
+
Logging:
# When you run the db:delete_old_container_logs task, it will find
# containers that have been finished for at least this many seconds,
"Containers.JobsAPI": true,
"Containers.JobsAPI.Enable": true,
"Containers.JobsAPI.GitInternalDir": false,
+ "Containers.LocalKeepBlobBuffersPerVCPU": false,
+ "Containers.LocalKeepLogsToContainerLog": false,
"Containers.Logging": false,
"Containers.LogReuseDecisions": false,
"Containers.LSF": false,
# Container runtime: "docker" (default) or "singularity"
RuntimeEngine: docker
+ # When running a container, run a dedicated keepstore process,
+ # using the specified number of 64 MiB memory buffers per
+ # allocated CPU core (VCPUs in the container's runtime
+ # constraints). The dedicated keepstore handles I/O for
+ # collections mounted in the container, as well as saving
+ # container logs.
+ #
+ # A zero value disables this feature.
+ #
+ # In order for this feature to be activated, no volume may use
+ # AccessViaHosts, and no writable volume may have Replication
+ # lower than Collections.DefaultReplication. If these requirements
+ # are not satisfied, the feature is disabled automatically
+ # regardless of the value given here.
+ #
+ # Note that when this configuration is enabled, the entire
+ # cluster configuration file, including the system root token,
+ # is copied to the worker node and held in memory for the
+ # duration of the container.
+ LocalKeepBlobBuffersPerVCPU: 1
+
+ # When running a dedicated keepstore process for a container
+ # (see LocalKeepBlobBuffersPerVCPU), write keepstore log
+ # messages to keepstore.txt in the container's log collection.
+ #
+ # These log messages can reveal some volume configuration
+ # details, error messages from the cloud storage provider, etc.,
+ # which are not otherwise visible to users.
+ #
+ # Accepted values:
+ # * "none" -- no keepstore.txt file
+ # * "all" -- all logs, including request and response lines
+ # * "errors" -- all logs except "response" logs with 2xx
+ # response codes and "request" logs
+ LocalKeepLogsToContainerLog: none
+
Logging:
# When you run the db:delete_old_container_logs task, it will find
# containers that have been finished for at least this many seconds,
ldr.checkToken(fmt.Sprintf("Clusters.%s.SystemRootToken", id), cc.SystemRootToken),
ldr.checkToken(fmt.Sprintf("Clusters.%s.Collections.BlobSigningKey", id), cc.Collections.BlobSigningKey),
checkKeyConflict(fmt.Sprintf("Clusters.%s.PostgreSQL.Connection", id), cc.PostgreSQL.Connection),
+ ldr.checkEnum("Containers.LocalKeepLogsToContainerLog", cc.Containers.LocalKeepLogsToContainerLog, "none", "all", "errors"),
ldr.checkEmptyKeepstores(cc),
ldr.checkUnlistedKeepstores(cc),
ldr.checkStorageClasses(cc),
return nil
}
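+
+// checkEnum returns an error unless value is one of the accepted
+// strings.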
+func (ldr *Loader) checkEnum(label, value string, accepted ...string) error {
+ for _, s := range accepted {
+ if s == value {
+ return nil
+ }
+ }
+ return fmt.Errorf("%s: unacceptable value %q: must be one of %q", label, value, accepted)
+}
+
func (ldr *Loader) setImplicitStorageClasses(cfg *arvados.Config) error {
cluster:
for id, cc := range cfg.Clusters {
//
// Stdout and stderr in the child process are sent to the systemd
// journal using the systemd-cat program.
-func Detach(uuid string, prog string, args []string, stdout, stderr io.Writer) int {
- return exitcode(stderr, detach(uuid, prog, args, stdout, stderr))
+func Detach(uuid string, prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+ return exitcode(stderr, detach(uuid, prog, args, stdin, stdout))
}
-func detach(uuid string, prog string, args []string, stdout, stderr io.Writer) error {
+func detach(uuid string, prog string, args []string, stdin io.Reader, stdout io.Writer) error {
lockfile, err := func() (*os.File, error) {
// We must hold the dir-level lock between
// opening/creating the lockfile and acquiring LOCK_EX
// invoked as "/path/to/crunch-run"
execargs = append([]string{prog}, execargs...)
}
- execargs = append([]string{
- // Here, if the inner systemd-cat can't exec
- // crunch-run, it writes an error message to stderr,
- // and the outer systemd-cat writes it to the journal
- // where the operator has a chance to discover it. (If
- // we only used one systemd-cat command, it would be
- // up to us to report the error -- but we are going to
- // detach and exit, not wait for something to appear
- // on stderr.) Note these systemd-cat calls don't
- // result in additional processes -- they just connect
- // stderr/stdout to sockets and call exec().
- "systemd-cat", "--identifier=crunch-run",
- "systemd-cat", "--identifier=crunch-run",
- }, execargs...)
+ if _, err := exec.LookPath("systemd-cat"); err == nil {
+ execargs = append([]string{
+ // Here, if the inner systemd-cat can't exec
+ // crunch-run, it writes an error message to
+ // stderr, and the outer systemd-cat writes it
+ // to the journal where the operator has a
+ // chance to discover it. (If we only used one
+ // systemd-cat command, it would be up to us
+ // to report the error -- but we are going to
+ // detach and exit, not wait for something to
+ // appear on stderr.) Note these systemd-cat
+ // calls don't result in additional processes
+ // -- they just connect stderr/stdout to
+ // sockets and call exec().
+ "systemd-cat", "--identifier=crunch-run",
+ "systemd-cat", "--identifier=crunch-run",
+ }, execargs...)
+ }
cmd := exec.Command(execargs[0], execargs[1:]...)
// Child inherits lockfile.
// from parent (sshd) while sending lockfile content to
// caller.
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
+ // We need to manage our own OS pipe here to ensure the child
+ // process reads all of our stdin pipe before we return.
+ piper, pipew, err := os.Pipe()
+ if err != nil {
+ return err
+ }
+ defer pipew.Close()
+ cmd.Stdin = piper
err = cmd.Start()
if err != nil {
return fmt.Errorf("exec %s: %s", cmd.Path, err)
}
+ _, err = io.Copy(pipew, stdin)
+ if err != nil {
+ return err
+ }
+ err = pipew.Close()
+ if err != nil {
+ return err
+ }
w := io.MultiWriter(stdout, lockfile)
return json.NewEncoder(w).Encode(procinfo{
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package crunchrun
+
+import (
+ "bytes"
+ "io"
+ "sync"
+)
+
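+// bufThenWrite is an io.Writer that buffers written data in memory
+// until SetWriter is called; SetWriter then flushes the buffer to
+// the new writer, and subsequent writes go through directly.
+// A minimal usage sketch:
+//
+//	var lb bufThenWrite
+//	fmt.Fprintln(&lb, "early log line") // held in memory
+//	lb.SetWriter(os.Stderr)             // flush, then pass through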
+type bufThenWrite struct {
+ buf bytes.Buffer
+ w io.Writer
+ mtx sync.Mutex
+}
+
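+// SetWriter flushes any previously buffered data to w and directs
+// all subsequent writes to w.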
+func (btw *bufThenWrite) SetWriter(w io.Writer) error {
+ btw.mtx.Lock()
+ defer btw.mtx.Unlock()
+ btw.w = w
+ _, err := io.Copy(w, &btw.buf)
+ return err
+}
+
+func (btw *bufThenWrite) Write(p []byte) (int, error) {
+ btw.mtx.Lock()
+ defer btw.mtx.Unlock()
+ if btw.w == nil {
+ btw.w = &btw.buf
+ }
+ return btw.w.Write(p)
+}
import (
"bytes"
+ "context"
"encoding/json"
"errors"
"flag"
"io"
"io/ioutil"
"log"
+ "net"
+ "net/http"
"os"
"os/exec"
"os/signal"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
"git.arvados.org/arvados.git/sdk/go/keepclient"
"git.arvados.org/arvados.git/sdk/go/manifest"
- "golang.org/x/net/context"
)
type command struct{}
var Command = command{}
+// ConfigData contains environment variables and (when needed) cluster
+// configuration, passed from dispatchcloud to crunch-run on stdin.
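+//
+// A hypothetical example of the JSON message (values are
+// illustrative only):
+//
+//	{"Env":{"ARVADOS_API_HOST":"zzzzz.example.org"},"KeepBuffers":2,"Cluster":{...}}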
+type ConfigData struct {
+ Env map[string]string
+ KeepBuffers int
+ Cluster *arvados.Cluster
+}
+
// IArvadosClient is the minimal Arvados API methods used by crunch-run.
type IArvadosClient interface {
Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
finalState string
parentTemp string
+ keepstoreLogger io.WriteCloser
+ keepstoreLogbuf *bufThenWrite
statLogger io.WriteCloser
statReporter *crunchstat.Reporter
hoststatLogger io.WriteCloser
runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0)
}()
+ if runner.keepstoreLogger != nil {
+ // Flush any buffered logs from our local keepstore
+ // process. Discard anything logged after this point
+ // -- it won't end up in the log collection, so
+ // there's no point writing it to the collectionfs.
+ runner.keepstoreLogbuf.SetWriter(io.Discard)
+ runner.keepstoreLogger.Close()
+ runner.keepstoreLogger = nil
+ }
+
if runner.LogsPDH != nil {
// If we have already assigned something to LogsPDH,
// we must be closing the re-opened log, which won't
// -- it exists only to send logs to other channels.
return nil
}
+
saved, err := runner.saveLogCollection(true)
if err != nil {
return fmt.Errorf("error saving log collection: %s", err)
}
func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+ log := log.New(stderr, "", 0)
flags := flag.NewFlagSet(prog, flag.ContinueOnError)
statInterval := flags.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting")
cgroupRoot := flags.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
cgroupParentSubsystem := flags.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
caCertsPath := flags.String("ca-certs", "", "Path to TLS root certificates")
detach := flags.Bool("detach", false, "Detach from parent process and run in the background")
- stdinEnv := flags.Bool("stdin-env", false, "Load environment variables from JSON message on stdin")
+ stdinConfig := flags.Bool("stdin-config", false, "Load config and environment variables from JSON message on stdin")
sleep := flags.Duration("sleep", 0, "Delay before starting (testing use only)")
kill := flags.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes")
return 1
}
- if *stdinEnv && !ignoreDetachFlag {
- // Load env vars on stdin if asked (but not in a
- // detached child process, in which case stdin is
- // /dev/null).
- err := loadEnv(os.Stdin)
- if err != nil {
- log.Print(err)
- return 1
- }
- }
-
containerUUID := flags.Arg(0)
switch {
case *detach && !ignoreDetachFlag:
- return Detach(containerUUID, prog, args, os.Stdout, os.Stderr)
+ return Detach(containerUUID, prog, args, os.Stdin, os.Stdout, os.Stderr)
case *kill >= 0:
return KillProcess(containerUUID, syscall.Signal(*kill), os.Stdout, os.Stderr)
case *list:
return ListProcesses(os.Stdout, os.Stderr)
}
- if containerUUID == "" {
+ if len(containerUUID) != 27 {
log.Printf("usage: %s [options] UUID", prog)
return 1
}
+ var conf ConfigData
+ if *stdinConfig {
+ err := json.NewDecoder(stdin).Decode(&conf)
+ if err != nil {
+ log.Printf("decode stdin: %s", err)
+ return 1
+ }
+ for k, v := range conf.Env {
+ err = os.Setenv(k, v)
+ if err != nil {
+ log.Printf("setenv(%q): %s", k, err)
+ return 1
+ }
+ }
+ if conf.Cluster != nil {
+ // ClusterID is missing from the JSON
+ // representation, but we need it to generate
+ // a valid config file for keepstore, so we
+ // fill it using the container UUID prefix.
+ conf.Cluster.ClusterID = containerUUID[:5]
+ }
+ }
+
log.Printf("crunch-run %s started", cmd.Version.String())
time.Sleep(*sleep)
arvadosclient.CertFiles = []string{*caCertsPath}
}
+ var keepstoreLogbuf bufThenWrite
+ keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLogbuf, stderr))
+ if err != nil {
+ log.Print(err)
+ return 1
+ }
+ if keepstore != nil {
+ defer keepstore.Process.Kill()
+ }
+
api, err := arvadosclient.MakeArvadosClient()
if err != nil {
log.Printf("%s: %v", containerUUID, err)
}
api.Retries = 8
- kc, kcerr := keepclient.MakeKeepClient(api)
- if kcerr != nil {
- log.Printf("%s: %v", containerUUID, kcerr)
+ kc, err := keepclient.MakeKeepClient(api)
+ if err != nil {
+ log.Printf("%s: %v", containerUUID, err)
return 1
}
kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
return 1
}
+ if keepstore == nil {
+ // Log explanation (if any) for why we're not running
+ // a local keepstore.
+ var buf bytes.Buffer
+ keepstoreLogbuf.SetWriter(&buf)
+ if buf.Len() > 0 {
+ cr.CrunchLog.Printf("%s", strings.TrimSpace(buf.String()))
+ }
+ } else if logWhat := conf.Cluster.Containers.LocalKeepLogsToContainerLog; logWhat == "none" {
+ cr.CrunchLog.Printf("using local keepstore process (pid %d) at %s", keepstore.Process.Pid, os.Getenv("ARVADOS_KEEP_SERVICES"))
+ keepstoreLogbuf.SetWriter(io.Discard)
+ } else {
+ cr.CrunchLog.Printf("using local keepstore process (pid %d) at %s, writing logs to keepstore.txt in log collection", keepstore.Process.Pid, os.Getenv("ARVADOS_KEEP_SERVICES"))
+ logwriter, err := cr.NewLogWriter("keepstore")
+ if err != nil {
+ log.Print(err)
+ return 1
+ }
+ cr.keepstoreLogger = NewThrottledLogger(logwriter)
+
+ var writer io.WriteCloser = cr.keepstoreLogger
+ if logWhat == "errors" {
+ writer = &filterKeepstoreErrorsOnly{WriteCloser: writer}
+ } else if logWhat != "all" {
+ // should have been caught earlier by
+ // dispatcher's config loader
+ log.Printf("invalid value for Containers.LocalKeepLogsToContainerLog: %q", logWhat)
+ return 1
+ }
+ err = keepstoreLogbuf.SetWriter(writer)
+ if err != nil {
+ log.Print(err)
+ return 1
+ }
+ cr.keepstoreLogbuf = &keepstoreLogbuf
+ }
+
switch *runtimeEngine {
case "docker":
cr.executor, err = newDockerExecutor(containerUUID, cr.CrunchLog.Printf, cr.containerWatchdogInterval)
return 0
}
-func loadEnv(rdr io.Reader) error {
- buf, err := ioutil.ReadAll(rdr)
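+// startLocalKeepstore starts a keepstore process dedicated to this
+// container, waits for it to pass a health check, and sets
+// ARVADOS_KEEP_SERVICES to point at it. It returns (nil, nil) when
+// the configuration does not call for a local keepstore.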
+func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, error) {
+ if configData.Cluster == nil || configData.KeepBuffers < 1 {
+ return nil, nil
+ }
+ for uuid, vol := range configData.Cluster.Volumes {
+ if len(vol.AccessViaHosts) > 0 {
+ fmt.Fprintf(logbuf, "not starting a local keepstore process because a volume (%s) uses AccessViaHosts\n", uuid)
+ return nil, nil
+ }
+ if !vol.ReadOnly && vol.Replication < configData.Cluster.Collections.DefaultReplication {
+ fmt.Fprintf(logbuf, "not starting a local keepstore process because a writable volume (%s) has replication less than Collections.DefaultReplication (%d < %d)\n", uuid, vol.Replication, configData.Cluster.Collections.DefaultReplication)
+ return nil, nil
+ }
+ }
+
+ // Rather than have an alternate way to tell keepstore how
+ // many buffers to use when starting it this way, we just
+ // modify the cluster configuration that we feed it on stdin.
+ configData.Cluster.API.MaxKeepBlobBuffers = configData.KeepBuffers
+
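+ // Bind an ephemeral port to find a free one, then close the
+ // listener and hand the chosen address to the keepstore child via
+ // ARVADOS_SERVICE_INTERNAL_URL. (Another process could claim the
+ // port in the interim; if so, the health check below will fail
+ // and we will report an error.)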
+ ln, err := net.Listen("tcp", "localhost:0")
+ if err != nil {
+ return nil, err
+ }
+ _, port, err := net.SplitHostPort(ln.Addr().String())
+ if err != nil {
+ ln.Close()
+ return nil, err
+ }
+ ln.Close()
+ url := "http://localhost:" + port
+
+ fmt.Fprintf(logbuf, "starting keepstore on %s\n", url)
+
+ var confJSON bytes.Buffer
+ err = json.NewEncoder(&confJSON).Encode(arvados.Config{
+ Clusters: map[string]arvados.Cluster{
+ configData.Cluster.ClusterID: *configData.Cluster,
+ },
+ })
if err != nil {
- return fmt.Errorf("read stdin: %s", err)
+ return nil, err
}
- var env map[string]string
- err = json.Unmarshal(buf, &env)
+ cmd := exec.Command("/proc/self/exe", "keepstore", "-config=-")
+ if target, err := os.Readlink(cmd.Path); err == nil && strings.HasSuffix(target, ".test") {
+ // If we're a 'go test' process, running
+ // /proc/self/exe would start the test suite in a
+ // child process, which is not what we want.
+ cmd.Path, _ = exec.LookPath("go")
+ cmd.Args = append([]string{"go", "run", "../../cmd/arvados-server"}, cmd.Args[1:]...)
+ cmd.Env = os.Environ()
+ }
+ cmd.Stdin = &confJSON
+ cmd.Stdout = logbuf
+ cmd.Stderr = logbuf
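+ // cmd.Env is non-nil after the append below, so the child sees
+ // only these variables (plus os.Environ() in the test case above)
+ // rather than inheriting crunch-run's full environment.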
+ cmd.Env = append(cmd.Env,
+ "GOGC=10",
+ "ARVADOS_SERVICE_INTERNAL_URL="+url)
+ err = cmd.Start()
if err != nil {
- return fmt.Errorf("decode stdin: %s", err)
+ return nil, fmt.Errorf("error starting keepstore process: %w", err)
}
- for k, v := range env {
- err = os.Setenv(k, v)
+ cmdExited := false
+ go func() {
+ cmd.Wait()
+ cmdExited = true
+ }()
+ ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10))
+ defer cancel()
+ poll := time.NewTicker(time.Second / 10)
+ defer poll.Stop()
+ client := http.Client{}
+ for range poll.C {
+ testReq, err := http.NewRequestWithContext(ctx, "GET", url+"/_health/ping", nil)
if err != nil {
- return nil, fmt.Errorf("setenv(%q): %s", k, err)
+ return nil, err
+ }
+ testReq.Header.Set("Authorization", "Bearer "+configData.Cluster.ManagementToken)
+ resp, err := client.Do(testReq)
+ if err == nil {
+ resp.Body.Close()
+ if resp.StatusCode == http.StatusOK {
+ break
+ }
+ }
+ if cmdExited {
+ return nil, fmt.Errorf("keepstore child process exited")
+ }
+ if ctx.Err() != nil {
+ return nil, fmt.Errorf("timed out waiting for new keepstore process to report healthy")
}
}
- return nil
+ os.Setenv("ARVADOS_KEEP_SERVICES", url)
+ return cmd, nil
}
import (
"bytes"
+ "encoding/json"
"fmt"
"io"
"io/ioutil"
"os/exec"
"strings"
+ "git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
"git.arvados.org/arvados.git/sdk/go/arvadostest"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/keepclient"
. "gopkg.in/check.v1"
)
client *arvados.Client
ac *arvadosclient.ArvadosClient
kc *keepclient.KeepClient
+
+ logCollection arvados.Collection
+ outputCollection arvados.Collection
}
func (s *integrationSuite) SetUpSuite(c *C) {
out, err = exec.Command("arv-keepdocker", "--no-resume", "busybox:uclibc").Output()
imageUUID := strings.TrimSpace(string(out))
c.Logf("image uuid %s", imageUUID)
- c.Assert(err, IsNil)
+ if !c.Check(err, IsNil) {
+ if err, ok := err.(*exec.ExitError); ok {
+ c.Logf("%s", err.Stderr)
+ }
+ c.Fail()
+ }
err = arvados.NewClientFromEnv().RequestAndDecode(&s.image, "GET", "arvados/v1/collections/"+imageUUID, nil, nil)
c.Assert(err, IsNil)
c.Logf("image pdh %s", s.image.PortableDataHash)
}
func (s *integrationSuite) TearDownSuite(c *C) {
+ os.Unsetenv("ARVADOS_KEEP_SERVICES")
if s.client == nil {
// didn't set up
return
}
func (s *integrationSuite) SetUpTest(c *C) {
+ os.Unsetenv("ARVADOS_KEEP_SERVICES")
s.engine = "docker"
s.stdin = bytes.Buffer{}
s.stdout = bytes.Buffer{}
s.stderr = bytes.Buffer{}
+ s.logCollection = arvados.Collection{}
+ s.outputCollection = arvados.Collection{}
s.cr = arvados.ContainerRequest{
Priority: 1,
State: "Committed",
s.testRunTrivialContainer(c)
}
+func (s *integrationSuite) TestRunTrivialContainerWithLocalKeepstore(c *C) {
+ for _, trial := range []struct {
+ logConfig string
+ matchGetReq Checker
+ matchPutReq Checker
+ matchStartupMessage Checker
+ }{
+ {"none", Not(Matches), Not(Matches), Not(Matches)},
+ {"all", Matches, Matches, Matches},
+ {"errors", Not(Matches), Not(Matches), Matches},
+ } {
+ c.Logf("=== testing with Containers.LocalKeepLogsToContainerLog: %q", trial.logConfig)
+ s.SetUpTest(c)
+
+ cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+ c.Assert(err, IsNil)
+ cluster, err := cfg.GetCluster("")
+ c.Assert(err, IsNil)
+ for uuid, volume := range cluster.Volumes {
+ volume.AccessViaHosts = nil
+ volume.Replication = 2
+ cluster.Volumes[uuid] = volume
+ }
+ cluster.Containers.LocalKeepLogsToContainerLog = trial.logConfig
+
+ s.stdin.Reset()
+ err = json.NewEncoder(&s.stdin).Encode(ConfigData{
+ Env: nil,
+ KeepBuffers: 1,
+ Cluster: cluster,
+ })
+ c.Assert(err, IsNil)
+
+ s.engine = "docker"
+ s.testRunTrivialContainer(c)
+
+ fs, err := s.logCollection.FileSystem(s.client, s.kc)
+ c.Assert(err, IsNil)
+ f, err := fs.Open("keepstore.txt")
+ if trial.logConfig == "none" {
+ c.Check(err, NotNil)
+ c.Check(os.IsNotExist(err), Equals, true)
+ } else {
+ c.Assert(err, IsNil)
+ buf, err := ioutil.ReadAll(f)
+ c.Assert(err, IsNil)
+ c.Check(string(buf), trial.matchGetReq, `(?ms).*"reqMethod":"GET".*`)
+ c.Check(string(buf), trial.matchPutReq, `(?ms).*"reqMethod":"PUT".*,"reqPath":"0e3bcff26d51c895a60ea0d4585e134d".*`)
+ }
+ }
+}
+
func (s *integrationSuite) testRunTrivialContainer(c *C) {
if err := exec.Command("which", s.engine).Run(); err != nil {
c.Skip(fmt.Sprintf("%s: %s", s.engine, err))
}
s.cr.Command = []string{"sh", "-c", "cat /mnt/in/inputfile >/mnt/out/inputfile && cat /mnt/json >/mnt/out/json && ! touch /mnt/in/shouldbereadonly && mkdir /mnt/out/emptydir"}
s.setup(c)
- code := command{}.RunCommand("crunch-run", []string{
+
+ args := []string{
"-runtime-engine=" + s.engine,
"-enable-memory-limit=false",
s.cr.ContainerUUID,
- }, &s.stdin, io.MultiWriter(&s.stdout, os.Stderr), io.MultiWriter(&s.stderr, os.Stderr))
+ }
+ if s.stdin.Len() > 0 {
+ args = append([]string{"-stdin-config=true"}, args...)
+ }
+ code := command{}.RunCommand("crunch-run", args, &s.stdin, io.MultiWriter(&s.stdout, os.Stderr), io.MultiWriter(&s.stderr, os.Stderr))
+ c.Logf("\n===== stdout =====\n%s", s.stdout.String())
+ c.Logf("\n===== stderr =====\n%s", s.stderr.String())
c.Check(code, Equals, 0)
err := s.client.RequestAndDecode(&s.cr, "GET", "arvados/v1/container_requests/"+s.cr.UUID, nil, nil)
c.Assert(err, IsNil)
c.Logf("\n===== %s =====\n%s", fi.Name(), buf)
}
}
+ s.logCollection = log
var output arvados.Collection
err = s.client.RequestAndDecode(&output, "GET", "arvados/v1/collections/"+s.cr.OutputUUID, nil, nil)
c.Check(fi.Name(), Equals, ".keep")
}
}
+ s.outputCollection = output
}
import (
"bufio"
"bytes"
+ "encoding/json"
"fmt"
"io"
"log"
loadDuration(&crunchLogUpdatePeriod, "crunchLogUpdatePeriod")
}
+
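+// filterKeepstoreErrorsOnly is an io.WriteCloser that buffers
+// partial lines and filters keepstore log output: "request" lines
+// and "response" lines with 2xx status codes are dropped, and
+// everything else (including non-JSON lines) is passed through to
+// the underlying WriteCloser.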
+type filterKeepstoreErrorsOnly struct {
+ io.WriteCloser
+ buf []byte
+}
+
+func (f *filterKeepstoreErrorsOnly) Write(p []byte) (int, error) {
+ log.Printf("filterKeepstoreErrorsOnly: write %q", p)
+ f.buf = append(f.buf, p...)
+ start := 0
+ for i := len(f.buf) - len(p); i < len(f.buf); i++ {
+ if f.buf[i] == '\n' {
+ if f.check(f.buf[start:i]) {
+ _, err := f.WriteCloser.Write(f.buf[start : i+1])
+ if err != nil {
+ return 0, err
+ }
+ }
+ start = i + 1
+ }
+ }
+ if start > 0 {
+ copy(f.buf, f.buf[start:])
+ f.buf = f.buf[:len(f.buf)-start]
+ }
+ return len(p), nil
+}
+
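+// check reports whether the given log line should be passed through
+// to the underlying writer.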
+func (f *filterKeepstoreErrorsOnly) check(line []byte) bool {
+ if len(line) == 0 {
+ return false
+ }
+ if line[0] != '{' {
+ return true
+ }
+ var m map[string]interface{}
+ err := json.Unmarshal(line, &m)
+ if err != nil {
+ return true
+ }
+ if m["msg"] == "request" {
+ return false
+ }
+ if m["msg"] == "response" {
+ if code, _ := m["respStatusCode"].(float64); code >= 200 && code < 300 {
+ return false
+ }
+ }
+ return true
+}
package crunchrun
import (
+ "bytes"
"fmt"
+ "io"
"strings"
"testing"
"time"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
. "gopkg.in/check.v1"
+ check "gopkg.in/check.v1"
)
type LoggingTestSuite struct {
c.Check(true, Equals, strings.Contains(stderrLog, expected))
c.Check(string(kc.Content), Equals, logtext)
}
+
+type filterSuite struct{}
+
+var _ = Suite(&filterSuite{})
+
+func (*filterSuite) TestFilterKeepstoreErrorsOnly(c *check.C) {
+ var buf bytes.Buffer
+ f := filterKeepstoreErrorsOnly{WriteCloser: nopCloser{&buf}}
+ for _, s := range []string{
+ "not j",
+ "son\n" + `{"msg":"foo"}` + "\n{}\n" + `{"msg":"request"}` + "\n" + `{"msg":1234}` + "\n\n",
+ "\n[\n",
+ `{"msg":"response","respStatusCode":404,"foo": "bar"}` + "\n",
+ `{"msg":"response","respStatusCode":206}` + "\n",
+ } {
+ f.Write([]byte(s))
+ }
+ c.Check(buf.String(), check.Equals, `not json
+{"msg":"foo"}
+{}
+{"msg":1234}
+[
+{"msg":"response","respStatusCode":404,"foo": "bar"}
+`)
+}
+
+type nopCloser struct {
+ io.Writer
+}
+
+func (nopCloser) Close() error { return nil }
needRAM := ctr.RuntimeConstraints.RAM + ctr.RuntimeConstraints.KeepCacheRAM
needRAM += int64(cc.Containers.ReserveExtraRAM)
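+ // Reserve one 64 MiB (1<<26) block buffer per VCPU for the
+ // dedicated keepstore process that crunch-run will start on the
+ // instance (see Containers.LocalKeepBlobBuffersPerVCPU).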
+ needRAM += int64(cc.Containers.LocalKeepBlobBuffersPerVCPU * needVCPUs * (1 << 26))
needRAM = (needRAM * 100) / int64(100-discountConfiguredRAMPercent)
ok := false
"time"
"git.arvados.org/arvados.git/lib/cloud"
+ "git.arvados.org/arvados.git/lib/crunchrun"
"git.arvados.org/arvados.git/sdk/go/arvados"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
ArvMountDeadlockRate float64
ExecuteContainer func(arvados.Container) int
CrashRunningContainer func(arvados.Container)
- ExtraCrunchRunArgs string // extra args expected after "crunch-run --detach --stdin-env "
+ ExtraCrunchRunArgs string // extra args expected after "crunch-run --detach --stdin-config "
sis *StubInstanceSet
id cloud.InstanceID
fmt.Fprint(stderr, "crunch-run: command not found\n")
return 1
}
- if strings.HasPrefix(command, "crunch-run --detach --stdin-env "+svm.ExtraCrunchRunArgs) {
- var stdinKV map[string]string
- err := json.Unmarshal(stdinData, &stdinKV)
+ if strings.HasPrefix(command, "crunch-run --detach --stdin-config "+svm.ExtraCrunchRunArgs) {
+ var configData crunchrun.ConfigData
+ err := json.Unmarshal(stdinData, &configData)
if err != nil {
fmt.Fprintf(stderr, "unmarshal stdin: %s (stdin was: %q)\n", err, stdinData)
return 1
}
for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
- if stdinKV[name] == "" {
+ if configData.Env[name] == "" {
fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdinData)
return 1
}
instanceSetID: instanceSetID,
instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
newExecutor: newExecutor,
+ cluster: cluster,
bootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand,
runnerSource: cluster.Containers.CloudVMs.DeployRunnerBinary,
imageID: cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
instanceSetID cloud.InstanceSetID
instanceSet *throttledInstanceSet
newExecutor func(cloud.Instance) Executor
+ cluster *arvados.Cluster
bootProbeCommand string
runnerSource string
imageID cloud.ImageID
"time"
"git.arvados.org/arvados.git/lib/cloud"
+ "git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/lib/dispatchcloud/test"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"github.com/prometheus/client_golang/prometheus"
+ "github.com/sirupsen/logrus"
check "gopkg.in/check.v1"
)
var less = &lessChecker{&check.CheckerInfo{Name: "less", Params: []string{"obtained", "expected"}}}
-type PoolSuite struct{}
+type PoolSuite struct {
+ logger logrus.FieldLogger
+ testCluster *arvados.Cluster
+}
+
+func (suite *PoolSuite) SetUpTest(c *check.C) {
+ suite.logger = ctxlog.TestLogger(c)
+ cfg, err := config.NewLoader(nil, suite.logger).Load()
+ c.Assert(err, check.IsNil)
+ suite.testCluster, err = cfg.GetCluster("")
+ c.Assert(err, check.IsNil)
+}
func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
type1 := test.InstanceType(1)
}
}
- logger := ctxlog.TestLogger(c)
driver := &test.StubDriver{}
instanceSetID := cloud.InstanceSetID("test-instance-set-id")
- is, err := driver.InstanceSet(nil, instanceSetID, nil, logger)
+ is, err := driver.InstanceSet(nil, instanceSetID, nil, suite.logger)
c.Assert(err, check.IsNil)
newExecutor := func(cloud.Instance) Executor {
}
}
- cluster := &arvados.Cluster{
- Containers: arvados.ContainersConfig{
- CloudVMs: arvados.CloudVMsConfig{
- BootProbeCommand: "true",
- MaxProbesPerSecond: 1000,
- ProbeInterval: arvados.Duration(time.Millisecond * 10),
- SyncInterval: arvados.Duration(time.Millisecond * 10),
- TagKeyPrefix: "testprefix:",
- },
- CrunchRunCommand: "crunch-run-custom",
- },
- InstanceTypes: arvados.InstanceTypeMap{
- type1.Name: type1,
- type2.Name: type2,
- type3.Name: type3,
- },
+ suite.testCluster.Containers.CloudVMs = arvados.CloudVMsConfig{
+ BootProbeCommand: "true",
+ MaxProbesPerSecond: 1000,
+ ProbeInterval: arvados.Duration(time.Millisecond * 10),
+ SyncInterval: arvados.Duration(time.Millisecond * 10),
+ TagKeyPrefix: "testprefix:",
+ }
+ suite.testCluster.Containers.CrunchRunCommand = "crunch-run-custom"
+ suite.testCluster.InstanceTypes = arvados.InstanceTypeMap{
+ type1.Name: type1,
+ type2.Name: type2,
+ type3.Name: type3,
}
- pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster)
+ pool := NewPool(suite.logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, suite.testCluster)
notify := pool.Subscribe()
defer pool.Unsubscribe(notify)
pool.Create(type1)
}
}
// Wait for the tags to save to the cloud provider
- tagKey := cluster.Containers.CloudVMs.TagKeyPrefix + tagKeyIdleBehavior
+ tagKey := suite.testCluster.Containers.CloudVMs.TagKeyPrefix + tagKeyIdleBehavior
deadline := time.Now().Add(time.Second)
for !func() bool {
pool.mtx.RLock()
c.Log("------- starting new pool, waiting to recover state")
- pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster)
+ pool2 := NewPool(suite.logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, suite.testCluster)
notify2 := pool2.Subscribe()
defer pool2.Unsubscribe(notify2)
waitForIdle(pool2, notify2)
}
func (suite *PoolSuite) TestDrain(c *check.C) {
- logger := ctxlog.TestLogger(c)
driver := test.StubDriver{}
- instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, logger)
+ instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
c.Assert(err, check.IsNil)
ac := arvados.NewClientFromEnv()
type1 := test.InstanceType(1)
pool := &Pool{
arvClient: ac,
- logger: logger,
+ logger: suite.logger,
newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
+ cluster: suite.testCluster,
instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
instanceTypes: arvados.InstanceTypeMap{
type1.Name: type1,
}
func (suite *PoolSuite) TestNodeCreateThrottle(c *check.C) {
- logger := ctxlog.TestLogger(c)
driver := test.StubDriver{HoldCloudOps: true}
- instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, logger)
+ instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
c.Assert(err, check.IsNil)
type1 := test.InstanceType(1)
pool := &Pool{
- logger: logger,
+ logger: suite.logger,
instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
+ cluster: suite.testCluster,
maxConcurrentInstanceCreateOps: 1,
instanceTypes: arvados.InstanceTypeMap{
type1.Name: type1,
}
func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
- logger := ctxlog.TestLogger(c)
driver := test.StubDriver{HoldCloudOps: true}
- instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, logger)
+ instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
c.Assert(err, check.IsNil)
type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .02}
type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04}
pool := &Pool{
- logger: logger,
+ logger: suite.logger,
newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
+ cluster: suite.testCluster,
instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
instanceTypes: arvados.InstanceTypeMap{
type1.Name: type1,
"syscall"
"time"
+ "git.arvados.org/arvados.git/lib/crunchrun"
"github.com/sirupsen/logrus"
)
type remoteRunner struct {
uuid string
executor Executor
- envJSON json.RawMessage
+ configJSON json.RawMessage
runnerCmd string
runnerArgs []string
remoteUser string
if err := enc.Encode(wkr.instType); err != nil {
panic(err)
}
- env := map[string]string{
+ var configData crunchrun.ConfigData
+ configData.Env = map[string]string{
"ARVADOS_API_HOST": wkr.wp.arvClient.APIHost,
"ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
"InstanceType": instJSON.String(),
"GatewayAuthSecret": wkr.wp.gatewayAuthSecret(uuid),
}
if wkr.wp.arvClient.Insecure {
- env["ARVADOS_API_HOST_INSECURE"] = "1"
+ configData.Env["ARVADOS_API_HOST_INSECURE"] = "1"
}
- envJSON, err := json.Marshal(env)
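+ // When the dedicated-keepstore feature is enabled, send the whole
+ // cluster config to crunch-run so it can start a local keepstore
+ // sized for this instance type.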
+ if bufs := wkr.wp.cluster.Containers.LocalKeepBlobBuffersPerVCPU; bufs > 0 {
+ configData.Cluster = wkr.wp.cluster
+ configData.KeepBuffers = bufs * wkr.instType.VCPUs
+ }
+ configJSON, err := json.Marshal(configData)
if err != nil {
panic(err)
}
rr := &remoteRunner{
uuid: uuid,
executor: wkr.executor,
- envJSON: envJSON,
+ configJSON: configJSON,
runnerCmd: wkr.wp.runnerCmd,
runnerArgs: wkr.wp.runnerArgs,
remoteUser: wkr.instance.RemoteUser(),
// assume the remote process _might_ have started, at least until it
// probes the worker and finds otherwise.
func (rr *remoteRunner) Start() {
- cmd := rr.runnerCmd + " --detach --stdin-env"
+ cmd := rr.runnerCmd + " --detach --stdin-config"
for _, arg := range rr.runnerArgs {
cmd += " '" + strings.Replace(arg, "'", "'\\''", -1) + "'"
}
if rr.remoteUser != "root" {
cmd = "sudo " + cmd
}
- stdin := bytes.NewBuffer(rr.envJSON)
+ stdin := bytes.NewBuffer(rr.configJSON)
stdout, stderr, err := rr.executor.Execute(nil, cmd, stdin)
if err != nil {
rr.logger.WithField("stdout", string(stdout)).
"time"
"git.arvados.org/arvados.git/lib/cloud"
+ "git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/lib/dispatchcloud/test"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"github.com/prometheus/client_golang/prometheus"
+ "github.com/sirupsen/logrus"
check "gopkg.in/check.v1"
)
var _ = check.Suite(&WorkerSuite{})
-type WorkerSuite struct{}
+type WorkerSuite struct {
+ logger logrus.FieldLogger
+ testCluster *arvados.Cluster
+}
+
+func (suite *WorkerSuite) SetUpTest(c *check.C) {
+ suite.logger = ctxlog.TestLogger(c)
+ cfg, err := config.NewLoader(nil, suite.logger).Load()
+ c.Assert(err, check.IsNil)
+ suite.testCluster, err = cfg.GetCluster("")
+ c.Assert(err, check.IsNil)
+}
func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
- logger := ctxlog.TestLogger(c)
bootTimeout := time.Minute
probeTimeout := time.Second
ac := arvados.NewClientFromEnv()
- is, err := (&test.StubDriver{}).InstanceSet(nil, "test-instance-set-id", nil, logger)
+ is, err := (&test.StubDriver{}).InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
c.Assert(err, check.IsNil)
inst, err := is.Create(arvados.InstanceType{}, "", nil, "echo InitCommand", nil)
c.Assert(err, check.IsNil)
wp := &Pool{
arvClient: ac,
newExecutor: func(cloud.Instance) Executor { return exr },
+ cluster: suite.testCluster,
bootProbeCommand: "bootprobe",
timeoutBooting: bootTimeout,
timeoutProbe: probeTimeout,
exr.response[wp.runnerCmd+" --list"] = trial.respRunDeployed
}
wkr := &worker{
- logger: logger,
+ logger: suite.logger,
executor: exr,
wp: wp,
mtx: &wp.mtx,
SupportedDockerImageFormats StringSet
UsePreemptibleInstances bool
RuntimeEngine string
+ LocalKeepBlobBuffersPerVCPU int
+ LocalKeepLogsToContainerLog string
JobsAPI struct {
Enable string
Link.where(link_class: 'signature',
tail_uuid: self.uuid).destroy_all
+ # delete tokens for this user
+ ApiClientAuthorization.where(user_id: self.id).destroy_all
+ # delete ssh keys for this user
+ AuthorizedKey.where(owner_uuid: self.uuid).destroy_all
+ AuthorizedKey.where(authorized_user_uuid: self.uuid).destroy_all
+
# delete user preferences (including profile)
self.prefs = {}
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+class DeleteDisabledUserTokensAndKeys < ActiveRecord::Migration[5.2]
+ def up
+ execute "delete from api_client_authorizations where user_id in (select id from users where is_active ='false' and uuid not like '%-tpzed-anonymouspublic' and uuid not like '%-tpzed-000000000000000')"
+ execute "delete from authorized_keys where owner_uuid in (select uuid from users where is_active ='false' and uuid not like '%-tpzed-anonymouspublic' and uuid not like '%-tpzed-000000000000000')"
+ execute "delete from authorized_keys where authorized_user_uuid in (select uuid from users where is_active ='false' and uuid not like '%-tpzed-anonymouspublic' and uuid not like '%-tpzed-000000000000000')"
+ end
+
+ def down
+ # This migration is not reversible.
+ end
+end
('20210108033940'),
('20210126183521'),
('20210621204455'),
-('20210816191509');
+('20210816191509'),
+('20211027154300');
verify_link_existence created['uuid'], created['email'], true, true, true, true, false
+ # create a token
+ token = act_as_system_user do
+ ApiClientAuthorization.create!(user: User.find_by_uuid(created['uuid']), api_client: ApiClient.all.first).api_token
+ end
+
+ assert_equal 1, ApiClientAuthorization.where(user_id: User.find_by_uuid(created['uuid']).id).size, 'expected token not found'
+
post "/arvados/v1/users/#{created['uuid']}/unsetup", params: {}, headers: auth(:admin)
assert_response :success
created2 = json_response
assert_not_nil created2['uuid'], 'expected uuid for the newly created user'
assert_equal created['uuid'], created2['uuid'], 'expected uuid not found'
+ assert_equal 0, ApiClientAuthorization.where(user_id: User.find_by_uuid(created['uuid']).id).size, 'token should have been deleted by user unsetup'
verify_link_existence created['uuid'], created['email'], false, false, false, false, false
end
c.Check(err, check.IsNil)
c.Check(logentries.Items, check.HasLen, 1)
lastLogId := logentries.Items[0].ID
- nextLogId := lastLogId
var logbuf bytes.Buffer
logger := logrus.New()
c.Check(logbuf.String(), check.Matches, `(?ms).*msg="File `+direction+`".*`)
c.Check(logbuf.String(), check.Not(check.Matches), `(?ms).*level=error.*`)
- count := 0
- for ; nextLogId == lastLogId && count < 20; count++ {
- time.Sleep(50 * time.Millisecond)
+ deadline := time.Now().Add(time.Second)
+ for {
+ c.Assert(time.Now().After(deadline), check.Equals, false, check.Commentf("timed out waiting for log entry"))
err = client.RequestAndDecode(&logentries, "GET", "arvados/v1/logs", nil,
arvados.ResourceListParams{
- Filters: []arvados.Filter{arvados.Filter{Attr: "event_type", Operator: "=", Operand: "file_" + direction}},
- Limit: &limit1,
- Order: "created_at desc",
+ Filters: []arvados.Filter{
+ {Attr: "event_type", Operator: "=", Operand: "file_" + direction},
+ {Attr: "object_uuid", Operator: "=", Operand: userUuid},
+ },
+ Limit: &limit1,
+ Order: "created_at desc",
})
- c.Check(err, check.IsNil)
- if len(logentries.Items) > 0 {
- nextLogId = logentries.Items[0].ID
+ c.Assert(err, check.IsNil)
+ if len(logentries.Items) > 0 &&
+ logentries.Items[0].ID > lastLogId &&
+ logentries.Items[0].ObjectUUID == userUuid &&
+ logentries.Items[0].Properties["collection_uuid"] == collectionUuid &&
+ logentries.Items[0].Properties["collection_file_path"] == filepath {
+ break
}
+ c.Logf("logentries.Items: %+v", logentries.Items)
+ time.Sleep(50 * time.Millisecond)
}
- c.Check(count, check.Not(check.Equals), 20)
- c.Check(logentries.Items[0].ObjectUUID, check.Equals, userUuid)
- c.Check(logentries.Items[0].Properties["collection_uuid"], check.Equals, collectionUuid)
- c.Check(logentries.Items[0].Properties["collection_file_path"], check.Equals, filepath)
} else {
c.Check(resp.Result().StatusCode, check.Equals, http.StatusForbidden)
c.Check(logbuf.String(), check.Equals, "")
case strings.Contains(err.Error(), "Not Found"):
// "storage: service returned without a response body (404 Not Found)"
return os.ErrNotExist
+ case strings.Contains(err.Error(), "ErrorCode=BlobNotFound"):
+ // "storage: service returned error: StatusCode=404, ErrorCode=BlobNotFound, ErrorMessage=The specified blob does not exist.\n..."
+ return os.ErrNotExist
default:
return err
}
}
}
-type notifyingResponseRecorder struct {
- *httptest.ResponseRecorder
- closer chan bool
-}
-
-func (r *notifyingResponseRecorder) CloseNotify() <-chan bool {
- return r.closer
-}
-
func (s *HandlerSuite) TestGetHandlerClientDisconnect(c *check.C) {
s.cluster.Collections.BlobSigning = false
c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)
bufs = newBufferPool(ctxlog.TestLogger(c), 1, BlockSize)
defer bufs.Put(bufs.Get(BlockSize))
- if err := s.handler.volmgr.AllWritable()[0].Put(context.Background(), TestHash, TestBlock); err != nil {
- c.Error(err)
- }
-
- resp := ¬ifyingResponseRecorder{
- ResponseRecorder: httptest.NewRecorder(),
- closer: make(chan bool, 1),
- }
- if _, ok := http.ResponseWriter(resp).(http.CloseNotifier); !ok {
- c.Fatal("notifyingResponseRecorder is broken")
- }
- // If anyone asks, the client has disconnected.
- resp.closer <- true
+ err := s.handler.volmgr.AllWritable()[0].Put(context.Background(), TestHash, TestBlock)
+ c.Assert(err, check.IsNil)
+ resp := httptest.NewRecorder()
ok := make(chan struct{})
go func() {
- req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
+ ctx, cancel := context.WithCancel(context.Background())
+ req, _ := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
+ cancel()
s.handler.ServeHTTP(resp, req)
ok <- struct{}{}
}()
case <-ok:
}
- ExpectStatusCode(c, "client disconnect", http.StatusServiceUnavailable, resp.ResponseRecorder)
+ ExpectStatusCode(c, "client disconnect", http.StatusServiceUnavailable, resp)
for i, v := range s.handler.volmgr.AllWritable() {
if calls := v.Volume.(*MockVolume).called["GET"]; calls != 0 {
c.Errorf("volume %d got %d calls, expected 0", i, calls)
}
func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
- ctx, cancel := contextForResponse(context.TODO(), resp)
- defer cancel()
-
locator := req.URL.Path[1:]
if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
- rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster, rtr.volmgr)
+ rtr.remoteProxy.Get(req.Context(), resp, req, rtr.cluster, rtr.volmgr)
return
}
// isn't here, we can return 404 now instead of waiting for a
// buffer.
- buf, err := getBufferWithContext(ctx, bufs, BlockSize)
+ buf, err := getBufferWithContext(req.Context(), bufs, BlockSize)
if err != nil {
http.Error(resp, err.Error(), http.StatusServiceUnavailable)
return
}
defer bufs.Put(buf)
- size, err := GetBlock(ctx, rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
+ size, err := GetBlock(req.Context(), rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
if err != nil {
code := http.StatusInternalServerError
if err, ok := err.(*KeepError); ok {
resp.Write(buf[:size])
}
-// Return a new context that gets cancelled by resp's CloseNotifier.
-func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
- ctx, cancel := context.WithCancel(parent)
- if cn, ok := resp.(http.CloseNotifier); ok {
- go func(c <-chan bool) {
- select {
- case <-c:
- cancel()
- case <-ctx.Done():
- }
- }(cn.CloseNotify())
- }
- return ctx, cancel
-}
-
// Get a buffer from the pool -- but give up and return a non-nil
// error if ctx ends before we get a buffer.
func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
}
func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
- ctx, cancel := contextForResponse(context.TODO(), resp)
- defer cancel()
-
hash := mux.Vars(req)["hash"]
// Detect as many error conditions as possible before reading
}
}
- buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
+ buf, err := getBufferWithContext(req.Context(), bufs, int(req.ContentLength))
if err != nil {
http.Error(resp, err.Error(), http.StatusServiceUnavailable)
return
return
}
- result, err := PutBlock(ctx, rtr.volmgr, buf, hash, wantStorageClasses)
+ result, err := PutBlock(req.Context(), rtr.volmgr, buf, hash, wantStorageClasses)
bufs.Put(buf)
if err != nil {