package dispatchcloud
import (
+ "context"
"crypto/md5"
"encoding/json"
"fmt"
"git.curoverse.com/arvados.git/lib/dispatchcloud/worker"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/auth"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
"git.curoverse.com/arvados.git/sdk/go/httpserver"
"github.com/julienschmidt/httprouter"
"github.com/prometheus/client_golang/prometheus"
type dispatcher struct {
Cluster *arvados.Cluster
+ Context context.Context
InstanceSetID cloud.InstanceSetID
logger logrus.FieldLogger
}
disp.stop = make(chan struct{}, 1)
disp.stopped = make(chan struct{})
- disp.logger = logrus.StandardLogger()
+ disp.logger = ctxlog.FromContext(disp.Context)
- if key, err := ssh.ParsePrivateKey(disp.Cluster.Dispatch.PrivateKey); err != nil {
+ if key, err := ssh.ParsePrivateKey([]byte(disp.Cluster.Dispatch.PrivateKey)); err != nil {
disp.logger.Fatalf("error parsing configured Dispatch.PrivateKey: %s", err)
} else {
disp.sshKey = key
}
disp.instanceSet = instanceSet
disp.reg = prometheus.NewRegistry()
- disp.pool = worker.NewPool(disp.logger, arvClient, disp.reg, disp.instanceSet, disp.newExecutor, disp.Cluster)
+ disp.pool = worker.NewPool(disp.logger, arvClient, disp.reg, disp.instanceSet, disp.newExecutor, disp.sshKey.PublicKey(), disp.Cluster)
disp.queue = container.NewQueue(disp.logger, disp.reg, disp.typeChooser, arvClient)
if disp.Cluster.ManagementToken == "" {
mux := httprouter.New()
mux.HandlerFunc("GET", "/arvados/v1/dispatch/containers", disp.apiContainers)
mux.HandlerFunc("GET", "/arvados/v1/dispatch/instances", disp.apiInstances)
- mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/:instance_id/hold", disp.apiInstanceHold)
- mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/:instance_id/drain", disp.apiInstanceDrain)
- mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/:instance_id/run", disp.apiInstanceRun)
+ mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/hold", disp.apiInstanceHold)
+ mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/drain", disp.apiInstanceDrain)
+ mux.HandlerFunc("POST", "/arvados/v1/dispatch/instances/run", disp.apiInstanceRun)
metricsH := promhttp.HandlerFor(disp.reg, promhttp.HandlerOpts{
ErrorLog: disp.logger,
})
if pollInterval <= 0 {
pollInterval = defaultPollInterval
}
- sched := scheduler.New(disp.logger, disp.queue, disp.pool, staleLockTimeout, pollInterval)
+ sched := scheduler.New(disp.Context, disp.queue, disp.pool, staleLockTimeout, pollInterval)
sched.Start()
defer sched.Stop()
}
func (disp *dispatcher) apiInstanceIdleBehavior(w http.ResponseWriter, r *http.Request, want worker.IdleBehavior) {
- params, _ := r.Context().Value(httprouter.ParamsKey).(httprouter.Params)
- id := cloud.InstanceID(params.ByName("instance_id"))
+ id := cloud.InstanceID(r.FormValue("instance_id"))
+ if id == "" {
+ httpserver.Error(w, "instance_id parameter not provided", http.StatusBadRequest)
+ return
+ }
err := disp.pool.SetIdleBehavior(id, want)
if err != nil {
httpserver.Error(w, err.Error(), http.StatusNotFound)