scheduler.WorkerPool
Instances() []worker.InstanceView
SetIdleBehavior(cloud.InstanceID, worker.IdleBehavior) error
+ KillInstance(id cloud.InstanceID, reason string) error
Stop()
}
type dispatcher struct {
Cluster *arvados.Cluster
Context context.Context
+ ArvClient *arvados.Client
+ AuthToken string
InstanceSetID cloud.InstanceSetID
logger logrus.FieldLogger
}
func (disp *dispatcher) initialize() {
- arvClient := arvados.NewClientFromEnv()
+ disp.logger = ctxlog.FromContext(disp.Context)
+
+ disp.ArvClient.AuthToken = disp.AuthToken
+
if disp.InstanceSetID == "" {
- if strings.HasPrefix(arvClient.AuthToken, "v2/") {
- disp.InstanceSetID = cloud.InstanceSetID(strings.Split(arvClient.AuthToken, "/")[1])
+ if strings.HasPrefix(disp.AuthToken, "v2/") {
+ disp.InstanceSetID = cloud.InstanceSetID(strings.Split(disp.AuthToken, "/")[1])
} else {
// Use some other string unique to this token
// that doesn't reveal the token itself.
- disp.InstanceSetID = cloud.InstanceSetID(fmt.Sprintf("%x", md5.Sum([]byte(arvClient.AuthToken))))
+ disp.InstanceSetID = cloud.InstanceSetID(fmt.Sprintf("%x", md5.Sum([]byte(disp.AuthToken))))
}
}
disp.stop = make(chan struct{}, 1)
disp.stopped = make(chan struct{})
- disp.logger = ctxlog.FromContext(disp.Context)
if key, err := ssh.ParsePrivateKey([]byte(disp.Cluster.Dispatch.PrivateKey)); err != nil {
disp.logger.Fatalf("error parsing configured Dispatch.PrivateKey: %s", err)
}
disp.instanceSet = instanceSet
disp.reg = prometheus.NewRegistry()
- disp.pool = worker.NewPool(disp.logger, arvClient, disp.reg, disp.instanceSet, disp.newExecutor, disp.sshKey.PublicKey(), disp.Cluster)
- disp.queue = container.NewQueue(disp.logger, disp.reg, disp.typeChooser, arvClient)
+ disp.pool = worker.NewPool(disp.logger, disp.ArvClient, disp.reg, disp.instanceSet, disp.newExecutor, disp.sshKey.PublicKey(), disp.Cluster)
+ disp.queue = container.NewQueue(disp.logger, disp.reg, disp.typeChooser, disp.ArvClient)
if disp.Cluster.ManagementToken == "" {
disp.httpHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/hold", disp.apiInstanceHold)
mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/drain", disp.apiInstanceDrain)
mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/run", disp.apiInstanceRun)
+ mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/kill", disp.apiInstanceKill)
metricsH := promhttp.HandlerFor(disp.reg, promhttp.HandlerOpts{
ErrorLog: disp.logger,
})
disp.apiInstanceIdleBehavior(w, r, worker.IdleBehaviorRun)
}
+// Management API: shutdown/destroy specified instance now.
+func (disp *dispatcher) apiInstanceKill(w http.ResponseWriter, r *http.Request) {
+ id := cloud.InstanceID(r.FormValue("instance_id"))
+ if id == "" {
+ httpserver.Error(w, "instance_id parameter not provided", http.StatusBadRequest)
+ return
+ }
+ err := disp.pool.KillInstance(id, "via management API: "+r.FormValue("reason"))
+ if err != nil {
+ httpserver.Error(w, err.Error(), http.StatusNotFound)
+ return
+ }
+}
+
func (disp *dispatcher) apiInstanceIdleBehavior(w http.ResponseWriter, r *http.Request, want worker.IdleBehavior) {
id := cloud.InstanceID(r.FormValue("instance_id"))
if id == "" {