"sync"
"time"
- "git.curoverse.com/arvados.git/lib/cloud"
- "git.curoverse.com/arvados.git/lib/dispatchcloud/container"
- "git.curoverse.com/arvados.git/lib/dispatchcloud/scheduler"
- "git.curoverse.com/arvados.git/lib/dispatchcloud/ssh_executor"
- "git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
- "git.curoverse.com/arvados.git/sdk/go/auth"
- "git.curoverse.com/arvados.git/sdk/go/ctxlog"
- "git.curoverse.com/arvados.git/sdk/go/httpserver"
+ "git.arvados.org/arvados.git/lib/cloud"
+ "git.arvados.org/arvados.git/lib/dispatchcloud/container"
+ "git.arvados.org/arvados.git/lib/dispatchcloud/scheduler"
+ "git.arvados.org/arvados.git/lib/dispatchcloud/sshexecutor"
+ "git.arvados.org/arvados.git/lib/dispatchcloud/worker"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/auth"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "git.arvados.org/arvados.git/sdk/go/health"
+ "git.arvados.org/arvados.git/sdk/go/httpserver"
"github.com/julienschmidt/httprouter"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
type pool interface {
scheduler.WorkerPool
+ CheckHealth() error
Instances() []worker.InstanceView
SetIdleBehavior(cloud.InstanceID, worker.IdleBehavior) error
+ KillInstance(id cloud.InstanceID, reason string) error
Stop()
}
type dispatcher struct {
Cluster *arvados.Cluster
Context context.Context
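+ // ArvClient, AuthToken, and Registry must be set by the caller before the dispatcher is started.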
+ ArvClient *arvados.Client
+ AuthToken string
+ Registry *prometheus.Registry
InstanceSetID cloud.InstanceSetID
logger logrus.FieldLogger
- reg *prometheus.Registry
instanceSet cloud.InstanceSet
pool pool
queue scheduler.ContainerQueue
// CheckHealth implements service.Handler.
func (disp *dispatcher) CheckHealth() error {
disp.Start()
- return nil
+ return disp.pool.CheckHealth()
+}
+
+// Done implements service.Handler.
+func (disp *dispatcher) Done() <-chan struct{} {
+ return disp.stopped
}
// Stop dispatching containers and release resources. Typically used in tests.
// Make a worker.Executor for the given instance.
func (disp *dispatcher) newExecutor(inst cloud.Instance) worker.Executor {
- exr := ssh_executor.New(inst)
- exr.SetTargetPort(disp.Cluster.CloudVMs.SSHPort)
+ exr := sshexecutor.New(inst)
+ exr.SetTargetPort(disp.Cluster.Containers.CloudVMs.SSHPort)
exr.SetSigners(disp.sshKey)
return exr
}
}
func (disp *dispatcher) initialize() {
- arvClient := arvados.NewClientFromEnv()
+ disp.logger = ctxlog.FromContext(disp.Context)
+
+ disp.ArvClient.AuthToken = disp.AuthToken
+
if disp.InstanceSetID == "" {
- if strings.HasPrefix(arvClient.AuthToken, "v2/") {
- disp.InstanceSetID = cloud.InstanceSetID(strings.Split(arvClient.AuthToken, "/")[1])
+ if strings.HasPrefix(disp.AuthToken, "v2/") {
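+ // A v2 token has the form "v2/<uuid>/<secret>"; the UUID part identifies the token without revealing the secret.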
+ disp.InstanceSetID = cloud.InstanceSetID(strings.Split(disp.AuthToken, "/")[1])
} else {
// Use some other string unique to this token
// that doesn't reveal the token itself.
- disp.InstanceSetID = cloud.InstanceSetID(fmt.Sprintf("%x", md5.Sum([]byte(arvClient.AuthToken))))
+ disp.InstanceSetID = cloud.InstanceSetID(fmt.Sprintf("%x", md5.Sum([]byte(disp.AuthToken))))
}
}
disp.stop = make(chan struct{}, 1)
disp.stopped = make(chan struct{})
- disp.logger = ctxlog.FromContext(disp.Context)
- if key, err := ssh.ParsePrivateKey([]byte(disp.Cluster.Dispatch.PrivateKey)); err != nil {
- disp.logger.Fatalf("error parsing configured Dispatch.PrivateKey: %s", err)
+ if key, err := ssh.ParsePrivateKey([]byte(disp.Cluster.Containers.DispatchPrivateKey)); err != nil {
+ disp.logger.Fatalf("error parsing configured Containers.DispatchPrivateKey: %s", err)
} else {
disp.sshKey = key
}
- instanceSet, err := newInstanceSet(disp.Cluster, disp.InstanceSetID, disp.logger)
+ instanceSet, err := newInstanceSet(disp.Cluster, disp.InstanceSetID, disp.logger, disp.Registry)
if err != nil {
disp.logger.Fatalf("error initializing driver: %s", err)
}
disp.instanceSet = instanceSet
- disp.reg = prometheus.NewRegistry()
- disp.pool = worker.NewPool(disp.logger, arvClient, disp.reg, disp.instanceSet, disp.newExecutor, disp.sshKey.PublicKey(), disp.Cluster)
- disp.queue = container.NewQueue(disp.logger, disp.reg, disp.typeChooser, arvClient)
+ disp.pool = worker.NewPool(disp.logger, disp.ArvClient, disp.Registry, disp.InstanceSetID, disp.instanceSet, disp.newExecutor, disp.sshKey.PublicKey(), disp.Cluster)
+ disp.queue = container.NewQueue(disp.logger, disp.Registry, disp.typeChooser, disp.ArvClient)
if disp.Cluster.ManagementToken == "" {
disp.httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
} else {
mux := httprouter.New()
mux.HandlerFunc("GET", "/arvados/v1/dispatch/containers", disp.apiContainers)
+ mux.HandlerFunc("POST", "/arvados/v1/dispatch/containers/kill", disp.apiContainerKill)
mux.HandlerFunc("GET", "/arvados/v1/dispatch/instances", disp.apiInstances)
mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/hold", disp.apiInstanceHold)
mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/drain", disp.apiInstanceDrain)
mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/run", disp.apiInstanceRun)
- metricsH := promhttp.HandlerFor(disp.reg, promhttp.HandlerOpts{
+ mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/kill", disp.apiInstanceKill)
+ metricsH := promhttp.HandlerFor(disp.Registry, promhttp.HandlerOpts{
ErrorLog: disp.logger,
})
mux.Handler("GET", "/metrics", metricsH)
mux.Handler("GET", "/metrics.json", metricsH)
+ mux.Handler("GET", "/_health/:check", &health.Handler{
+ Token: disp.Cluster.ManagementToken,
+ Prefix: "/_health/",
+ Routes: health.Routes{"ping": disp.CheckHealth},
+ })
disp.httpHandler = auth.RequireLiteralToken(disp.Cluster.ManagementToken, mux)
}
}
defer disp.instanceSet.Stop()
defer disp.pool.Stop()
- staleLockTimeout := time.Duration(disp.Cluster.Dispatch.StaleLockTimeout)
+ staleLockTimeout := time.Duration(disp.Cluster.Containers.StaleLockTimeout)
if staleLockTimeout == 0 {
staleLockTimeout = defaultStaleLockTimeout
}
- pollInterval := time.Duration(disp.Cluster.Dispatch.PollInterval)
+ pollInterval := time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval)
if pollInterval <= 0 {
pollInterval = defaultPollInterval
}
- sched := scheduler.New(disp.Context, disp.queue, disp.pool, staleLockTimeout, pollInterval)
+ sched := scheduler.New(disp.Context, disp.queue, disp.pool, disp.Registry, staleLockTimeout, pollInterval)
sched.Start()
defer sched.Stop()
disp.apiInstanceIdleBehavior(w, r, worker.IdleBehaviorRun)
}
+// Management API: shutdown/destroy specified instance now.
+func (disp *dispatcher) apiInstanceKill(w http.ResponseWriter, r *http.Request) {
+ id := cloud.InstanceID(r.FormValue("instance_id"))
+ if id == "" {
+ httpserver.Error(w, "instance_id parameter not provided", http.StatusBadRequest)
+ return
+ }
+ err := disp.pool.KillInstance(id, "via management API: "+r.FormValue("reason"))
+ if err != nil {
+ httpserver.Error(w, err.Error(), http.StatusNotFound)
+ return
+ }
+}
+
+// Management API: send SIGTERM to specified container's crunch-run
+// process now.
+func (disp *dispatcher) apiContainerKill(w http.ResponseWriter, r *http.Request) {
+ uuid := r.FormValue("container_uuid")
+ if uuid == "" {
+ httpserver.Error(w, "container_uuid parameter not provided", http.StatusBadRequest)
+ return
+ }
+ if !disp.pool.KillContainer(uuid, "via management API: "+r.FormValue("reason")) {
+ httpserver.Error(w, "container not found", http.StatusNotFound)
+ return
+ }
+}
+
func (disp *dispatcher) apiInstanceIdleBehavior(w http.ResponseWriter, r *http.Request, want worker.IdleBehavior) {
id := cloud.InstanceID(r.FormValue("instance_id"))
if id == "" {