Chen Chen <aflyhorse@gmail.com>
Veritas Genetics, Inc. <*@veritasgenetics.com>
Curii Corporation, Inc. <*@curii.com>
+Dante Tsang <dante@dantetsang.com>
+Codex Genetics Ltd <info@codexgenetics.com>
\ No newline at end of file
multi_json (~> 1.0)
websocket-driver (>= 0.2.0)
public_suffix (4.0.3)
- rack (2.0.7)
+ rack (2.2.2)
rack-mini-profiler (1.0.2)
rack (>= 1.2.0)
rack-test (0.6.3)
uglifier (~> 2.0)
BUNDLED WITH
- 1.11
+ 1.16.6
+AutoReloadConfig: true
Clusters:
zzzzz:
ManagementToken: e687950a23c3a9bceec28c6223a06c79
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239 // indirect
github.com/arvados/cgofuse v1.2.0-arvados1
github.com/aws/aws-sdk-go v1.25.30
+ github.com/bgentry/speakeasy v0.1.0 // indirect
github.com/coreos/go-oidc v2.1.0+incompatible
github.com/coreos/go-systemd v0.0.0-20180108085132-cc4f39464dc7
github.com/dgrijalva/jwt-go v3.1.0+incompatible // indirect
github.com/docker/go-connections v0.3.0 // indirect
github.com/docker/go-units v0.3.3-0.20171221200356-d59758554a3d // indirect
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568 // indirect
+ github.com/fsnotify/fsnotify v1.4.9
github.com/ghodss/yaml v1.0.0
github.com/gliderlabs/ssh v0.2.2 // indirect
github.com/gogo/protobuf v1.1.1
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
+github.com/bgentry/speakeasy v0.1.0 h1:ByYyxL9InA1OWqxJqqp2A5pYHUrCiAL6K3J+LKSsQkY=
+github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/cespare/xxhash/v2 v2.1.0 h1:yTUvW7Vhb89inJ+8irsUqiWjh8iT6sQPZiQzI6ReGkA=
github.com/cespare/xxhash/v2 v2.1.0/go.mod h1:dgIUBU3pDso/gPgZ1osOZ0iQf77oPR28Tjxl5dIMyVM=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/docker/go-units v0.3.3-0.20171221200356-d59758554a3d/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568 h1:BHsljHzVlRcyQhjrss6TZTdY2VfCqZPbv5k3iBFa2ZQ=
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc=
+github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
+github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/gliderlabs/ssh v0.2.2 h1:6zsha5zo/TWhRhwqCD3+EarCAgZ2yN28ipRnGPnwkI0=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191010194322-b09406accb47/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191105231009-c1f44814a5cd h1:3x5uuvBgE6oaXJjCOvpCC1IpgJogqQ+PqGGU3ZxAgII=
golang.org/x/sys v0.0.0-20191105231009-c1f44814a5cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
String() string
}
+// errNeedConfigReload is the sentinel stored in Supervisor.err when
+// shutdown was triggered by SIGHUP or by the config file watcher.
+// bootCommand.RunCommand compares against it to decide whether to run
+// the boot sequence again instead of exiting.
+var errNeedConfigReload = errors.New("config changed, restart needed")
+
type bootCommand struct{}
-func (bootCommand) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
- super := &Supervisor{
- Stderr: stderr,
- logger: ctxlog.New(stderr, "json", "info"),
+// RunCommand creates a JSON logger writing to stderr, then calls
+// bcmd.run repeatedly: when run returns errNeedConfigReload the loop
+// starts over, any other error is logged and yields exit code 1, and
+// a nil error yields exit code 0.
+func (bcmd bootCommand) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+ logger := ctxlog.New(stderr, "json", "info")
+ ctx := ctxlog.Context(context.Background(), logger)
+ for {
+ err := bcmd.run(ctx, prog, args, stdin, stdout, stderr)
+ if err == errNeedConfigReload {
+ // Config changed: run the whole boot sequence again.
+ continue
+ } else if err != nil {
+ logger.WithError(err).Info("exiting")
+ return 1
+ } else {
+ return 0
+ }
}
+}
- ctx := ctxlog.Context(context.Background(), super.logger)
+func (bcmd bootCommand) run(ctx context.Context, prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
-
- var err error
- defer func() {
- if err != nil {
- super.logger.WithError(err).Info("exiting")
- }
- }()
+ super := &Supervisor{
+ Stderr: stderr,
+ logger: ctxlog.FromContext(ctx),
+ }
flags := flag.NewFlagSet(prog, flag.ContinueOnError)
flags.SetOutput(stderr)
flags.BoolVar(&super.OwnTemporaryDatabase, "own-temporary-database", false, "bring up a postgres server and create a temporary database")
timeout := flags.Duration("timeout", 0, "maximum time to wait for cluster to be ready")
shutdown := flags.Bool("shutdown", false, "shut down when the cluster becomes ready")
- err = flags.Parse(args)
+ err := flags.Parse(args)
if err == flag.ErrHelp {
- err = nil
- return 0
+ return nil
} else if err != nil {
- return 2
+ return err
} else if *versionFlag {
- return cmd.Version.RunCommand(prog, args, stdin, stdout, stderr)
+ cmd.Version.RunCommand(prog, args, stdin, stdout, stderr)
+ return nil
} else if super.ClusterType != "development" && super.ClusterType != "test" && super.ClusterType != "production" {
- err = fmt.Errorf("cluster type must be 'development', 'test', or 'production'")
- return 2
+ return fmt.Errorf("cluster type must be 'development', 'test', or 'production'")
}
loader.SkipAPICalls = true
cfg, err := loader.Load()
if err != nil {
- return 1
+ return err
}
- super.Start(ctx, cfg)
+ super.Start(ctx, cfg, loader.Path)
defer super.Stop()
var timer *time.Timer
url, ok := super.WaitReady()
if timer != nil && !timer.Stop() {
- err = errors.New("boot timed out")
- return 1
+ return errors.New("boot timed out")
} else if !ok {
- err = errors.New("boot failed")
- return 1
- }
- // Write controller URL to stdout. Nothing else goes to
- // stdout, so this provides an easy way for a calling script
- // to discover the controller URL when everything is ready.
- fmt.Fprintln(stdout, url)
- if *shutdown {
- super.Stop()
+ super.logger.Error("boot failed")
+ } else {
+ // Write controller URL to stdout. Nothing else goes
+ // to stdout, so this provides an easy way for a
+ // calling script to discover the controller URL when
+ // everything is ready.
+ fmt.Fprintln(stdout, url)
+ if *shutdown {
+ super.Stop()
+ }
}
// Wait for signal/crash + orderly shutdown
- <-super.done
- return 0
+ return super.Wait()
}
"os/signal"
"os/user"
"path/filepath"
+ "reflect"
"strings"
"sync"
"syscall"
"time"
+ "git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/lib/service"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/health"
+ "github.com/fsnotify/fsnotify"
"github.com/sirupsen/logrus"
)
ctx context.Context
cancel context.CancelFunc
- done chan struct{}
+ done chan struct{} // closed when child procs/services have shut down
+ err error // error that caused shutdown (valid when done is closed)
healthChecker *health.Aggregator
tasksReady map[string]chan bool
waitShutdown sync.WaitGroup
environ []string // for child processes
}
-func (super *Supervisor) Start(ctx context.Context, cfg *arvados.Config) {
+// Start derives a cancellable context from ctx, creates super.done,
+// and runs the supervisor in a background goroutine; it returns
+// immediately.
+//
+// The goroutine cancels the context on SIGINT/SIGTERM (recording a
+// "caught signal" error) and on SIGHUP (recording
+// errNeedConfigReload). If cfgPath is neither "" nor "-" and
+// cfg.AutoReloadConfig is set, it also starts watchConfig, whose
+// callback records errNeedConfigReload and cancels the context.
+// super.done is closed when the goroutine returns; the first error
+// recorded in super.err is the one kept.
+//
+// NOTE(review): super.err is read and written from this goroutine,
+// both signal goroutines and the watchConfig callback with no
+// synchronization visible here -- confirm whether it needs a mutex.
+func (super *Supervisor) Start(ctx context.Context, cfg *arvados.Config, cfgPath string) {
super.ctx, super.cancel = context.WithCancel(ctx)
super.done = make(chan struct{})
go func() {
+ defer close(super.done)
+
- sigch := make(chan os.Signal)
+ // signal.Notify does not block when delivering, so the
+ // channel needs buffer space or a signal arriving while the
+ // receiver is busy is dropped.
+ sigch := make(chan os.Signal, 1)
signal.Notify(sigch, syscall.SIGINT, syscall.SIGTERM)
defer signal.Stop(sigch)
go func() {
for sig := range sigch {
super.logger.WithField("signal", sig).Info("caught signal")
+ if super.err == nil {
+ super.err = fmt.Errorf("caught signal %s", sig)
+ }
+ super.cancel()
+ }
+ }()
+
+ // Buffered for the same reason as sigch above.
+ hupch := make(chan os.Signal, 1)
+ signal.Notify(hupch, syscall.SIGHUP)
+ defer signal.Stop(hupch)
+ go func() {
+ for sig := range hupch {
+ super.logger.WithField("signal", sig).Info("caught signal")
+ if super.err == nil {
+ super.err = errNeedConfigReload
+ }
super.cancel()
}
}()
+ if cfgPath != "" && cfgPath != "-" && cfg.AutoReloadConfig {
+ // The watcher gets its own copy of cfg to compare against.
+ go watchConfig(super.ctx, super.logger, cfgPath, copyConfig(cfg), func() {
+ if super.err == nil {
+ super.err = errNeedConfigReload
+ }
+ super.cancel()
+ })
+ }
+
err := super.run(cfg)
if err != nil {
super.logger.WithError(err).Warn("supervisor shut down")
+ if super.err == nil {
+ super.err = err
+ }
}
- close(super.done)
}()
}
+// Wait blocks until super.done is closed, then returns super.err.
+func (super *Supervisor) Wait() error {
+ <-super.done
+ return super.err
+}
+
func (super *Supervisor) run(cfg *arvados.Config) error {
+ defer super.cancel()
+
cwd, err := os.Getwd()
if err != nil {
return err
}
return ctx.Err()
}
+
+// copyConfig returns a copy of cfg made by encoding it to JSON and
+// decoding the result into a new arvados.Config. The encoder runs in
+// its own goroutine and feeds the decoder through an io.Pipe. An
+// encoding or decoding error causes a panic.
+//
+// NOTE(review): fields excluded from JSON (e.g. Cluster.ClusterID,
+// tagged `json:"-"`) are not carried over to the copy -- confirm this
+// does not affect the reflect.DeepEqual comparison in watchConfig.
+func copyConfig(cfg *arvados.Config) *arvados.Config {
+ pr, pw := io.Pipe()
+ go func() {
+ err := json.NewEncoder(pw).Encode(cfg)
+ if err != nil {
+ panic(err)
+ }
+ pw.Close()
+ }()
+ cfg2 := new(arvados.Config)
+ err := json.NewDecoder(pr).Decode(cfg2)
+ if err != nil {
+ panic(err)
+ }
+ return cfg2
+}
+
+// watchConfig watches cfgPath with fsnotify and calls fn whenever the
+// file, once reloaded, is no longer reflect.DeepEqual to prevcfg.
+//
+// It returns when ctx is done, when either watcher channel is closed,
+// or (after logging an error) when the watcher cannot be set up.
+// A config that fails to load is logged and ignored; prevcfg is only
+// replaced after fn has been called.
+//
+// NOTE(review): the watch is placed on the file itself; if the file
+// is replaced via rename the watch may stop reporting changes --
+// confirm against the fsnotify documentation.
+func watchConfig(ctx context.Context, logger logrus.FieldLogger, cfgPath string, prevcfg *arvados.Config, fn func()) {
+ watcher, err := fsnotify.NewWatcher()
+ if err != nil {
+ logger.WithError(err).Error("fsnotify setup failed")
+ return
+ }
+ defer watcher.Close()
+
+ err = watcher.Add(cfgPath)
+ if err != nil {
+ logger.WithError(err).Error("fsnotify watcher failed")
+ return
+ }
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case err, ok := <-watcher.Errors:
+ if !ok {
+ return
+ }
+ logger.WithError(err).Warn("fsnotify watcher reported error")
+ case _, ok := <-watcher.Events:
+ if !ok {
+ return
+ }
+ // Discard any further events already queued, so they
+ // are all handled by the single reload below.
+ for len(watcher.Events) > 0 {
+ <-watcher.Events
+ }
+ // Reload with a loader whose log output is discarded.
+ loader := config.NewLoader(&bytes.Buffer{}, &logrus.Logger{Out: ioutil.Discard})
+ loader.Path = cfgPath
+ loader.SkipAPICalls = true
+ cfg, err := loader.Load()
+ if err != nil {
+ logger.WithError(err).Warn("error reloading config file after change detected; ignoring new config for now")
+ } else if reflect.DeepEqual(cfg, prevcfg) {
+ logger.Debug("config file changed but is still DeepEqual to the existing config")
+ } else {
+ logger.Debug("config changed, notifying supervisor")
+ fn()
+ prevcfg = cfg
+ }
+ }
+ }
+}
code := DumpCommand.RunCommand("arvados config-dump", []string{"-config", "-"}, bytes.NewBufferString(in), &stdout, &stderr)
c.Check(code, check.Equals, 0)
c.Check(stderr.String(), check.Matches, `(?ms).*deprecated or unknown config entry: Clusters.z1234.UnknownKey.*`)
- c.Check(stdout.String(), check.Matches, `(?ms)Clusters:\n z1234:\n.*`)
+ c.Check(stdout.String(), check.Matches, `(?ms)(.*\n)?Clusters:\n z1234:\n.*`)
c.Check(stdout.String(), check.Matches, `(?ms).*\n *ManagementToken: secret\n.*`)
c.Check(stdout.String(), check.Not(check.Matches), `(?ms).*UnknownKey.*`)
}
# implementation. Note that it also disables some new federation
# features and will be removed in a future release.
ForceLegacyAPI14: false
+
+# (Experimental) Restart services automatically when config file
+# changes are detected. Only supported by `arvados-server boot` in
+# dev/test mode.
+AutoReloadConfig: false
# implementation. Note that it also disables some new federation
# features and will be removed in a future release.
ForceLegacyAPI14: false
+
+# (Experimental) Restart services automatically when config file
+# changes are detected. Only supported by ` + "`" + `arvados-server boot` + "`" + ` in
+# dev/test mode.
+AutoReloadConfig: false
`)
},
config: *cfg,
}
- s.testClusters[id].super.Start(context.Background(), &s.testClusters[id].config)
+ s.testClusters[id].super.Start(context.Background(), &s.testClusters[id].config, "-")
}
for _, tc := range s.testClusters {
au, ok := tc.super.WaitReady()
*next[upd.UUID] = upd
}
}
- selectParam := []string{"uuid", "state", "priority", "runtime_constraints", "container_image", "mounts"}
+ selectParam := []string{"uuid", "state", "priority", "runtime_constraints", "container_image", "mounts", "scheduling_parameters"}
limitParam := 1000
mine, err := cq.fetchAll(arvados.ResourceListParams{
}()
type Config struct {
- Clusters map[string]Cluster
+ Clusters map[string]Cluster
+ AutoReloadConfig bool
}
// GetConfig returns the current system config, loading it from
MaxPermissionEntries int
MaxUUIDEntries int
}
+
type Cluster struct {
ClusterID string `json:"-"`
ManagementToken string
pg (1.1.4)
power_assert (1.1.4)
public_suffix (4.0.3)
- rack (2.0.7)
+ rack (2.2.2)
rack-test (0.6.3)
rack (>= 1.0)
rails (5.0.7.2)
uglifier (~> 2.0)
BUNDLED WITH
- 1.11
+ 1.16.6
return fmt.Errorf("Error setting up arvados client %v", err)
}
+ // If a config file is available, use the keepstores defined there
+ // instead of the legacy autodiscover mechanism via the API server
+ for k := range cluster.Services.Keepstore.InternalURLs {
+ arv.KeepServiceURIs = append(arv.KeepServiceURIs, k.String())
+ }
+
if cluster.SystemLogs.LogLevel == "debug" {
keepclient.DebugPrintf = log.Printf
}
// Tests that require the Keep server running
type ServerRequiredSuite struct{}
+// Gocheck boilerplate
+var _ = Suite(&ServerRequiredConfigYmlSuite{})
+
+// Tests that require the Keep servers running as defined in config.yml
+type ServerRequiredConfigYmlSuite struct{}
+
// Gocheck boilerplate
var _ = Suite(&NoKeepServerSuite{})
arvadostest.StopAPI()
}
+// SetUpSuite starts the test API server and four keepstore servers
+// for the suite.
+func (s *ServerRequiredConfigYmlSuite) SetUpSuite(c *C) {
+ arvadostest.StartAPI()
+ // config.yml defines 4 keepstores
+ arvadostest.StartKeep(4, false)
+}
+
+// SetUpTest calls arvadostest.ResetEnv before each test in the suite.
+func (s *ServerRequiredConfigYmlSuite) SetUpTest(c *C) {
+ arvadostest.ResetEnv()
+}
+
+// TearDownSuite stops the four keepstore servers and the API server
+// started by SetUpSuite.
+func (s *ServerRequiredConfigYmlSuite) TearDownSuite(c *C) {
+ arvadostest.StopKeep(4)
+ arvadostest.StopAPI()
+}
+
func (s *NoKeepServerSuite) SetUpSuite(c *C) {
arvadostest.StartAPI()
// We need API to have some keep services listed, but the
arvadostest.StopAPI()
}
-func runProxy(c *C, bogusClientToken bool) *keepclient.KeepClient {
+func runProxy(c *C, bogusClientToken bool, loadKeepstoresFromConfig bool) *keepclient.KeepClient {
cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
c.Assert(err, Equals, nil)
cluster, err := cfg.GetCluster("")
c.Assert(err, Equals, nil)
+ if !loadKeepstoresFromConfig {
+ // Do not load Keepstore InternalURLs from the config file
+ cluster.Services.Keepstore.InternalURLs = make(map[arvados.URL]arvados.ServiceInstance)
+ }
+
cluster.Services.Keepproxy.InternalURLs = map[arvados.URL]arvados.ServiceInstance{arvados.URL{Host: ":0"}: arvados.ServiceInstance{}}
listener = nil
}
func (s *ServerRequiredSuite) TestResponseViaHeader(c *C) {
- runProxy(c, false)
+ runProxy(c, false, false)
defer closeListener()
req, err := http.NewRequest("POST",
}
func (s *ServerRequiredSuite) TestLoopDetection(c *C) {
- kc := runProxy(c, false)
+ kc := runProxy(c, false, false)
defer closeListener()
sr := map[string]string{
}
func (s *ServerRequiredSuite) TestStorageClassesHeader(c *C) {
- kc := runProxy(c, false)
+ kc := runProxy(c, false, false)
defer closeListener()
// Set up fake keepstore to record request headers
}
func (s *ServerRequiredSuite) TestDesiredReplicas(c *C) {
- kc := runProxy(c, false)
+ kc := runProxy(c, false, false)
defer closeListener()
content := []byte("TestDesiredReplicas")
}
func (s *ServerRequiredSuite) TestPutWrongContentLength(c *C) {
- kc := runProxy(c, false)
+ kc := runProxy(c, false, false)
defer closeListener()
content := []byte("TestPutWrongContentLength")
}
func (s *ServerRequiredSuite) TestManyFailedPuts(c *C) {
- kc := runProxy(c, false)
+ kc := runProxy(c, false, false)
defer closeListener()
router.(*proxyHandler).timeout = time.Nanosecond
}
func (s *ServerRequiredSuite) TestPutAskGet(c *C) {
- kc := runProxy(c, false)
+ kc := runProxy(c, false, false)
defer closeListener()
hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
}
func (s *ServerRequiredSuite) TestPutAskGetForbidden(c *C) {
- kc := runProxy(c, true)
+ kc := runProxy(c, true, false)
defer closeListener()
hash := fmt.Sprintf("%x+3", md5.Sum([]byte("bar")))
}
func (s *ServerRequiredSuite) TestCorsHeaders(c *C) {
- runProxy(c, false)
+ runProxy(c, false, false)
defer closeListener()
{
}
func (s *ServerRequiredSuite) TestPostWithoutHash(c *C) {
- runProxy(c, false)
+ runProxy(c, false, false)
defer closeListener()
{
// With a valid but non-existing prefix (expect "\n")
// With an invalid prefix (expect error)
func (s *ServerRequiredSuite) TestGetIndex(c *C) {
- kc := runProxy(c, false)
+ getIndexWorker(c, false)
+}
+
+// Test GetIndex
+// Uses config.yml
+// Put one block, with 2 replicas
+// With no prefix (expect the block locator, twice)
+// With an existing prefix (expect the block locator, twice)
+// With a valid but non-existing prefix (expect "\n")
+// With an invalid prefix (expect error)
+func (s *ServerRequiredConfigYmlSuite) TestGetIndex(c *C) {
+ getIndexWorker(c, true)
+}
+
+func getIndexWorker(c *C, useConfig bool) {
+ kc := runProxy(c, false, useConfig)
defer closeListener()
// Put "index-data" blocks
}
func (s *ServerRequiredSuite) TestCollectionSharingToken(c *C) {
- kc := runProxy(c, false)
+ kc := runProxy(c, false, false)
defer closeListener()
hash, _, err := kc.PutB([]byte("shareddata"))
c.Check(err, IsNil)
}
func (s *ServerRequiredSuite) TestPutAskGetInvalidToken(c *C) {
- kc := runProxy(c, false)
+ kc := runProxy(c, false, false)
defer closeListener()
// Put a test block
}
func (s *ServerRequiredSuite) TestAskGetKeepProxyConnectionError(c *C) {
- kc := runProxy(c, false)
+ kc := runProxy(c, false, false)
defer closeListener()
// Point keepproxy at a non-existent keepstore
}
func (s *NoKeepServerSuite) TestAskGetNoKeepServerError(c *C) {
- kc := runProxy(c, false)
+ kc := runProxy(c, false, false)
defer closeListener()
hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
}
func (s *ServerRequiredSuite) TestPing(c *C) {
- kc := runProxy(c, false)
+ kc := runProxy(c, false, false)
defer closeListener()
rtr := MakeRESTRouter(kc, 10*time.Second, arvadostest.ManagementToken)