12199: Merge branch 'master' into 12199-dispatch-to-node-type
author Tom Clegg <tclegg@veritasgenetics.com>
Mon, 12 Feb 2018 22:28:22 +0000 (17:28 -0500)
committer Tom Clegg <tclegg@veritasgenetics.com>
Mon, 12 Feb 2018 22:28:22 +0000 (17:28 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

27 files changed:
build/run-tests.sh
sdk/go/arvados/config.go
sdk/go/arvados/container.go
services/api/app/controllers/application_controller.rb
services/api/app/controllers/arvados/v1/nodes_controller.rb
services/api/app/models/node.rb
services/api/test/functional/arvados/v1/nodes_controller_test.rb
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
services/crunch-dispatch-slurm/squeue.go
services/dispatchcloud/gocheck_test.go [new file with mode: 0644]
services/dispatchcloud/node_size.go [new file with mode: 0644]
services/dispatchcloud/node_size_test.go [new file with mode: 0644]
services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
services/nodemanager/arvnodeman/computenode/driver/gce.py
services/nodemanager/arvnodeman/daemon.py
services/nodemanager/arvnodeman/jobqueue.py
services/nodemanager/arvnodeman/nodelist.py
services/nodemanager/tests/integration_test.py
services/nodemanager/tests/test_computenode_dispatch.py
services/nodemanager/tests/test_computenode_dispatch_slurm.py
services/nodemanager/tests/test_computenode_driver_gce.py
services/nodemanager/tests/test_daemon.py
services/nodemanager/tests/test_jobqueue.py
services/nodemanager/tests/test_nodelist.py
services/nodemanager/tests/testutil.py

diff --git a/build/run-tests.sh b/build/run-tests.sh
index ec3c4637703abdd4749d6efd00980f854c6ee02f..1ef15cd3dbe9974f2fe37619f06845b8964a4ecf 100755 (executable)
@@ -77,6 +77,7 @@ lib/crunchstat
 services/api
 services/arv-git-httpd
 services/crunchstat
+services/dispatchcloud
 services/dockercleaner
 services/fuse
 services/health
@@ -900,6 +901,7 @@ gostuff=(
     sdk/go/stats
     services/arv-git-httpd
     services/crunchstat
+    services/dispatchcloud
     services/health
     services/keep-web
     services/keepstore
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index ca0df1fc907b86ed4bec09b3a05c1697a6295cd4..9ed0eacf23e6d753c1b6c2a0f781282c96dde8cc 100644 (file)
@@ -1,3 +1,7 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
 package arvados
 
 import (
@@ -48,6 +52,16 @@ type Cluster struct {
        ClusterID       string `json:"-"`
        ManagementToken string
        SystemNodes     map[string]SystemNode
+       InstanceTypes   []InstanceType
+}
+
+type InstanceType struct {
+       Name         string
+       ProviderType string
+       VCPUs        int
+       RAM          int64
+       Scratch      int64
+       Price        float64
 }
 
 // GetThisSystemNode returns a SystemNode for the node we're running
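
A standalone sketch of the new field (the instance type names, sizes, and prices below are invented for illustration; in a real deployment they come from the cluster-wide configuration file loaded via arvados.GetConfig(), as the crunch-dispatch-slurm changes further down do):

    package main

    import (
        "fmt"

        "git.curoverse.com/arvados.git/sdk/go/arvados"
    )

    func main() {
        // Hypothetical instance-type menu, normally read from site config.
        cluster := arvados.Cluster{
            ClusterID: "zzzzz",
            InstanceTypes: []arvados.InstanceType{
                {Name: "small", ProviderType: "m4.large", VCPUs: 2, RAM: 8000000000, Scratch: 32000000000, Price: 0.1},
                {Name: "large", ProviderType: "m4.4xlarge", VCPUs: 16, RAM: 64000000000, Scratch: 128000000000, Price: 0.8},
            },
        }
        for _, it := range cluster.InstanceTypes {
            fmt.Printf("%s (%s): %d VCPUs, %d bytes RAM, %d bytes scratch, $%.2f/h\n",
                it.Name, it.ProviderType, it.VCPUs, it.RAM, it.Scratch, it.Price)
        }
    }
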
diff --git a/sdk/go/arvados/container.go b/sdk/go/arvados/container.go
index a541a8dca77fb03b9d6728fd8c9c13c5836414c8..20d007c5c818daf3190b9852e840fd8a2f97d47e 100644 (file)
@@ -41,9 +41,9 @@ type Mount struct {
 // CPU) and network connectivity.
 type RuntimeConstraints struct {
        API          *bool
-       RAM          int `json:"ram"`
-       VCPUs        int `json:"vcpus"`
-       KeepCacheRAM int `json:"keep_cache_ram"`
+       RAM          int64 `json:"ram"`
+       VCPUs        int   `json:"vcpus"`
+       KeepCacheRAM int64 `json:"keep_cache_ram"`
 }
 
 // SchedulingParameters specify a container's scheduling parameters
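
RAM and KeepCacheRAM are byte counts (the dispatcher below converts them to MiB for sbatch), so int64 keeps large requests portable; a 32-bit int cannot even hold a constant like the one in this minimal sketch:

    package main

    import (
        "fmt"

        "git.curoverse.com/arvados.git/sdk/go/arvados"
    )

    func main() {
        // 64 GiB plus a 256 MiB Keep cache, in bytes: too large for a 32-bit int.
        rc := arvados.RuntimeConstraints{RAM: 64 << 30, KeepCacheRAM: 256 << 20, VCPUs: 4}
        fmt.Println(rc.RAM + rc.KeepCacheRAM) // 68987912192
    }
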
diff --git a/services/api/app/controllers/application_controller.rb b/services/api/app/controllers/application_controller.rb
index c94ce89395840452398e8d2b7944cbaf64df3b71..c4f64f6039b3683127d2b5735ae11064446d10cb 100644 (file)
@@ -554,6 +554,10 @@ class ApplicationController < ActionController::Base
     }
   end
 
+  def self._update_requires_parameters
+    {}
+  end
+
   def self._index_requires_parameters
     {
       filters: { type: 'array', required: false },
diff --git a/services/api/app/controllers/arvados/v1/nodes_controller.rb b/services/api/app/controllers/arvados/v1/nodes_controller.rb
index 7ee8c2f149e590dea892ee6723fe42b5450b29c3..73f1dee54ad31a1ed908b4fd4bee2c3f3d3f7b8f 100644 (file)
@@ -20,6 +20,32 @@ class Arvados::V1::NodesController < ApplicationController
     { ping_secret: {required: true} }
   end
 
+  def self._create_requires_parameters
+    super.merge(
+      { assign_slot: {required: false, type: 'boolean', description: 'assign slot and hostname'} })
+  end
+
+  def self._update_requires_parameters
+    super.merge(
+      { assign_slot: {required: false, type: 'boolean', description: 'assign slot and hostname'} })
+  end
+
+  def create
+    @object = model_class.new(resource_attrs)
+    @object.assign_slot if params[:assign_slot]
+    @object.save!
+    show
+  end
+
+  def update
+    attrs_to_update = resource_attrs.reject { |k,v|
+      [:kind, :etag, :href].index k
+    }
+    @object.update_attributes!(attrs_to_update)
+    @object.assign_slot if params[:assign_slot]
+    show
+  end
+
   def ping
     act_as_system_user do
       @object = Node.where(uuid: (params[:id] || params[:uuid])).first
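
A sketch of a client using the new assign_slot parameter (assumes the Go SDK's arvadosclient package and an admin token in the usual ARVADOS_API_* environment variables); the fields checked at the end mirror the functional tests below:

    package main

    import (
        "log"

        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
    )

    func main() {
        arv, err := arvadosclient.MakeArvadosClient()
        if err != nil {
            log.Fatal(err)
        }
        var node map[string]interface{}
        // With assign_slot, slot_number and hostname are filled in at
        // create time instead of waiting for the node's first ping.
        err = arv.Create("nodes", arvadosclient.Dict{
            "node":        arvadosclient.Dict{},
            "assign_slot": true,
        }, &node)
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("slot_number=%v hostname=%v", node["slot_number"], node["hostname"])
    }
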
diff --git a/services/api/app/models/node.rb b/services/api/app/models/node.rb
index bf1b636c52836bd54c75373c29fde51897e4b766..3d8b91b4b62df590edd2c1049a5ea69e17224bef 100644 (file)
@@ -106,27 +106,7 @@ class Node < ArvadosModel
       end
     end
 
-    # Assign slot_number
-    if self.slot_number.nil?
-      while true
-        n = self.class.available_slot_number
-        if n.nil?
-          raise "No available node slots"
-        end
-        self.slot_number = n
-        begin
-          self.save!
-          break
-        rescue ActiveRecord::RecordNotUnique
-          # try again
-        end
-      end
-    end
-
-    # Assign hostname
-    if self.hostname.nil? and Rails.configuration.assign_node_hostname
-      self.hostname = self.class.hostname_for_slot(self.slot_number)
-    end
+    assign_slot
 
     # Record other basic stats
     ['total_cpu_cores', 'total_ram_mb', 'total_scratch_mb'].each do |key|
@@ -140,8 +120,30 @@ class Node < ArvadosModel
     save!
   end
 
+  def assign_slot
+    return if self.slot_number.andand > 0
+    while true
+      self.slot_number = self.class.available_slot_number
+      if self.slot_number.nil?
+        raise "No available node slots"
+      end
+      begin
+        save!
+        return assign_hostname
+      rescue ActiveRecord::RecordNotUnique
+        # try again
+      end
+    end
+  end
+
   protected
 
+  def assign_hostname
+    if self.hostname.nil? and Rails.configuration.assign_node_hostname
+      self.hostname = self.class.hostname_for_slot(self.slot_number)
+    end
+  end
+
   def self.available_slot_number
     # Join the sequence 1..max with the nodes table. Return the first
     # (i.e., smallest) value that doesn't match the slot_number of any
diff --git a/services/api/test/functional/arvados/v1/nodes_controller_test.rb b/services/api/test/functional/arvados/v1/nodes_controller_test.rb
index f9e5be454e4f8cd3de8b6d1be68134263e63245d..c198c4c8ee9874cc1769b11428c11f44290fcf85 100644 (file)
@@ -78,6 +78,40 @@ class Arvados::V1::NodesControllerTest < ActionController::TestCase
     assert_not_nil json_response['uuid']
     assert_not_nil json_response['info'].is_a? Hash
     assert_not_nil json_response['info']['ping_secret']
+    assert_nil json_response['slot_number']
+    assert_nil json_response['hostname']
+  end
+
+  test "create node and assign slot" do
+    authorize_with :admin
+    post :create, {node: {}, assign_slot: true}
+    assert_response :success
+    assert_not_nil json_response['uuid']
+    assert_not_nil json_response['info'].is_a? Hash
+    assert_not_nil json_response['info']['ping_secret']
+    assert_operator 0, :<, json_response['slot_number']
+    n = json_response['slot_number']
+    assert_equal "compute#{n}", json_response['hostname']
+  end
+
+  test "update node and assign slot" do
+    authorize_with :admin
+    node = nodes(:new_with_no_hostname)
+    post :update, {id: node.uuid, node: {}, assign_slot: true}
+    assert_response :success
+    assert_operator 0, :<, json_response['slot_number']
+    n = json_response['slot_number']
+    assert_equal "compute#{n}", json_response['hostname']
+  end
+
+  test "update node and assign slot, don't clobber hostname" do
+    authorize_with :admin
+    node = nodes(:new_with_custom_hostname)
+    post :update, {id: node.uuid, node: {}, assign_slot: true}
+    assert_response :success
+    assert_operator 0, :<, json_response['slot_number']
+    n = json_response['slot_number']
+    assert_equal "custom1", json_response['hostname']
   end
 
   test "ping adds node stats to info" do
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index ae2ca58421d3f3a58f0a4a0f28cbdc2beb56e6af..9eebbab02bbc04bde9bd07e6be47333849171662 100644 (file)
@@ -21,13 +21,21 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/config"
        "git.curoverse.com/arvados.git/sdk/go/dispatch"
+       "git.curoverse.com/arvados.git/services/dispatchcloud"
        "github.com/coreos/go-systemd/daemon"
 )
 
-var version = "dev"
+var (
+       version           = "dev"
+       defaultConfigPath = "/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml"
+)
+
+type Dispatcher struct {
+       *dispatch.Dispatcher
+       cluster *arvados.Cluster
+       sqCheck *SqueueChecker
+       slurm   Slurm
 
-// Config used by crunch-dispatch-slurm
-type Config struct {
        Client arvados.Client
 
        SbatchArguments []string
@@ -41,27 +49,27 @@ type Config struct {
 
        // Minimum time between two attempts to run the same container
        MinRetryPeriod arvados.Duration
-
-       slurm Slurm
 }
 
 func main() {
-       theConfig.slurm = &slurmCLI{}
-       err := doMain()
+       disp := &Dispatcher{}
+       err := disp.Run(os.Args[0], os.Args[1:])
        if err != nil {
                log.Fatal(err)
        }
 }
 
-var (
-       theConfig Config
-       sqCheck   = &SqueueChecker{}
-)
-
-const defaultConfigPath = "/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml"
+func (disp *Dispatcher) Run(prog string, args []string) error {
+       if err := disp.configure(prog, args); err != nil {
+               return err
+       }
+       disp.setup()
+       return disp.run()
+}
 
-func doMain() error {
-       flags := flag.NewFlagSet("crunch-dispatch-slurm", flag.ExitOnError)
+// configure() loads config files. Tests skip this.
+func (disp *Dispatcher) configure(prog string, args []string) error {
+       flags := flag.NewFlagSet(prog, flag.ExitOnError)
        flags.Usage = func() { usage(flags) }
 
        configPath := flags.String(
@@ -77,7 +85,7 @@ func doMain() error {
                false,
                "Print version information and exit.")
        // Parse args; omit the first arg which is the command name
-       flags.Parse(os.Args[1:])
+       flags.Parse(args)
 
        // Print version information if requested
        if *getVersion {
@@ -87,63 +95,84 @@ func doMain() error {
 
        log.Printf("crunch-dispatch-slurm %s started", version)
 
-       err := readConfig(&theConfig, *configPath)
+       err := disp.readConfig(*configPath)
        if err != nil {
                return err
        }
 
-       if theConfig.CrunchRunCommand == nil {
-               theConfig.CrunchRunCommand = []string{"crunch-run"}
+       if disp.CrunchRunCommand == nil {
+               disp.CrunchRunCommand = []string{"crunch-run"}
        }
 
-       if theConfig.PollPeriod == 0 {
-               theConfig.PollPeriod = arvados.Duration(10 * time.Second)
+       if disp.PollPeriod == 0 {
+               disp.PollPeriod = arvados.Duration(10 * time.Second)
        }
 
-       if theConfig.Client.APIHost != "" || theConfig.Client.AuthToken != "" {
+       if disp.Client.APIHost != "" || disp.Client.AuthToken != "" {
                // Copy real configs into env vars so [a]
                // MakeArvadosClient() uses them, and [b] they get
                // propagated to crunch-run via SLURM.
-               os.Setenv("ARVADOS_API_HOST", theConfig.Client.APIHost)
-               os.Setenv("ARVADOS_API_TOKEN", theConfig.Client.AuthToken)
+               os.Setenv("ARVADOS_API_HOST", disp.Client.APIHost)
+               os.Setenv("ARVADOS_API_TOKEN", disp.Client.AuthToken)
                os.Setenv("ARVADOS_API_HOST_INSECURE", "")
-               if theConfig.Client.Insecure {
+               if disp.Client.Insecure {
                        os.Setenv("ARVADOS_API_HOST_INSECURE", "1")
                }
-               os.Setenv("ARVADOS_KEEP_SERVICES", strings.Join(theConfig.Client.KeepServiceURIs, " "))
+               os.Setenv("ARVADOS_KEEP_SERVICES", strings.Join(disp.Client.KeepServiceURIs, " "))
                os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
        } else {
                log.Printf("warning: Client credentials missing from config, so falling back on environment variables (deprecated).")
        }
 
        if *dumpConfig {
-               log.Fatal(config.DumpAndExit(theConfig))
+               return config.DumpAndExit(disp)
+       }
+
+       siteConfig, err := arvados.GetConfig(arvados.DefaultConfigFile)
+       if os.IsNotExist(err) {
+               log.Printf("warning: no cluster config (%s), proceeding with no node types defined", err)
+       } else if err != nil {
+               return fmt.Errorf("error loading config: %s", err)
+       } else if disp.cluster, err = siteConfig.GetCluster(""); err != nil {
+               return fmt.Errorf("config error: %s", err)
        }
 
+       return nil
+}
+
+// setup() initializes private fields after configure().
+func (disp *Dispatcher) setup() {
        arv, err := arvadosclient.MakeArvadosClient()
        if err != nil {
-               log.Printf("Error making Arvados client: %v", err)
-               return err
+               log.Fatalf("Error making Arvados client: %v", err)
        }
        arv.Retries = 25
 
-       sqCheck = &SqueueChecker{Period: time.Duration(theConfig.PollPeriod)}
-       defer sqCheck.Stop()
-
-       dispatcher := &dispatch.Dispatcher{
+       disp.slurm = &slurmCLI{}
+       disp.sqCheck = &SqueueChecker{
+               Period: time.Duration(disp.PollPeriod),
+               Slurm:  disp.slurm,
+       }
+       disp.Dispatcher = &dispatch.Dispatcher{
                Arv:            arv,
-               RunContainer:   run,
-               PollPeriod:     time.Duration(theConfig.PollPeriod),
-               MinRetryPeriod: time.Duration(theConfig.MinRetryPeriod),
+               RunContainer:   disp.runContainer,
+               PollPeriod:     time.Duration(disp.PollPeriod),
+               MinRetryPeriod: time.Duration(disp.MinRetryPeriod),
+       }
+}
+
+func (disp *Dispatcher) run() error {
+       defer disp.sqCheck.Stop()
+
+       if disp.cluster != nil && len(disp.cluster.InstanceTypes) > 0 {
+               go dispatchcloud.SlurmNodeTypeFeatureKludge(disp.cluster)
        }
 
        if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
                log.Printf("Error notifying init daemon: %v", err)
        }
-
-       go checkSqueueForOrphans(dispatcher, sqCheck)
-
-       return dispatcher.Run(context.Background())
+       go disp.checkSqueueForOrphans()
+       return disp.Dispatcher.Run(context.Background())
 }
 
 var containerUuidPattern = regexp.MustCompile(`^[a-z0-9]{5}-dz642-[a-z0-9]{15}$`)
@@ -153,19 +182,19 @@ var containerUuidPattern = regexp.MustCompile(`^[a-z0-9]{5}-dz642-[a-z0-9]{15}$`
 // jobs started by a previous dispatch process that never released
 // their slurm allocations even though their container states are
 // Cancelled or Complete. See https://dev.arvados.org/issues/10979
-func checkSqueueForOrphans(dispatcher *dispatch.Dispatcher, sqCheck *SqueueChecker) {
-       for _, uuid := range sqCheck.All() {
+func (disp *Dispatcher) checkSqueueForOrphans() {
+       for _, uuid := range disp.sqCheck.All() {
                if !containerUuidPattern.MatchString(uuid) {
                        continue
                }
-               err := dispatcher.TrackContainer(uuid)
+               err := disp.TrackContainer(uuid)
                if err != nil {
                        log.Printf("checkSqueueForOrphans: TrackContainer(%s): %s", uuid, err)
                }
        }
 }
 
-func niceness(priority int) int {
+func (disp *Dispatcher) niceness(priority int) int {
        if priority > 1000 {
                priority = 1000
        }
@@ -176,7 +205,7 @@ func niceness(priority int) int {
        return (1000 - priority) * 10
 }
 
-func sbatchArgs(container arvados.Container) []string {
+func (disp *Dispatcher) sbatchArgs(container arvados.Container) ([]string, error) {
        mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM) / float64(1048576)))
 
        var disk int64
@@ -188,46 +217,65 @@ func sbatchArgs(container arvados.Container) []string {
        disk = int64(math.Ceil(float64(disk) / float64(1048576)))
 
        var sbatchArgs []string
-       sbatchArgs = append(sbatchArgs, theConfig.SbatchArguments...)
+       sbatchArgs = append(sbatchArgs, disp.SbatchArguments...)
        sbatchArgs = append(sbatchArgs, fmt.Sprintf("--job-name=%s", container.UUID))
        sbatchArgs = append(sbatchArgs, fmt.Sprintf("--mem=%d", mem))
        sbatchArgs = append(sbatchArgs, fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs))
        sbatchArgs = append(sbatchArgs, fmt.Sprintf("--tmp=%d", disk))
-       sbatchArgs = append(sbatchArgs, fmt.Sprintf("--nice=%d", niceness(container.Priority)))
+       sbatchArgs = append(sbatchArgs, fmt.Sprintf("--nice=%d", disp.niceness(container.Priority)))
        if len(container.SchedulingParameters.Partitions) > 0 {
                sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.SchedulingParameters.Partitions, ",")))
        }
 
-       return sbatchArgs
+       if disp.cluster == nil {
+               // no instance types configured
+       } else if it, err := dispatchcloud.ChooseInstanceType(disp.cluster, &container); err == dispatchcloud.ErrInstanceTypesNotConfigured {
+               // ditto
+       } else if err != nil {
+               return nil, err
+       } else {
+               sbatchArgs = append(sbatchArgs, "--constraint=instancetype="+it.Name)
+       }
+
+       return sbatchArgs, nil
 }
 
-func submit(dispatcher *dispatch.Dispatcher, container arvados.Container, crunchRunCommand []string) error {
+func (disp *Dispatcher) submit(container arvados.Container, crunchRunCommand []string) error {
        // append() here avoids modifying crunchRunCommand's
        // underlying array, which is shared with other goroutines.
        crArgs := append([]string(nil), crunchRunCommand...)
        crArgs = append(crArgs, container.UUID)
        crScript := strings.NewReader(execScript(crArgs))
 
-       sqCheck.L.Lock()
-       defer sqCheck.L.Unlock()
+       disp.sqCheck.L.Lock()
+       defer disp.sqCheck.L.Unlock()
 
-       sbArgs := sbatchArgs(container)
+       sbArgs, err := disp.sbatchArgs(container)
+       if err != nil {
+               return err
+       }
        log.Printf("running sbatch %+q", sbArgs)
-       return theConfig.slurm.Batch(crScript, sbArgs)
+       return disp.slurm.Batch(crScript, sbArgs)
 }
 
 // Submit a container to the slurm queue (or resume monitoring if it's
 // already in the queue).  Cancel the slurm job if the container's
 // priority changes to zero or its state indicates it's no longer
 // running.
-func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+func (disp *Dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()
 
-       if ctr.State == dispatch.Locked && !sqCheck.HasUUID(ctr.UUID) {
+       if ctr.State == dispatch.Locked && !disp.sqCheck.HasUUID(ctr.UUID) {
                log.Printf("Submitting container %s to slurm", ctr.UUID)
-               if err := submit(disp, ctr, theConfig.CrunchRunCommand); err != nil {
-                       text := fmt.Sprintf("Error submitting container %s to slurm: %s", ctr.UUID, err)
+               if err := disp.submit(ctr, disp.CrunchRunCommand); err != nil {
+                       var text string
+                       if err == dispatchcloud.ErrConstraintsNotSatisfiable {
+                               text = fmt.Sprintf("cannot run container %s: %s", ctr.UUID, err)
+                               disp.UpdateState(ctr.UUID, dispatch.Cancelled)
+                       } else {
+                               text = fmt.Sprintf("Error submitting container %s to slurm: %s", ctr.UUID, err)
+                       }
                        log.Print(text)
 
                        lr := arvadosclient.Dict{"log": arvadosclient.Dict{
@@ -248,7 +296,7 @@ func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados
        // no point in waiting for further dispatch updates: just
        // clean up and return.
        go func(uuid string) {
-               for ctx.Err() == nil && sqCheck.HasUUID(uuid) {
+               for ctx.Err() == nil && disp.sqCheck.HasUUID(uuid) {
                }
                cancel()
        }(ctr.UUID)
@@ -269,56 +317,56 @@ func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados
                        return
                case updated, ok := <-status:
                        if !ok {
-                               log.Printf("Dispatcher says container %s is done: cancel slurm job", ctr.UUID)
-                               scancel(ctr)
+                               log.Printf("container %s is done: cancel slurm job", ctr.UUID)
+                               disp.scancel(ctr)
                        } else if updated.Priority == 0 {
-                               log.Printf("Container %s has state %q, priority %d: cancel slurm job", ctr.UUID, updated.State, updated.Priority)
-                               scancel(ctr)
+                               log.Printf("container %s has state %q, priority %d: cancel slurm job", ctr.UUID, updated.State, updated.Priority)
+                               disp.scancel(ctr)
                        } else {
-                               renice(updated)
+                               disp.renice(updated)
                        }
                }
        }
 }
 
-func scancel(ctr arvados.Container) {
-       sqCheck.L.Lock()
-       err := theConfig.slurm.Cancel(ctr.UUID)
-       sqCheck.L.Unlock()
+func (disp *Dispatcher) scancel(ctr arvados.Container) {
+       disp.sqCheck.L.Lock()
+       err := disp.slurm.Cancel(ctr.UUID)
+       disp.sqCheck.L.Unlock()
 
        if err != nil {
                log.Printf("scancel: %s", err)
                time.Sleep(time.Second)
-       } else if sqCheck.HasUUID(ctr.UUID) {
+       } else if disp.sqCheck.HasUUID(ctr.UUID) {
                log.Printf("container %s is still in squeue after scancel", ctr.UUID)
                time.Sleep(time.Second)
        }
 }
 
-func renice(ctr arvados.Container) {
-       nice := niceness(ctr.Priority)
-       oldnice := sqCheck.GetNiceness(ctr.UUID)
+func (disp *Dispatcher) renice(ctr arvados.Container) {
+       nice := disp.niceness(ctr.Priority)
+       oldnice := disp.sqCheck.GetNiceness(ctr.UUID)
        if nice == oldnice || oldnice == -1 {
                return
        }
        log.Printf("updating slurm nice value to %d (was %d)", nice, oldnice)
-       sqCheck.L.Lock()
-       err := theConfig.slurm.Renice(ctr.UUID, nice)
-       sqCheck.L.Unlock()
+       disp.sqCheck.L.Lock()
+       err := disp.slurm.Renice(ctr.UUID, nice)
+       disp.sqCheck.L.Unlock()
 
        if err != nil {
                log.Printf("renice: %s", err)
                time.Sleep(time.Second)
                return
        }
-       if sqCheck.HasUUID(ctr.UUID) {
+       if disp.sqCheck.HasUUID(ctr.UUID) {
                log.Printf("container %s has arvados priority %d, slurm nice %d",
-                       ctr.UUID, ctr.Priority, sqCheck.GetNiceness(ctr.UUID))
+                       ctr.UUID, ctr.Priority, disp.sqCheck.GetNiceness(ctr.UUID))
        }
 }
 
-func readConfig(dst interface{}, path string) error {
-       err := config.LoadFile(dst, path)
+func (disp *Dispatcher) readConfig(path string) error {
+       err := config.LoadFile(disp, path)
        if err != nil && os.IsNotExist(err) && path == defaultConfigPath {
                log.Printf("Config not specified. Continue with default configuration.")
                err = nil
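
As a concrete data point (the inputs and expected output are taken verbatim from TestSbatchInstanceTypeConstraint in the test file below): a container with UUID "123", RAM 250000000, VCPUs 2, and priority 1, dispatched against a menu in which a1.medium is the cheapest type with enough headroom, produces the sbatch argument list

    []string{"--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0", "--nice=9990", "--constraint=instancetype=a1.medium"}

where --mem is ceil((RAM+KeepCacheRAM)/1048576) MiB and --nice is (1000-priority)*10.
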
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index 830976d66bdd9e3dbce855936a241b7b83ec8e7b..73d9c162e264f18372920fa4406b4f9fa88a8d26 100644 (file)
@@ -24,6 +24,7 @@ import (
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/arvadostest"
        "git.curoverse.com/arvados.git/sdk/go/dispatch"
+       "git.curoverse.com/arvados.git/services/dispatchcloud"
        . "gopkg.in/check.v1"
 )
 
@@ -32,39 +33,27 @@ func Test(t *testing.T) {
        TestingT(t)
 }
 
-var _ = Suite(&TestSuite{})
-var _ = Suite(&MockArvadosServerSuite{})
+var _ = Suite(&IntegrationSuite{})
+var _ = Suite(&StubbedSuite{})
 
-type TestSuite struct{}
-type MockArvadosServerSuite struct{}
-
-var initialArgs []string
-
-func (s *TestSuite) SetUpSuite(c *C) {
-       initialArgs = os.Args
-}
-
-func (s *TestSuite) TearDownSuite(c *C) {
+type IntegrationSuite struct {
+       disp  Dispatcher
+       slurm slurmFake
 }
 
-func (s *TestSuite) SetUpTest(c *C) {
-       args := []string{"crunch-dispatch-slurm"}
-       os.Args = args
-
+func (s *IntegrationSuite) SetUpTest(c *C) {
        arvadostest.StartAPI()
        os.Setenv("ARVADOS_API_TOKEN", arvadostest.Dispatch1Token)
+       s.disp = Dispatcher{}
+       s.disp.setup()
+       s.slurm = slurmFake{}
 }
 
-func (s *TestSuite) TearDownTest(c *C) {
-       os.Args = initialArgs
+func (s *IntegrationSuite) TearDownTest(c *C) {
        arvadostest.ResetEnv()
        arvadostest.StopAPI()
 }
 
-func (s *MockArvadosServerSuite) TearDownTest(c *C) {
-       arvadostest.ResetEnv()
-}
-
 type slurmFake struct {
        didBatch  [][]string
        didCancel []string
@@ -102,7 +91,7 @@ func (sf *slurmFake) Cancel(name string) error {
        return nil
 }
 
-func (s *TestSuite) integrationTest(c *C, slurm *slurmFake,
+func (s *IntegrationSuite) integrationTest(c *C,
        expectBatch [][]string,
        runContainer func(*dispatch.Dispatcher, arvados.Container)) arvados.Container {
        arvadostest.ResetEnv()
@@ -110,11 +99,6 @@ func (s *TestSuite) integrationTest(c *C, slurm *slurmFake,
        arv, err := arvadosclient.MakeArvadosClient()
        c.Assert(err, IsNil)
 
-       defer func(orig Slurm) {
-               theConfig.slurm = orig
-       }(theConfig.slurm)
-       theConfig.slurm = slurm
-
        // There should be one queued container
        params := arvadosclient.Dict{
                "filters": [][]string{{"state", "=", "Queued"}},
@@ -124,34 +108,35 @@ func (s *TestSuite) integrationTest(c *C, slurm *slurmFake,
        c.Check(err, IsNil)
        c.Check(len(containers.Items), Equals, 1)
 
-       theConfig.CrunchRunCommand = []string{"echo"}
+       s.disp.CrunchRunCommand = []string{"echo"}
 
        ctx, cancel := context.WithCancel(context.Background())
        doneRun := make(chan struct{})
 
-       dispatcher := dispatch.Dispatcher{
+       s.disp.Dispatcher = &dispatch.Dispatcher{
                Arv:        arv,
                PollPeriod: time.Duration(1) * time.Second,
                RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
                        go func() {
                                runContainer(disp, ctr)
-                               slurm.queue = ""
+                               s.slurm.queue = ""
                                doneRun <- struct{}{}
                        }()
-                       run(disp, ctr, status)
+                       s.disp.runContainer(disp, ctr, status)
                        cancel()
                },
        }
 
-       sqCheck = &SqueueChecker{Period: 500 * time.Millisecond}
+       s.disp.slurm = &s.slurm
+       s.disp.sqCheck = &SqueueChecker{Period: 500 * time.Millisecond, Slurm: s.disp.slurm}
 
-       err = dispatcher.Run(ctx)
+       err = s.disp.Dispatcher.Run(ctx)
        <-doneRun
        c.Assert(err, Equals, context.Canceled)
 
-       sqCheck.Stop()
+       s.disp.sqCheck.Stop()
 
-       c.Check(slurm.didBatch, DeepEquals, expectBatch)
+       c.Check(s.slurm.didBatch, DeepEquals, expectBatch)
 
        // There should be no queued containers now
        err = arv.List("containers", params, &containers)
@@ -165,9 +150,9 @@ func (s *TestSuite) integrationTest(c *C, slurm *slurmFake,
        return container
 }
 
-func (s *TestSuite) TestIntegrationNormal(c *C) {
+func (s *IntegrationSuite) TestNormal(c *C) {
+       s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
        container := s.integrationTest(c,
-               &slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"},
                nil,
                func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
                        dispatcher.UpdateState(container.UUID, dispatch.Running)
@@ -177,12 +162,11 @@ func (s *TestSuite) TestIntegrationNormal(c *C) {
        c.Check(container.State, Equals, arvados.ContainerStateComplete)
 }
 
-func (s *TestSuite) TestIntegrationCancel(c *C) {
-       slurm := &slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
+func (s *IntegrationSuite) TestCancel(c *C) {
+       s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
        readyToCancel := make(chan bool)
-       slurm.onCancel = func() { <-readyToCancel }
+       s.slurm.onCancel = func() { <-readyToCancel }
        container := s.integrationTest(c,
-               slurm,
                nil,
                func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
                        dispatcher.UpdateState(container.UUID, dispatch.Running)
@@ -195,12 +179,12 @@ func (s *TestSuite) TestIntegrationCancel(c *C) {
                        close(readyToCancel)
                })
        c.Check(container.State, Equals, arvados.ContainerStateCancelled)
-       c.Check(len(slurm.didCancel) > 1, Equals, true)
-       c.Check(slurm.didCancel[:2], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "zzzzz-dz642-queuedcontainer"})
+       c.Check(len(s.slurm.didCancel) > 1, Equals, true)
+       c.Check(s.slurm.didCancel[:2], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "zzzzz-dz642-queuedcontainer"})
 }
 
-func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
-       container := s.integrationTest(c, &slurmFake{},
+func (s *IntegrationSuite) TestMissingFromSqueue(c *C) {
+       container := s.integrationTest(c,
                [][]string{{
                        fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
                        fmt.Sprintf("--mem=%d", 11445),
@@ -215,9 +199,9 @@ func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
        c.Check(container.State, Equals, arvados.ContainerStateCancelled)
 }
 
-func (s *TestSuite) TestSbatchFail(c *C) {
+func (s *IntegrationSuite) TestSbatchFail(c *C) {
+       s.slurm = slurmFake{errBatch: errors.New("something terrible happened")}
        container := s.integrationTest(c,
-               &slurmFake{errBatch: errors.New("something terrible happened")},
                [][]string{{"--job-name=zzzzz-dz642-queuedcontainer", "--mem=11445", "--cpus-per-task=4", "--tmp=45777", "--nice=9990"}},
                func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
                        dispatcher.UpdateState(container.UUID, dispatch.Running)
@@ -236,15 +220,42 @@ func (s *TestSuite) TestSbatchFail(c *C) {
        c.Assert(len(ll.Items), Equals, 1)
 }
 
-func (s *MockArvadosServerSuite) TestAPIErrorGettingContainers(c *C) {
+func (s *IntegrationSuite) TestChangePriority(c *C) {
+       s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
+       container := s.integrationTest(c, nil,
+               func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
+                       dispatcher.UpdateState(container.UUID, dispatch.Running)
+                       time.Sleep(time.Second)
+                       dispatcher.Arv.Update("containers", container.UUID,
+                               arvadosclient.Dict{
+                                       "container": arvadosclient.Dict{"priority": 600}},
+                               nil)
+                       time.Sleep(time.Second)
+                       dispatcher.UpdateState(container.UUID, dispatch.Complete)
+               })
+       c.Check(container.State, Equals, arvados.ContainerStateComplete)
+       c.Assert(len(s.slurm.didRenice), Not(Equals), 0)
+       c.Check(s.slurm.didRenice[len(s.slurm.didRenice)-1], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "4000"})
+}
+
+type StubbedSuite struct {
+       disp Dispatcher
+}
+
+func (s *StubbedSuite) SetUpTest(c *C) {
+       s.disp = Dispatcher{}
+       s.disp.setup()
+}
+
+func (s *StubbedSuite) TestAPIErrorGettingContainers(c *C) {
        apiStubResponses := make(map[string]arvadostest.StubResponse)
        apiStubResponses["/arvados/v1/api_client_authorizations/current"] = arvadostest.StubResponse{200, `{"uuid":"` + arvadostest.Dispatch1AuthUUID + `"}`}
        apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
 
-       testWithServerStub(c, apiStubResponses, "echo", "Error getting list of containers")
+       s.testWithServerStub(c, apiStubResponses, "echo", "Error getting list of containers")
 }
 
-func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
+func (s *StubbedSuite) testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
        apiStub := arvadostest.ServerStub{apiStubResponses}
 
        api := httptest.NewServer(&apiStub)
@@ -262,7 +273,7 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
        log.SetOutput(io.MultiWriter(buf, os.Stderr))
        defer log.SetOutput(os.Stderr)
 
-       theConfig.CrunchRunCommand = []string{crunchCmd}
+       s.disp.CrunchRunCommand = []string{crunchCmd}
 
        ctx, cancel := context.WithCancel(context.Background())
        dispatcher := dispatch.Dispatcher{
@@ -274,7 +285,7 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
                                disp.UpdateState(ctr.UUID, dispatch.Running)
                                disp.UpdateState(ctr.UUID, dispatch.Complete)
                        }()
-                       run(disp, ctr, status)
+                       s.disp.runContainer(disp, ctr, status)
                        cancel()
                },
        }
@@ -292,15 +303,12 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
        c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
 }
 
-func (s *MockArvadosServerSuite) TestNoSuchConfigFile(c *C) {
-       var config Config
-       err := readConfig(&config, "/nosuchdir89j7879/8hjwr7ojgyy7")
+func (s *StubbedSuite) TestNoSuchConfigFile(c *C) {
+       err := s.disp.readConfig("/nosuchdir89j7879/8hjwr7ojgyy7")
        c.Assert(err, NotNil)
 }
 
-func (s *MockArvadosServerSuite) TestBadSbatchArgsConfig(c *C) {
-       var config Config
-
+func (s *StubbedSuite) TestBadSbatchArgsConfig(c *C) {
        tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
        c.Check(err, IsNil)
        defer os.Remove(tmpfile.Name())
@@ -308,13 +316,11 @@ func (s *MockArvadosServerSuite) TestBadSbatchArgsConfig(c *C) {
        _, err = tmpfile.Write([]byte(`{"SbatchArguments": "oops this is not a string array"}`))
        c.Check(err, IsNil)
 
-       err = readConfig(&config, tmpfile.Name())
+       err = s.disp.readConfig(tmpfile.Name())
        c.Assert(err, NotNil)
 }
 
-func (s *MockArvadosServerSuite) TestNoSuchArgInConfigIgnored(c *C) {
-       var config Config
-
+func (s *StubbedSuite) TestNoSuchArgInConfigIgnored(c *C) {
        tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
        c.Check(err, IsNil)
        defer os.Remove(tmpfile.Name())
@@ -322,14 +328,12 @@ func (s *MockArvadosServerSuite) TestNoSuchArgInConfigIgnored(c *C) {
        _, err = tmpfile.Write([]byte(`{"NoSuchArg": "Nobody loves me, not one tiny hunk."}`))
        c.Check(err, IsNil)
 
-       err = readConfig(&config, tmpfile.Name())
+       err = s.disp.readConfig(tmpfile.Name())
        c.Assert(err, IsNil)
-       c.Check(0, Equals, len(config.SbatchArguments))
+       c.Check(0, Equals, len(s.disp.SbatchArguments))
 }
 
-func (s *MockArvadosServerSuite) TestReadConfig(c *C) {
-       var config Config
-
+func (s *StubbedSuite) TestReadConfig(c *C) {
        tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
        c.Check(err, IsNil)
        defer os.Remove(tmpfile.Name())
@@ -339,66 +343,90 @@ func (s *MockArvadosServerSuite) TestReadConfig(c *C) {
        _, err = tmpfile.Write([]byte(argsS))
        c.Check(err, IsNil)
 
-       err = readConfig(&config, tmpfile.Name())
+       err = s.disp.readConfig(tmpfile.Name())
        c.Assert(err, IsNil)
-       c.Check(3, Equals, len(config.SbatchArguments))
-       c.Check(args, DeepEquals, config.SbatchArguments)
+       c.Check(args, DeepEquals, s.disp.SbatchArguments)
 }
 
-func (s *MockArvadosServerSuite) TestSbatchFuncWithNoConfigArgs(c *C) {
-       testSbatchFuncWithArgs(c, nil)
-}
-
-func (s *MockArvadosServerSuite) TestSbatchFuncWithEmptyConfigArgs(c *C) {
-       testSbatchFuncWithArgs(c, []string{})
-}
+func (s *StubbedSuite) TestSbatchArgs(c *C) {
+       container := arvados.Container{
+               UUID:               "123",
+               RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 2},
+               Priority:           1,
+       }
 
-func (s *MockArvadosServerSuite) TestSbatchFuncWithConfigArgs(c *C) {
-       testSbatchFuncWithArgs(c, []string{"--arg1=v1", "--arg2"})
+       for _, defaults := range [][]string{
+               nil,
+               {},
+               {"--arg1=v1", "--arg2"},
+       } {
+               c.Logf("%#v", defaults)
+               s.disp.SbatchArguments = defaults
+
+               args, err := s.disp.sbatchArgs(container)
+               c.Check(args, DeepEquals, append(defaults, "--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0", "--nice=9990"))
+               c.Check(err, IsNil)
+       }
 }
 
-func testSbatchFuncWithArgs(c *C, args []string) {
-       defer func() { theConfig.SbatchArguments = nil }()
-       theConfig.SbatchArguments = append(theConfig.SbatchArguments, args...)
-
+func (s *StubbedSuite) TestSbatchInstanceTypeConstraint(c *C) {
        container := arvados.Container{
                UUID:               "123",
                RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 2},
-               Priority:           1}
+               Priority:           1,
+       }
 
-       var expected []string
-       expected = append(expected, theConfig.SbatchArguments...)
-       expected = append(expected, "--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0", "--nice=9990")
-       c.Check(sbatchArgs(container), DeepEquals, expected)
+       for _, trial := range []struct {
+               types      []arvados.InstanceType
+               sbatchArgs []string
+               err        error
+       }{
+               // Choose node type => use --constraint arg
+               {
+                       types: []arvados.InstanceType{
+                               {Name: "a1.tiny", Price: 0.02, RAM: 128000000, VCPUs: 1},
+                               {Name: "a1.small", Price: 0.04, RAM: 256000000, VCPUs: 2},
+                               {Name: "a1.medium", Price: 0.08, RAM: 512000000, VCPUs: 4},
+                               {Name: "a1.large", Price: 0.16, RAM: 1024000000, VCPUs: 8},
+                       },
+                       sbatchArgs: []string{"--constraint=instancetype=a1.medium"},
+               },
+               // No node types configured => no slurm constraint
+               {
+                       types:      nil,
+                       sbatchArgs: nil,
+               },
+               // No node type is big enough => error
+               {
+                       types: []arvados.InstanceType{
+                               {Name: "a1.tiny", Price: 0.02, RAM: 128000000, VCPUs: 1},
+                       },
+                       err: dispatchcloud.ErrConstraintsNotSatisfiable,
+               },
+       } {
+               c.Logf("%#v", trial)
+               s.disp.cluster = &arvados.Cluster{InstanceTypes: trial.types}
+
+               args, err := s.disp.sbatchArgs(container)
+               c.Check(err, Equals, trial.err)
+               if trial.err == nil {
+                       c.Check(args, DeepEquals, append([]string{"--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0", "--nice=9990"}, trial.sbatchArgs...))
+               }
+       }
 }
 
-func (s *MockArvadosServerSuite) TestSbatchPartition(c *C) {
+func (s *StubbedSuite) TestSbatchPartition(c *C) {
        container := arvados.Container{
                UUID:                 "123",
                RuntimeConstraints:   arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 1},
                SchedulingParameters: arvados.SchedulingParameters{Partitions: []string{"blurb", "b2"}},
-               Priority:             1}
+               Priority:             1,
+       }
 
-       c.Check(sbatchArgs(container), DeepEquals, []string{
+       args, err := s.disp.sbatchArgs(container)
+       c.Check(args, DeepEquals, []string{
                "--job-name=123", "--mem=239", "--cpus-per-task=1", "--tmp=0", "--nice=9990",
                "--partition=blurb,b2",
        })
-}
-
-func (s *TestSuite) TestIntegrationChangePriority(c *C) {
-       slurm := &slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
-       container := s.integrationTest(c, slurm, nil,
-               func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
-                       dispatcher.UpdateState(container.UUID, dispatch.Running)
-                       time.Sleep(time.Second)
-                       dispatcher.Arv.Update("containers", container.UUID,
-                               arvadosclient.Dict{
-                                       "container": arvadosclient.Dict{"priority": 600}},
-                               nil)
-                       time.Sleep(time.Second)
-                       dispatcher.UpdateState(container.UUID, dispatch.Complete)
-               })
-       c.Check(container.State, Equals, arvados.ContainerStateComplete)
-       c.Assert(len(slurm.didRenice), Not(Equals), 0)
-       c.Check(slurm.didRenice[len(slurm.didRenice)-1], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "4000"})
+       c.Check(err, IsNil)
 }
diff --git a/services/crunch-dispatch-slurm/squeue.go b/services/crunch-dispatch-slurm/squeue.go
index 5ecfe8ff2fc049201eb27c8e58c7a02eb84cbbb4..adb620ea8d34778f9d3c4d32edd42f518867b5a4 100644 (file)
@@ -22,6 +22,7 @@ type jobPriority struct {
 // command 'squeue'.
 type SqueueChecker struct {
        Period    time.Duration
+       Slurm     Slurm
        uuids     map[string]jobPriority
        startOnce sync.Once
        done      chan struct{}
@@ -77,7 +78,7 @@ func (sqc *SqueueChecker) check() {
        sqc.L.Lock()
        defer sqc.L.Unlock()
 
-       cmd := theConfig.slurm.QueueCommand([]string{"--all", "--format=%j %y %Q"})
+       cmd := sqc.Slurm.QueueCommand([]string{"--all", "--format=%j %y %Q"})
        stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
        cmd.Stdout, cmd.Stderr = stdout, stderr
        if err := cmd.Run(); err != nil {
diff --git a/services/dispatchcloud/gocheck_test.go b/services/dispatchcloud/gocheck_test.go
new file mode 100644 (file)
index 0000000..22f89f0
--- /dev/null
@@ -0,0 +1,16 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+       "testing"
+
+       check "gopkg.in/check.v1"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+       check.TestingT(t)
+}
diff --git a/services/dispatchcloud/node_size.go b/services/dispatchcloud/node_size.go
new file mode 100644 (file)
index 0000000..3b72c4a
--- /dev/null
@@ -0,0 +1,120 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+       "bytes"
+       "errors"
+       "log"
+       "os/exec"
+       "strings"
+       "time"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+var (
+       ErrConstraintsNotSatisfiable  = errors.New("constraints not satisfiable by any configured instance type")
+       ErrInstanceTypesNotConfigured = errors.New("site configuration does not list any instance types")
+       discountConfiguredRAMPercent  = 5
+)
+
+// ChooseInstanceType returns the cheapest available
+// arvados.InstanceType big enough to run ctr.
+func ChooseInstanceType(cc *arvados.Cluster, ctr *arvados.Container) (best arvados.InstanceType, err error) {
+       needVCPUs := ctr.RuntimeConstraints.VCPUs
+       needRAM := ctr.RuntimeConstraints.RAM + ctr.RuntimeConstraints.KeepCacheRAM
+
+       needRAM = (needRAM * 100) / int64(100-discountConfiguredRAMPercent)
+
+       if len(cc.InstanceTypes) == 0 {
+               err = ErrInstanceTypesNotConfigured
+               return
+       }
+
+       err = ErrConstraintsNotSatisfiable
+       for _, it := range cc.InstanceTypes {
+               switch {
+               case err == nil && it.Price > best.Price:
+               case it.RAM < needRAM:
+               case it.VCPUs < needVCPUs:
+               case it.Price == best.Price && (it.RAM < best.RAM || it.VCPUs < best.VCPUs):
+                       // Equal price, but worse specs
+               default:
+                       // Lower price || (same price && better specs)
+                       best = it
+                       err = nil
+               }
+       }
+       return
+}
+
+// SlurmNodeTypeFeatureKludge ensures SLURM accepts every instance
+// type name as a valid feature name, even if no instances of that
+// type have appeared yet.
+//
+// It takes advantage of some SLURM peculiarities:
+//
+// (1) A feature is valid after it has been offered by a node, even if
+// it is no longer offered by any node. So, to make a feature name
+// valid, we can add it to a dummy node ("compute0"), then remove it.
+//
+// (2) when srun is given an invalid --gres argument and an invalid
+// --constraint argument, the error message mentions "Invalid feature
+// specification". So, to test whether a feature name is valid without
+// actually submitting a job, we can call srun with the feature name
+// and an invalid --gres argument.
+//
+// SlurmNodeTypeFeatureKludge does a test-and-fix operation
+// immediately, and then periodically, in case slurm restarts and
+// forgets the list of valid features. It never returns (unless there
+// are no node types configured, in which case it returns
+// immediately), so it should generally be invoked with "go".
+func SlurmNodeTypeFeatureKludge(cc *arvados.Cluster) {
+       if len(cc.InstanceTypes) == 0 {
+               return
+       }
+       var features []string
+       for _, it := range cc.InstanceTypes {
+               features = append(features, "instancetype="+it.Name)
+       }
+       for {
+               slurmKludge(features)
+               time.Sleep(time.Minute)
+       }
+}
+
+var (
+       slurmDummyNode     = "compute0"
+       slurmErrBadFeature = "Invalid feature"
+       slurmErrBadGres    = "Invalid generic resource"
+)
+
+func slurmKludge(features []string) {
+       cmd := exec.Command("srun", "--gres=invalid-gres-specification", "--constraint="+strings.Join(features, "&"), "true")
+       out, err := cmd.CombinedOutput()
+       switch {
+       case err == nil:
+               log.Printf("warning: guaranteed-to-fail srun command did not fail: %q %q", cmd.Path, cmd.Args)
+               log.Printf("output was: %q", out)
+
+       case bytes.Contains(out, []byte(slurmErrBadFeature)):
+               log.Printf("temporarily configuring node %q with all node type features", slurmDummyNode)
+               for _, nodeFeatures := range []string{strings.Join(features, ","), ""} {
+                       cmd = exec.Command("scontrol", "update", "NodeName="+slurmDummyNode, "Features="+nodeFeatures)
+                       log.Printf("running: %q %q", cmd.Path, cmd.Args)
+                       out, err := cmd.CombinedOutput()
+                       if err != nil {
+                               log.Printf("error: scontrol: %s (output was %q)", err, out)
+                       }
+               }
+
+       case bytes.Contains(out, []byte(slurmErrBadGres)):
+               // Evidently our node-type feature names are all valid.
+
+       default:
+               log.Printf("warning: expected srun error %q or %q, but output was %q", slurmErrBadFeature, slurmErrBadGres, out)
+       }
+}
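
Two properties of ChooseInstanceType are worth calling out, both visible in the selection loop above and exercised by node_size_test.go below: the requested RAM is padded by discountConfiguredRAMPercent (5%), so a type needs roughly 5% more nominal RAM than the container asks for; and among qualifying types with equal price, the one with better specs wins. A small sketch with made-up type names:

    package main

    import (
        "fmt"

        "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/services/dispatchcloud"
    )

    func main() {
        cluster := &arvados.Cluster{InstanceTypes: []arvados.InstanceType{
            {Name: "same-price-small", Price: 0.2, RAM: 2000000000, VCPUs: 2},
            {Name: "same-price-big", Price: 0.2, RAM: 4000000000, VCPUs: 4},
        }}
        // 1 GB requested becomes ~1.05 GB after the 5% padding; both types
        // still qualify, and the equal-price tie goes to the bigger one.
        it, err := dispatchcloud.ChooseInstanceType(cluster, &arvados.Container{
            RuntimeConstraints: arvados.RuntimeConstraints{RAM: 1000000000, VCPUs: 1},
        })
        fmt.Println(it.Name, err) // same-price-big <nil>
    }
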
diff --git a/services/dispatchcloud/node_size_test.go b/services/dispatchcloud/node_size_test.go
new file mode 100644 (file)
index 0000000..bc628b5
--- /dev/null
@@ -0,0 +1,73 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&NodeSizeSuite{})
+
+type NodeSizeSuite struct{}
+
+func (*NodeSizeSuite) TestChooseNotConfigured(c *check.C) {
+       _, err := ChooseInstanceType(&arvados.Cluster{}, &arvados.Container{
+               RuntimeConstraints: arvados.RuntimeConstraints{
+                       RAM:   1234567890,
+                       VCPUs: 2,
+               },
+       })
+       c.Check(err, check.Equals, ErrInstanceTypesNotConfigured)
+}
+
+func (*NodeSizeSuite) TestChooseUnsatisfiable(c *check.C) {
+       for _, rc := range []arvados.RuntimeConstraints{
+               {RAM: 9876543210, VCPUs: 2},
+               {RAM: 1234567890, VCPUs: 20},
+               {RAM: 1234567890, VCPUs: 2, KeepCacheRAM: 9876543210},
+       } {
+               _, err := ChooseInstanceType(&arvados.Cluster{InstanceTypes: []arvados.InstanceType{
+                       {Price: 1.1, RAM: 1000000000, VCPUs: 2, Name: "small1"},
+                       {Price: 2.2, RAM: 2000000000, VCPUs: 4, Name: "small2"},
+                       {Price: 4.4, RAM: 4000000000, VCPUs: 8, Name: "small4"},
+               }}, &arvados.Container{RuntimeConstraints: rc})
+               c.Check(err, check.Equals, ErrConstraintsNotSatisfiable)
+       }
+}
+
+func (*NodeSizeSuite) TestChoose(c *check.C) {
+       for _, menu := range [][]arvados.InstanceType{
+               {
+                       {Price: 4.4, RAM: 4000000000, VCPUs: 8, Name: "costly"},
+                       {Price: 2.2, RAM: 2000000000, VCPUs: 4, Name: "best"},
+                       {Price: 1.1, RAM: 1000000000, VCPUs: 2, Name: "small"},
+               },
+               {
+                       {Price: 4.4, RAM: 4000000000, VCPUs: 8, Name: "costly"},
+                       {Price: 2.2, RAM: 2000000000, VCPUs: 4, Name: "goodenough"},
+                       {Price: 2.2, RAM: 4000000000, VCPUs: 4, Name: "best"},
+                       {Price: 1.1, RAM: 1000000000, VCPUs: 2, Name: "small"},
+               },
+               {
+                       {Price: 1.1, RAM: 1000000000, VCPUs: 2, Name: "small"},
+                       {Price: 2.2, RAM: 2000000000, VCPUs: 4, Name: "goodenough"},
+                       {Price: 2.2, RAM: 4000000000, VCPUs: 4, Name: "best"},
+                       {Price: 4.4, RAM: 4000000000, VCPUs: 8, Name: "costly"},
+               },
+       } {
+               best, err := ChooseInstanceType(&arvados.Cluster{InstanceTypes: menu}, &arvados.Container{
+                       RuntimeConstraints: arvados.RuntimeConstraints{
+                               VCPUs:        2,
+                               RAM:          987654321,
+                               KeepCacheRAM: 123456789,
+                       },
+               })
+               c.Check(err, check.IsNil)
+               c.Check(best.Name, check.Equals, "best")
+               c.Check(best.RAM >= 1234567890, check.Equals, true)
+               c.Check(best.VCPUs >= 2, check.Equals, true)
+       }
+}
diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py b/services/nodemanager/arvnodeman/computenode/dispatch/__init__.py
index 6c61e32b8db2c10cbef17216e8e3c80c9e7bfa4e..597a011e72075975706a42a31ed03bc74ff42b36 100644 (file)
@@ -113,14 +113,16 @@ class ComputeNodeSetupActor(ComputeNodeStateChangeBase):
     @ComputeNodeStateChangeBase._finish_on_exception
     @RetryMixin._retry(config.ARVADOS_ERRORS)
     def create_arvados_node(self):
-        self.arvados_node = self._arvados.nodes().create(body={}).execute()
+        self.arvados_node = self._arvados.nodes().create(
+            body={}, assign_slot=True).execute()
         self._later.create_cloud_node()
 
     @ComputeNodeStateChangeBase._finish_on_exception
     @RetryMixin._retry(config.ARVADOS_ERRORS)
     def prepare_arvados_node(self, node):
-        self.arvados_node = self._clean_arvados_node(
-            node, "Prepared by Node Manager")
+        self._clean_arvados_node(node, "Prepared by Node Manager")
+        self.arvados_node = self._arvados.nodes().update(
+            body={}, assign_slot=True).execute()
         self._later.create_cloud_node()
 
     @ComputeNodeStateChangeBase._finish_on_exception
@@ -315,7 +317,8 @@ class ComputeNodeUpdateActor(config.actor_class, RetryMixin):
 
     @RetryMixin._retry()
     def sync_node(self, cloud_node, arvados_node):
-        return self._cloud.sync_node(cloud_node, arvados_node)
+        if self._cloud.node_fqdn(cloud_node) != arvados_node_fqdn(arvados_node):
+            return self._cloud.sync_node(cloud_node, arvados_node)
 
 
 class ComputeNodeMonitorActor(config.actor_class):
@@ -326,14 +329,13 @@ class ComputeNodeMonitorActor(config.actor_class):
     for shutdown.
     """
     def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
-                 cloud_fqdn_func, timer_actor, update_actor, cloud_client,
+                 timer_actor, update_actor, cloud_client,
                  arvados_node=None, poll_stale_after=600, node_stale_after=3600,
                  boot_fail_after=1800
     ):
         super(ComputeNodeMonitorActor, self).__init__()
         self._later = self.actor_ref.tell_proxy()
         self._shutdowns = shutdown_timer
-        self._cloud_node_fqdn = cloud_fqdn_func
         self._timer = timer_actor
         self._update = update_actor
         self._cloud = cloud_client
@@ -486,8 +488,11 @@ class ComputeNodeMonitorActor(config.actor_class):
             self._later.consider_shutdown()
 
     def update_arvados_node(self, arvados_node):
-        # If the cloud node's FQDN doesn't match what's in the Arvados node
-        # record, make them match.
+        """Called when the latest Arvados node record is retrieved.
+
+        Calls the updater's sync_node() method.
+
+        """
         # This method is a little unusual in the way it just fires off the
         # request without checking the result or retrying errors.  That's
         # because this update happens every time we reload the Arvados node
@@ -496,7 +501,5 @@ class ComputeNodeMonitorActor(config.actor_class):
         # the logic to throttle those effective retries when there's trouble.
         if arvados_node is not None:
             self.arvados_node = arvados_node
-            if (self._cloud_node_fqdn(self.cloud_node) !=
-                  arvados_node_fqdn(self.arvados_node)):
-                self._update.sync_node(self.cloud_node, self.arvados_node)
+            self._update.sync_node(self.cloud_node, self.arvados_node)
             self._later.consider_shutdown()
diff --git a/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py b/services/nodemanager/arvnodeman/computenode/dispatch/slurm.py
index c8883c3ae70f6614b7bd9063030c14e264dfe543..1cf8f4e41d776e5861c41816aff34cf2d98604db 100644 (file)
@@ -8,8 +8,8 @@ from __future__ import absolute_import, print_function
 import subprocess
 import time
 
-from . import \
-    ComputeNodeSetupActor, ComputeNodeMonitorActor
+from . import ComputeNodeMonitorActor
+from . import ComputeNodeSetupActor as SetupActorBase
 from . import ComputeNodeShutdownActor as ShutdownActorBase
 from . import ComputeNodeUpdateActor as UpdateActorBase
 from .. import RetryMixin
@@ -20,16 +20,32 @@ class SlurmMixin(object):
                                   'fail\n', 'fail*\n'])
     SLURM_DRAIN_STATES = frozenset(['drain\n', 'drng\n'])
 
-    def _set_node_state(self, nodename, state, *args):
-        cmd = ['scontrol', 'update', 'NodeName=' + nodename,
-               'State=' + state]
-        cmd.extend(args)
-        subprocess.check_output(cmd)
+    def _update_slurm_node(self, nodename, updates):
+        cmd = ['scontrol', 'update', 'NodeName=' + nodename] + updates
+        try:
+            subprocess.check_output(cmd)
+        except Exception:
+            self._logger.error(
+                "SLURM update %r failed", cmd, exc_info=True)
+
+    def _update_slurm_size_attrs(self, nodename, size):
+        self._update_slurm_node(nodename, [
+            'Weight=%i' % int(size.price * 1000),
+            'Features=instancetype=' + size.id,
+        ])
 
     def _get_slurm_state(self, nodename):
         return subprocess.check_output(['sinfo', '--noheader', '-o', '%t', '-n', nodename])
 
 
+class ComputeNodeSetupActor(SlurmMixin, SetupActorBase):
+    def create_cloud_node(self):
+        hostname = self.arvados_node.get("hostname")
+        if hostname:
+            self._update_slurm_size_attrs(hostname, self.cloud_size)
+        return super(ComputeNodeSetupActor, self).create_cloud_node()
+
+
 class ComputeNodeShutdownActor(SlurmMixin, ShutdownActorBase):
     def on_start(self):
         arv_node = self._arvados_node()
@@ -47,7 +63,7 @@ class ComputeNodeShutdownActor(SlurmMixin, ShutdownActorBase):
         if self._nodename:
             if try_resume and self._get_slurm_state(self._nodename) in self.SLURM_DRAIN_STATES:
                 # Resume from "drng" or "drain"
-                self._set_node_state(self._nodename, 'RESUME')
+                self._update_slurm_node(self._nodename, ['State=RESUME'])
             else:
                 # Node is in a state such as 'idle' or 'alloc' so don't
                 # try to resume it because that will just raise an error.
@@ -59,7 +75,8 @@ class ComputeNodeShutdownActor(SlurmMixin, ShutdownActorBase):
         if self.cancel_reason is not None:
             return
         if self._nodename:
-            self._set_node_state(self._nodename, 'DRAIN', 'Reason=Node Manager shutdown')
+            self._update_slurm_node(self._nodename, [
+                'State=DRAIN', 'Reason=Node Manager shutdown'])
             self._logger.info("Waiting for SLURM node %s to drain", self._nodename)
             self._later.await_slurm_drain()
         else:
@@ -82,15 +99,20 @@ class ComputeNodeShutdownActor(SlurmMixin, ShutdownActorBase):
 
     def _destroy_node(self):
         if self._nodename:
-            self._set_node_state(self._nodename, 'DOWN', 'Reason=Node Manager shutdown')
+            self._update_slurm_node(self._nodename, [
+                'State=DOWN', 'Reason=Node Manager shutdown'])
         super(ComputeNodeShutdownActor, self)._destroy_node()
 
 
-class ComputeNodeUpdateActor(UpdateActorBase):
+class ComputeNodeUpdateActor(SlurmMixin, UpdateActorBase):
     def sync_node(self, cloud_node, arvados_node):
-        if arvados_node.get("hostname"):
-            try:
-                subprocess.check_output(['scontrol', 'update', 'NodeName=' + arvados_node["hostname"], 'Weight=%i' % int(cloud_node.size.price * 1000)])
-            except:
-                self._logger.error("Unable to set slurm node weight.", exc_info=True)
-        return super(ComputeNodeUpdateActor, self).sync_node(cloud_node, arvados_node)
+        """Keep SLURM's node properties up to date."""
+        hostname = arvados_node.get("hostname")
+        features = arvados_node.get("slurm_node_features", "").split(",")
+        sizefeature = "instancetype=" + cloud_node.size.id
+        if hostname and sizefeature not in features:
+            # This probably means SLURM has restarted and lost our
+            # dynamically configured node weights and features.
+            self._update_slurm_size_attrs(hostname, cloud_node.size)
+        return super(ComputeNodeUpdateActor, self).sync_node(
+            cloud_node, arvados_node)
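
Taken together, a node's SLURM weight and instancetype feature are now derived from its cloud size both at setup time and whenever sync_node notices the feature has gone missing (typically after a SLURM restart). A standalone sketch of the command being issued, assuming a libcloud-style size object with price and id attributes; the node name and size values below are invented for illustration:

    import subprocess

    def slurm_size_update(nodename, size):
        # Weight is the hourly price in thousandths, so SLURM prefers
        # cheaper node types; Features advertises the instance type.
        cmd = ['scontrol', 'update', 'NodeName=' + nodename,
               'Weight=%i' % int(size.price * 1000),
               'Features=instancetype=' + size.id]
        subprocess.check_output(cmd)

    # e.g. for a $0.105/hr size with id "n1-standard-2" this runs:
    #   scontrol update NodeName=compute0 Weight=105 Features=instancetype=n1-standard-2
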
index 419557fe288ded9c4c4706bcd47a58b035ce2e65..3f1d575361a461f322e6475fab28b059d973e193 100644 (file)
@@ -38,7 +38,7 @@ class ComputeNodeDriver(BaseComputeNodeDriver):
         super(ComputeNodeDriver, self).__init__(
             auth_kwargs, list_kwargs, create_kwargs,
             driver_class)
-        self._sizes_by_name = {sz.name: sz for sz in self.sizes.itervalues()}
+        self._sizes_by_id = {sz.id: sz for sz in self.sizes.itervalues()}
         self._disktype_links = {dt.name: self._object_link(dt)
                                 for dt in self.real.ex_list_disktypes()}
 
@@ -120,7 +120,7 @@ class ComputeNodeDriver(BaseComputeNodeDriver):
         # and monkeypatch the results when that's the case.
         if nodelist and not hasattr(nodelist[0].size, 'id'):
             for node in nodelist:
-                node.size = self._sizes_by_name[node.size]
+                node.size = self._sizes_by_id[node.size]
         return nodelist
 
     @classmethod
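
Since a GCE listing can return node.size as a bare string, the driver now keys its lookup table on size.id, the same identifier carried in the instancetype feature, rather than the display name. A minimal sketch of that patch-up step; the NodeSize shape is assumed from libcloud:

    def patch_node_sizes(nodes, sizes):
        # sizes are full NodeSize-like objects with an id attribute;
        # a listed node may carry node.size as either such an object or
        # a bare id string, depending on what the GCE API returned.
        sizes_by_id = {sz.id: sz for sz in sizes}
        for node in nodes:
            if not hasattr(node.size, 'id'):
                node.size = sizes_by_id[node.size]
        return nodes
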
index dd441edb6b70f1df4c8144e818bdd0501c443ffc..73b58bfe65fc0cc871579a22500424d2ec2f87fc 100644 (file)
@@ -167,7 +167,6 @@ class NodeManagerDaemonActor(actor_class):
             cloud_node=cloud_node,
             cloud_node_start_time=start_time,
             shutdown_timer=shutdown_timer,
-            cloud_fqdn_func=self._cloud_driver.node_fqdn,
             update_actor=self._cloud_updater,
             timer_actor=self._timer,
             arvados_node=None,
index 4d2d3e0c0ace3e6ff9db5832d3f8a9dcc4b7ad9a..0360bfc5424913c2f85bafc52d2ceaf29bd41296 100644 (file)
@@ -6,6 +6,7 @@
 from __future__ import absolute_import, print_function
 
 import logging
+import re
 import subprocess
 
 import arvados.util
@@ -74,13 +75,15 @@ class ServerCalculator(object):
             return fallback
 
     def cloud_size_for_constraints(self, constraints):
+        specified_size = constraints.get('instance_type')
         want_value = lambda key: self.coerce_int(constraints.get(key), 0)
         wants = {'cores': want_value('min_cores_per_node'),
                  'ram': want_value('min_ram_mb_per_node'),
                  'scratch': want_value('min_scratch_mb_per_node')}
         for size in self.cloud_sizes:
-            if size.meets_constraints(**wants):
-                return size
+            if (size.meets_constraints(**wants) and
+                    (specified_size is None or size.id == specified_size)):
+                return size
         return None
 
     def servers_for_queue(self, queue):
@@ -92,8 +95,7 @@ class ServerCalculator(object):
             cloud_size = self.cloud_size_for_constraints(constraints)
             if cloud_size is None:
                 unsatisfiable_jobs[job['uuid']] = (
-                    'Requirements for a single node exceed the available '
-                    'cloud node size')
+                    "Constraints cannot be satisfied by any node type")
             elif (want_count > self.max_nodes):
                 unsatisfiable_jobs[job['uuid']] = (
                     "Job's min_nodes constraint is greater than the configured "
@@ -152,21 +154,43 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
         queuelist = []
         if self.slurm_queue:
             # cpus, memory, temporary disk space, reason, job name, features
-            squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j"])
+            squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j|%f"])
             for out in squeue_out.splitlines():
                 try:
-                    cpu, ram, disk, reason, jobname = out.split("|", 4)
-                    if ("ReqNodeNotAvail" in reason) or ("Resources" in reason) or ("Priority" in reason):
-                        queuelist.append({
-                            "uuid": jobname,
-                            "runtime_constraints": {
-                                "min_cores_per_node": cpu,
-                                "min_ram_mb_per_node": self.coerce_to_mb(ram),
-                                "min_scratch_mb_per_node": self.coerce_to_mb(disk)
-                            }
-                        })
+                    cpu, ram, disk, reason, jobname, features = out.split("|", 5)
                 except ValueError:
-                    pass
+                    self._logger.warning("ignored malformed line in squeue output: %r", out)
+                    continue
+                if '-dz642-' not in jobname:
+                    continue
+                if not re.search(r'ReqNodeNotAvail|Resources|Priority', reason):
+                    continue
+
+                for feature in features.split(','):
+                    m = re.match(r'instancetype=(.*)', feature)
+                    if not m:
+                        continue
+                    instance_type = m.group(1)
+                    # Ignore cpu/ram/scratch requirements, bring up
+                    # the requested node type.
+                    queuelist.append({
+                        "uuid": jobname,
+                        "runtime_constraints": {
+                            "instance_type": instance_type,
+                        }
+                    })
+                    break
+                else:
+                    # No instance type specified. Choose a node type
+                    # to suit cpu/ram/scratch requirements.
+                    queuelist.append({
+                        "uuid": jobname,
+                        "runtime_constraints": {
+                            "min_cores_per_node": cpu,
+                            "min_ram_mb_per_node": self.coerce_to_mb(ram),
+                            "min_scratch_mb_per_node": self.coerce_to_mb(disk)
+                        }
+                    })
 
         if self.jobs_queue:
             queuelist.extend(self._client.jobs().queue().execute()['items'])
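
For reference, squeue is now asked for six pipe-separated fields (--format=%c|%m|%d|%r|%j|%f), and an instancetype=... entry in the feature list short-circuits the usual cpu/ram/scratch sizing, while cloud_size_for_constraints() honors that choice by matching size.id. A rough standalone sketch of the parse, with the sample line invented for illustration (the real actor also coerces ram/disk to MB, skips non-container job names, and filters on the pending reason):

    import re

    # Example line in the new --format=%c|%m|%d|%r|%j|%f layout.
    line = "1|1024|0|(Resources)|zzzzz-dz642-xxxxxxxxxxxxxxx|instancetype=n1-standard-2"

    cpu, ram, disk, reason, jobname, features = line.split("|", 5)
    constraints = None
    for feature in features.split(','):
        m = re.match(r'instancetype=(.*)', feature)
        if m:
            # An explicit instance type overrides cpu/ram/scratch sizing.
            constraints = {"instance_type": m.group(1)}
            break
    if constraints is None:
        constraints = {
            "min_cores_per_node": cpu,
            "min_ram_mb_per_node": ram,
            "min_scratch_mb_per_node": disk,
        }
    print(constraints)   # {'instance_type': 'n1-standard-2'}
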
index 70ad54d789cff1e34e4f39beb759939b7b2bdf3d..4b9d5b60fb0ce5131d865f4b3d97b0652afb88c8 100644 (file)
@@ -15,8 +15,9 @@ import arvados.util
 class ArvadosNodeListMonitorActor(clientactor.RemotePollLoopActor):
     """Actor to poll the Arvados node list.
 
-    This actor regularly polls the list of Arvados node records, and
-    sends it to subscribers.
+    This actor regularly polls the list of Arvados node records,
+    augments it with the latest SLURM node info (`sinfo`), and sends
+    it to subscribers.
     """
 
     def is_common_error(self, exception):
@@ -29,28 +30,32 @@ class ArvadosNodeListMonitorActor(clientactor.RemotePollLoopActor):
         nodelist = arvados.util.list_all(self._client.nodes().list)
 
         # node hostname, state, features
-        sinfo_out = subprocess.check_output(["sinfo", "--noheader", "--format=%n %t"])
+        sinfo_out = subprocess.check_output(["sinfo", "--noheader", "--format=%n|%t|%f"])
         nodestates = {}
+        nodefeatures = {}
         for out in sinfo_out.splitlines():
             try:
-                nodename, state = out.split(" ", 2)
-                if state in ('alloc', 'alloc*',
-                             'comp',  'comp*',
-                             'mix',   'mix*',
-                             'drng',  'drng*'):
-                    nodestates[nodename] = 'busy'
-                elif state in ('idle', 'fail'):
-                    nodestates[nodename] = state
-                else:
-                    nodestates[nodename] = 'down'
+                nodename, state, features = out.split("|", 3)
             except ValueError:
-                pass
+                continue
+            if state in ('alloc', 'alloc*',
+                         'comp',  'comp*',
+                         'mix',   'mix*',
+                         'drng',  'drng*'):
+                nodestates[nodename] = 'busy'
+            elif state in ('idle', 'fail'):
+                nodestates[nodename] = state
+            else:
+                nodestates[nodename] = 'down'
+            if features != "(null)":
+                nodefeatures[nodename] = features
 
         for n in nodelist:
             if n["slot_number"] and n["hostname"] and n["hostname"] in nodestates:
                 n["crunch_worker_state"] = nodestates[n["hostname"]]
             else:
                 n["crunch_worker_state"] = 'down'
+            n["slurm_node_features"] = nodefeatures.get(n["hostname"], "")
 
         return nodelist
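
sinfo output is now pipe-separated as well (--format=%n|%t|%f), so each Arvados node record can be annotated with both a crunch_worker_state and its SLURM feature list. A small sketch of the state/feature mapping, using made-up node names:

    BUSY_STATES = ('alloc', 'alloc*', 'comp', 'comp*',
                   'mix', 'mix*', 'drng', 'drng*')

    def parse_sinfo(sinfo_out):
        # sinfo --noheader --format=%n|%t|%f
        nodestates, nodefeatures = {}, {}
        for line in sinfo_out.splitlines():
            try:
                nodename, state, features = line.split("|", 2)
            except ValueError:
                continue
            if state in BUSY_STATES:
                nodestates[nodename] = 'busy'
            elif state in ('idle', 'fail'):
                nodestates[nodename] = state
            else:
                nodestates[nodename] = 'down'
            if features != "(null)":
                nodefeatures[nodename] = features
        return nodestates, nodefeatures

    # parse_sinfo("compute1|idle|instancetype=a1.test\ncompute2|alloc|(null)")
    # -> ({'compute1': 'idle', 'compute2': 'busy'},
    #     {'compute1': 'instancetype=a1.test'})
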
 
index d5b55540f8e66eea3749d1828d3203ae89521f92..7b8ba391c9ced07de6a39d7b7952695d619b126e 100755 (executable)
@@ -25,9 +25,13 @@ from functools import partial
 import arvados
 import StringIO
 
+formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
+
+handler = logging.StreamHandler(sys.stderr)
+handler.setFormatter(formatter)
 logger = logging.getLogger("logger")
 logger.setLevel(logging.INFO)
-logger.addHandler(logging.StreamHandler(sys.stderr))
+logger.addHandler(handler)
 
 detail = logging.getLogger("detail")
 detail.setLevel(logging.INFO)
@@ -35,7 +39,9 @@ if os.environ.get("ANMTEST_LOGLEVEL"):
     detail_content = sys.stderr
 else:
     detail_content = StringIO.StringIO()
-detail.addHandler(logging.StreamHandler(detail_content))
+handler = logging.StreamHandler(detail_content)
+handler.setFormatter(formatter)
+detail.addHandler(handler)
 
 fake_slurm = None
 compute_nodes = None
@@ -52,14 +58,14 @@ def update_script(path, val):
 def set_squeue(g):
     global all_jobs
     update_script(os.path.join(fake_slurm, "squeue"), "#!/bin/sh\n" +
-                  "\n".join("echo '1|100|100|%s|%s'" % (v, k) for k,v in all_jobs.items()))
+                  "\n".join("echo '1|100|100|%s|%s|(null)'" % (v, k) for k,v in all_jobs.items()))
     return 0
 
 def set_queue_unsatisfiable(g):
     global all_jobs, unsatisfiable_job_scancelled
     # Simulate a job requesting a 99 core node.
     update_script(os.path.join(fake_slurm, "squeue"), "#!/bin/sh\n" +
-                  "\n".join("echo '99|100|100|%s|%s'" % (v, k) for k,v in all_jobs.items()))
+                  "\n".join("echo '99|100|100|%s|%s|(null)'" % (v, k) for k,v in all_jobs.items()))
     update_script(os.path.join(fake_slurm, "scancel"), "#!/bin/sh\n" +
                   "\ntouch %s" % unsatisfiable_job_scancelled)
     return 0
@@ -78,7 +84,7 @@ def job_cancelled(g):
             ['event_type', '=', 'stderr'],
         ]).execute()['items'][0]
     if not re.match(
-            r"Requirements for a single node exceed the available cloud node size",
+            r"Constraints cannot be satisfied",
             log_entry['properties']['text']):
         return 1
     return 0
@@ -88,7 +94,7 @@ def node_paired(g):
     compute_nodes[g.group(1)] = g.group(3)
 
     update_script(os.path.join(fake_slurm, "sinfo"), "#!/bin/sh\n" +
-                  "\n".join("echo '%s alloc'" % (v) for k,v in compute_nodes.items()))
+                  "\n".join("echo '%s|alloc|(null)'" % (v) for k,v in compute_nodes.items()))
 
     for k,v in all_jobs.items():
         if v == "ReqNodeNotAvail":
@@ -101,7 +107,7 @@ def node_paired(g):
 
 def remaining_jobs(g):
     update_script(os.path.join(fake_slurm, "sinfo"), "#!/bin/sh\n" +
-                  "\n".join("echo '%s alloc'" % (v) for k,v in compute_nodes.items()))
+                  "\n".join("echo '%s|alloc|(null)'" % (v) for k,v in compute_nodes.items()))
 
     for k,v in all_jobs.items():
         all_jobs[k] = "Running"
@@ -113,7 +119,7 @@ def remaining_jobs(g):
 
 def node_busy(g):
     update_script(os.path.join(fake_slurm, "sinfo"), "#!/bin/sh\n" +
-                  "\n".join("echo '%s idle'" % (v) for k,v in compute_nodes.items()))
+                  "\n".join("echo '%s|idle|(null)'" % (v) for k,v in compute_nodes.items()))
     return 0
 
 def node_shutdown(g):
index 4b352059e629bbc22c667347812d7046960c0da7..0a2deb8a9cdd70ca7a72f1ef41b067bbe2f00ea4 100644 (file)
@@ -21,9 +21,16 @@ from arvnodeman.computenode.driver import BaseComputeNodeDriver
 from . import testutil
 
 class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
+    ACTOR_CLASS = dispatch.ComputeNodeSetupActor
+
     def make_mocks(self, arvados_effect=None):
         if arvados_effect is None:
-            arvados_effect = [testutil.arvados_node_mock()]
+            arvados_effect = [testutil.arvados_node_mock(
+                slot_number=None,
+                hostname=None,
+                first_ping_at=None,
+                last_ping_at=None,
+            )]
         self.arvados_effect = arvados_effect
         self.timer = testutil.MockTimer()
         self.api_client = mock.MagicMock(name='api_client')
@@ -35,7 +42,7 @@ class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
     def make_actor(self, arv_node=None):
         if not hasattr(self, 'timer'):
             self.make_mocks(arvados_effect=[arv_node] if arv_node else None)
-        self.setup_actor = dispatch.ComputeNodeSetupActor.start(
+        self.setup_actor = self.ACTOR_CLASS.start(
             self.timer, self.api_client, self.cloud_client,
             testutil.MockSize(1), arv_node).proxy()
 
@@ -56,6 +63,7 @@ class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
         self.assertEqual(self.arvados_effect[-1],
                          self.setup_actor.arvados_node.get(self.TIMEOUT))
         assert(finished.wait(self.TIMEOUT))
+        self.api_client.nodes().create.called_with(body={}, assign_slot=True)
         self.assertEqual(1, self.api_client.nodes().create().execute.call_count)
         self.assertEqual(1, self.api_client.nodes().update().execute.call_count)
         self.assert_node_properties_updated()
@@ -71,7 +79,8 @@ class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
                          self.setup_actor.arvados_node.get(self.TIMEOUT))
         assert(finished.wait(self.TIMEOUT))
         self.assert_node_properties_updated()
-        self.assertEqual(2, self.api_client.nodes().update().execute.call_count)
+        self.api_client.nodes().create.called_with(body={}, assign_slot=True)
+        self.assertEqual(3, self.api_client.nodes().update().execute.call_count)
         self.assertEqual(self.cloud_client.create_node(),
                          self.setup_actor.cloud_node.get(self.TIMEOUT))
 
@@ -188,7 +197,7 @@ class ComputeNodeShutdownActorMixin(testutil.ActorTestMixin):
             start_time = time.time()
         monitor_actor = dispatch.ComputeNodeMonitorActor.start(
             self.cloud_node, start_time, self.shutdowns,
-            testutil.cloud_node_fqdn, self.timer, self.updates, self.cloud_client,
+            self.timer, self.updates, self.cloud_client,
             self.arvados_node)
         self.shutdown_actor = self.ACTOR_CLASS.start(
             self.timer, self.cloud_client, self.arvados_client, monitor_actor,
@@ -326,7 +335,7 @@ class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
             start_time = time.time()
         self.node_actor = dispatch.ComputeNodeMonitorActor.start(
             self.cloud_mock, start_time, self.shutdowns,
-            testutil.cloud_node_fqdn, self.timer, self.updates, self.cloud_client,
+            self.timer, self.updates, self.cloud_client,
             arv_node, boot_fail_after=300).proxy()
         self.node_actor.subscribe(self.subscriber).get(self.TIMEOUT)
 
@@ -511,19 +520,10 @@ class ComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
         self.assertEqual(testutil.ip_address_mock(4),
                          current_arvados['ip_address'])
 
-    def test_update_arvados_node_syncs_when_fqdn_mismatch(self):
+    def test_update_arvados_node_calls_sync_node(self):
         self.make_mocks(5)
         self.cloud_mock.extra['testname'] = 'cloudfqdn.zzzzz.arvadosapi.com'
         self.make_actor()
         arv_node = testutil.arvados_node_mock(5)
         self.node_actor.update_arvados_node(arv_node).get(self.TIMEOUT)
         self.assertEqual(1, self.updates.sync_node.call_count)
-
-    def test_update_arvados_node_skips_sync_when_fqdn_match(self):
-        self.make_mocks(6)
-        arv_node = testutil.arvados_node_mock(6)
-        self.cloud_mock.extra['testname'] ='{n[hostname]}.{n[domain]}'.format(
-            n=arv_node)
-        self.make_actor()
-        self.node_actor.update_arvados_node(arv_node).get(self.TIMEOUT)
-        self.assertEqual(0, self.updates.sync_node.call_count)
index 0b6162dfaa64405df53794bb575bfffd2420bbff..b61db5cba1c57918c622d1ee815461ba6fe6de77 100644 (file)
@@ -13,7 +13,10 @@ import mock
 
 import arvnodeman.computenode.dispatch.slurm as slurm_dispatch
 from . import testutil
-from .test_computenode_dispatch import ComputeNodeShutdownActorMixin, ComputeNodeUpdateActorTestCase
+from .test_computenode_dispatch import \
+    ComputeNodeShutdownActorMixin, \
+    ComputeNodeSetupActorTestCase, \
+    ComputeNodeUpdateActorTestCase
 
 @mock.patch('subprocess.check_output')
 class SLURMComputeNodeShutdownActorTestCase(ComputeNodeShutdownActorMixin,
@@ -123,4 +126,18 @@ class SLURMComputeNodeUpdateActorTestCase(ComputeNodeUpdateActorTestCase):
         cloud_node = testutil.cloud_node_mock()
         arv_node = testutil.arvados_node_mock()
         self.updater.sync_node(cloud_node, arv_node).get(self.TIMEOUT)
-        check_output.assert_called_with(['scontrol', 'update', 'NodeName=compute99', 'Weight=99000'])
+        check_output.assert_called_with(['scontrol', 'update', 'NodeName=compute99', 'Weight=99000', 'Features=instancetype=z99.test'])
+
+class SLURMComputeNodeSetupActorTestCase(ComputeNodeSetupActorTestCase):
+    ACTOR_CLASS = slurm_dispatch.ComputeNodeSetupActor
+
+    @mock.patch('subprocess.check_output')
+    def test_update_node_features(self, check_output):
+        # `scontrol update` happens only if the Arvados node record
+        # has a hostname. ComputeNodeSetupActorTestCase.make_mocks
+        # uses mocks with scrubbed hostnames, so we override with the
+        # default testutil.arvados_node_mock.
+        self.make_mocks(arvados_effect=[testutil.arvados_node_mock()])
+        self.make_actor()
+        self.wait_for_assignment(self.setup_actor, 'cloud_node')
+        check_output.assert_called_with(['scontrol', 'update', 'NodeName=compute99', 'Weight=1000', 'Features=instancetype=z1.test'])
index cfc4add63b20992b452fbc59ecb02c02e8ebdaf5..f0942e93785571f8ae4e3cdb7f0c78eb173ee7b6 100644 (file)
@@ -211,7 +211,7 @@ class GCEComputeNodeDriverTestCase(testutil.DriverTestMixin, unittest.TestCase):
         # patches that up in listings.
         size = testutil.MockSize(2)
         node = testutil.cloud_node_mock(size=size)
-        node.size = size.name
+        node.size = size.id
         self.driver_mock().list_sizes.return_value = [size]
         self.driver_mock().list_nodes.return_value = [node]
         driver = self.new_driver()
index ebe7408e705b02e2d55b2d757ef5367953f23242..50fa0aa68a4fb26c58cdb6978594805e314a7e31 100644 (file)
@@ -23,12 +23,13 @@ class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
                                      unittest.TestCase):
 
     def busywait(self, f):
-        n = 0
-        while not f() and n < 200:
+        for n in xrange(200):
+            ok = f()
+            if ok:
+                return
             time.sleep(.1)
             self.daemon.ping().get(self.TIMEOUT)
-            n += 1
-        self.assertTrue(f())
+        self.assertTrue(ok)  # only reached if f() never returned a truthy value; ok holds its last (falsy) result
 
     def mock_node_start(self, **kwargs):
         # Make sure that every time the daemon starts a setup actor,
index b1d5e002767a000d7487aa82c8ee5bb9c312e320..52232453bd0872cf88da74a7823ae0ee4e951724 100644 (file)
@@ -159,7 +159,7 @@ class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
     def test_unsatisfiable_jobs(self, mock_squeue, mock_scancel):
         job_uuid = 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'
         container_uuid = 'yyyyy-dz642-yyyyyyyyyyyyyyy'
-        mock_squeue.return_value = "1|1024|0|Resources|" + container_uuid + "\n"
+        mock_squeue.return_value = "1|1024|0|(Resources)|" + container_uuid + "|\n"
 
         self.build_monitor([{'items': [{'uuid': job_uuid}]}],
                            self.MockCalculatorUnsatisfiableJobs(), True, True)
@@ -181,8 +181,8 @@ class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
 
     @mock.patch("subprocess.check_output")
     def test_squeue_server_list(self, mock_squeue):
-        mock_squeue.return_value = """1|1024|0|Resources|zzzzz-zzzzz-zzzzzzzzzzzzzzy
-2|1024|0|Resources|zzzzz-zzzzz-zzzzzzzzzzzzzzz
+        mock_squeue.return_value = """1|1024|0|(Resources)|zzzzz-dz642-zzzzzzzzzzzzzzy|(null)
+2|1024|0|(Resources)|zzzzz-dz642-zzzzzzzzzzzzzzz|(null)
 """
 
         super(JobQueueMonitorActorTestCase, self).build_monitor(jobqueue.ServerCalculator(
@@ -195,8 +195,8 @@ class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
 
     @mock.patch("subprocess.check_output")
     def test_squeue_server_list_suffix(self, mock_squeue):
-        mock_squeue.return_value = """1|1024M|0|ReqNodeNotAvail, UnavailableNod|zzzzz-zzzzz-zzzzzzzzzzzzzzy
-1|2G|0|ReqNodeNotAvail, UnavailableNod|zzzzz-zzzzz-zzzzzzzzzzzzzzz
+        mock_squeue.return_value = """1|1024M|0|(ReqNodeNotAvail, UnavailableNodes:compute123)|zzzzz-dz642-zzzzzzzzzzzzzzy|(null)
+1|2G|0|(ReqNodeNotAvail)|zzzzz-dz642-zzzzzzzzzzzzzzz|(null)
 """
 
         super(JobQueueMonitorActorTestCase, self).build_monitor(jobqueue.ServerCalculator(
@@ -207,6 +207,16 @@ class JobQueueMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
         self.subscriber.assert_called_with([testutil.MockSize(1),
                                             testutil.MockSize(2)])
 
+    @mock.patch("subprocess.check_output")
+    def test_squeue_server_list_instancetype_constraint(self, mock_squeue):
+        mock_squeue.return_value = """1|1024|0|(Resources)|zzzzz-dz642-zzzzzzzzzzzzzzy|instancetype=z2.test\n"""
+        super(JobQueueMonitorActorTestCase, self).build_monitor(jobqueue.ServerCalculator(
+            [(testutil.MockSize(n), {'cores': n, 'ram': n*1024, 'scratch': n}) for n in range(1, 3)]),
+                                                                True, True)
+        self.monitor.subscribe(self.subscriber).get(self.TIMEOUT)
+        self.stop_proxy(self.monitor)
+        self.subscriber.assert_called_with([testutil.MockSize(2)])
+
     def test_coerce_to_mb(self):
         self.assertEqual(1, jobqueue.JobQueueMonitorActor.coerce_to_mb("1"))
         self.assertEqual(512, jobqueue.JobQueueMonitorActor.coerce_to_mb("512"))
index 11f41b8d9ab68458fae21aa6b86685630d8d1a3f..5becd0c2241386e34b6dfef8e57a29b025335a67 100644 (file)
@@ -42,10 +42,15 @@ class ArvadosNodeListMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
 
     @mock.patch("subprocess.check_output")
     def test_update_from_sinfo(self, sinfo_mock):
-        sinfo_mock.return_value = "compute99 alloc"
-        node = testutil.arvados_node_mock()
+        sinfo_mock.return_value = """compute1|idle|instancetype=a1.test
+compute2|alloc|(null)
+notarvados12345|idle|(null)
+"""
+        nodeIdle = testutil.arvados_node_mock(node_num=1)
+        nodeBusy = testutil.arvados_node_mock(node_num=2)
+        nodeMissing = testutil.arvados_node_mock(node_num=99)
         self.build_monitor([{
-            'items': [node],
+            'items': [nodeIdle, nodeBusy, nodeMissing],
             'items_available': 1,
             'offset': 0
         }, {
@@ -53,11 +58,18 @@ class ArvadosNodeListMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
             'items_available': 1,
             'offset': 1
         }])
-        self.monitor.subscribe_to(node['uuid'],
+        self.monitor.subscribe_to(nodeMissing['uuid'],
                                   self.subscriber).get(self.TIMEOUT)
         self.stop_proxy(self.monitor)
-        self.subscriber.assert_called_with(node)
-        self.assertEqual("busy", node["crunch_worker_state"])
+        self.subscriber.assert_called_with(nodeMissing)
+
+        self.assertEqual("idle", nodeIdle["crunch_worker_state"])
+        self.assertEqual("busy", nodeBusy["crunch_worker_state"])
+        self.assertEqual("down", nodeMissing["crunch_worker_state"])
+
+        self.assertEqual("instancetype=a1.test", nodeIdle["slurm_node_features"])
+        self.assertEqual("", nodeBusy["slurm_node_features"])
+        self.assertEqual("", nodeMissing["slurm_node_features"])
 
 
 class CloudNodeListMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
index 6e134375bb8aec05bdd71f830e28f277d3cff5b5..555144c4d05d2bc562d9bc2357fa93421f64b35f 100644 (file)
@@ -55,7 +55,7 @@ def cloud_object_mock(name_id, **extra):
 def cloud_node_fqdn(node):
     # We intentionally put the FQDN somewhere goofy to make sure tested code is
     # using this function for lookups.
-    return node.extra.get('testname', 'NoTestName')
+    return node.extra.get('testname', node.name+'.NoTestName.invalid')
 
 def ip_address_mock(last_octet):
     return '10.20.30.{}'.format(last_octet)
@@ -80,7 +80,7 @@ class MockShutdownTimer(object):
 class MockSize(object):
     def __init__(self, factor):
         self.id = 'z{}.test'.format(factor)
-        self.name = self.id
+        self.name = 'test size '+self.id
         self.ram = 128 * factor
         self.disk = factor   # GB
         self.scratch = 1000 * factor # MB