Merge branch 'main' into 18842-arv-mount-disk-config
[arvados.git] / lib / dispatchcloud / dispatcher.go
index ae91a710e395295f47a34cb5645f980021e79021..3403c50c972987e7f6f21a927a6db592fac9f6fc 100644 (file)
@@ -15,6 +15,8 @@ import (
        "time"
 
        "git.arvados.org/arvados.git/lib/cloud"
        "time"
 
        "git.arvados.org/arvados.git/lib/cloud"
+       "git.arvados.org/arvados.git/lib/controller/dblock"
+       "git.arvados.org/arvados.git/lib/ctrlctx"
        "git.arvados.org/arvados.git/lib/dispatchcloud/container"
        "git.arvados.org/arvados.git/lib/dispatchcloud/scheduler"
        "git.arvados.org/arvados.git/lib/dispatchcloud/sshexecutor"
        "git.arvados.org/arvados.git/lib/dispatchcloud/container"
        "git.arvados.org/arvados.git/lib/dispatchcloud/scheduler"
        "git.arvados.org/arvados.git/lib/dispatchcloud/sshexecutor"
@@ -53,6 +55,7 @@ type dispatcher struct {
        Registry      *prometheus.Registry
        InstanceSetID cloud.InstanceSetID
 
        Registry      *prometheus.Registry
        InstanceSetID cloud.InstanceSetID
 
+       dbConnector ctrlctx.DBConnector
        logger      logrus.FieldLogger
        instanceSet cloud.InstanceSet
        pool        pool
        logger      logrus.FieldLogger
        instanceSet cloud.InstanceSet
        pool        pool
@@ -118,6 +121,7 @@ func (disp *dispatcher) setup() {
 
 func (disp *dispatcher) initialize() {
        disp.logger = ctxlog.FromContext(disp.Context)
 
 func (disp *dispatcher) initialize() {
        disp.logger = ctxlog.FromContext(disp.Context)
+       disp.dbConnector = ctrlctx.DBConnector{PostgreSQL: disp.Cluster.PostgreSQL}
 
        disp.ArvClient.AuthToken = disp.AuthToken
 
 
        disp.ArvClient.AuthToken = disp.AuthToken
 
@@ -143,6 +147,7 @@ func (disp *dispatcher) initialize() {
        if err != nil {
                disp.logger.Fatalf("error initializing driver: %s", err)
        }
        if err != nil {
                disp.logger.Fatalf("error initializing driver: %s", err)
        }
+       dblock.Dispatch.Lock(disp.Context, disp.dbConnector.GetDB)
        disp.instanceSet = instanceSet
        disp.pool = worker.NewPool(disp.logger, disp.ArvClient, disp.Registry, disp.InstanceSetID, disp.instanceSet, disp.newExecutor, disp.sshKey.PublicKey(), disp.Cluster)
        disp.queue = container.NewQueue(disp.logger, disp.Registry, disp.typeChooser, disp.ArvClient)
        disp.instanceSet = instanceSet
        disp.pool = worker.NewPool(disp.logger, disp.ArvClient, disp.Registry, disp.InstanceSetID, disp.instanceSet, disp.newExecutor, disp.sshKey.PublicKey(), disp.Cluster)
        disp.queue = container.NewQueue(disp.logger, disp.Registry, disp.typeChooser, disp.ArvClient)
@@ -175,6 +180,7 @@ func (disp *dispatcher) initialize() {
 }
 
 func (disp *dispatcher) run() {
 }
 
 func (disp *dispatcher) run() {
+       defer dblock.Dispatch.Unlock()
        defer close(disp.stopped)
        defer disp.instanceSet.Stop()
        defer disp.pool.Stop()
        defer close(disp.stopped)
        defer disp.instanceSet.Stop()
        defer disp.pool.Stop()