services/api
services/arv-git-httpd
services/crunchstat
+services/dispatchcloud
services/dockercleaner
services/fuse
services/health
sdk/go/stats
services/arv-git-httpd
services/crunchstat
+ services/dispatchcloud
services/health
services/keep-web
services/keepstore
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
package arvados
import (
ClusterID string `json:"-"`
ManagementToken string
SystemNodes map[string]SystemNode
+ InstanceTypes []InstanceType
+}
+
+// InstanceType describes one cloud node size the dispatcher may
+// request, as listed in the cluster configuration file.
+type InstanceType struct {
+ Name string // site-chosen label; used below as a slurm feature tag ("instancetype=<Name>")
+ ProviderType string // cloud provider's own name for this size — TODO confirm exact semantics
+ VCPUs int
+ RAM int64 // bytes (test fixtures use values like 256000000)
+ Scratch int64 // presumably bytes, like RAM — confirm against provider config
+ Price float64 // relative cost; ChooseInstanceType picks the cheapest suitable type
}
// GetThisSystemNode returns a SystemNode for the node we're running
// CPU) and network connectivity.
type RuntimeConstraints struct {
API *bool
- RAM int `json:"ram"`
- VCPUs int `json:"vcpus"`
- KeepCacheRAM int `json:"keep_cache_ram"`
+ RAM int64 `json:"ram"`
+ VCPUs int `json:"vcpus"`
+ KeepCacheRAM int64 `json:"keep_cache_ram"`
}
// SchedulingParameters specify a container's scheduling parameters
}
end
+ # Base case: update accepts no extra (non-attribute) parameters.
+ # Subclasses merge additional params on top of this (see the
+ # assign_slot override in the nodes controller).
+ def self._update_requires_parameters
+ {}
+ end
+
def self._index_requires_parameters
{
filters: { type: 'array', required: false },
{ ping_secret: {required: true} }
end
+ # Advertise the optional assign_slot flag on create: when true, the
+ # new node gets a slot number (and possibly a hostname) immediately.
+ def self._create_requires_parameters
+ super.merge(
+ { assign_slot: {required: false, type: 'boolean', description: 'assign slot and hostname'} })
+ end
+
+ # Same optional assign_slot flag for update.
+ def self._update_requires_parameters
+ super.merge(
+ { assign_slot: {required: false, type: 'boolean', description: 'assign slot and hostname'} })
+ end
+
+ # Create a node; if assign_slot was requested, reserve a slot
+ # number first (assign_slot saves while claiming the slot; the
+ # save! here persists any remaining attributes).
+ def create
+ @object = model_class.new(resource_attrs)
+ @object.assign_slot if params[:assign_slot]
+ @object.save!
+ show
+ end
+
+ # Update a node, dropping read-only attrs (kind, etag, href), then
+ # optionally assign a slot as above.
+ def update
+ attrs_to_update = resource_attrs.reject { |k,v|
+ [:kind, :etag, :href].index k
+ }
+ @object.update_attributes!(attrs_to_update)
+ @object.assign_slot if params[:assign_slot]
+ show
+ end
+
+
def ping
act_as_system_user do
@object = Node.where(uuid: (params[:id] || params[:uuid])).first
end
end
- # Assign slot_number
- if self.slot_number.nil?
- while true
- n = self.class.available_slot_number
- if n.nil?
- raise "No available node slots"
- end
- self.slot_number = n
- begin
- self.save!
- break
- rescue ActiveRecord::RecordNotUnique
- # try again
- end
- end
- end
-
- # Assign hostname
- if self.hostname.nil? and Rails.configuration.assign_node_hostname
- self.hostname = self.class.hostname_for_slot(self.slot_number)
- end
+ assign_slot
# Record other basic stats
['total_cpu_cores', 'total_ram_mb', 'total_scratch_mb'].each do |key|
save!
end
+ # Claim the lowest available slot_number for this node, then assign
+ # a hostname derived from it. Saves the record as a side effect.
+ # No-op if a slot is already assigned.
+ def assign_slot
+ return if self.slot_number.andand > 0
+ while true
+ self.slot_number = self.class.available_slot_number
+ if self.slot_number.nil?
+ raise "No available node slots"
+ end
+ begin
+ save!
+ return assign_hostname
+ rescue ActiveRecord::RecordNotUnique
+ # Another process claimed the same slot between our query and
+ # our save (unique index on slot_number); try the next one.
+ end
+ end
+ end
+
protected
+ # Set hostname from slot_number, unless the node already has a
+ # hostname or automatic assignment is disabled in the site config.
+ # Does not save; callers persist the change themselves.
+ def assign_hostname
+ if self.hostname.nil? and Rails.configuration.assign_node_hostname
+ self.hostname = self.class.hostname_for_slot(self.slot_number)
+ end
+ end
+
def self.available_slot_number
# Join the sequence 1..max with the nodes table. Return the first
# (i.e., smallest) value that doesn't match the slot_number of any
assert_not_nil json_response['uuid']
assert_not_nil json_response['info'].is_a? Hash
assert_not_nil json_response['info']['ping_secret']
+ assert_nil json_response['slot_number']
+ assert_nil json_response['hostname']
+ end
+
+ # assign_slot=true at create time reserves a positive slot number
+ # and sets the default "compute<n>" hostname.
+ test "create node and assign slot" do
+ authorize_with :admin
+ post :create, {node: {}, assign_slot: true}
+ assert_response :success
+ assert_not_nil json_response['uuid']
+ assert_not_nil json_response['info'].is_a? Hash
+ assert_not_nil json_response['info']['ping_secret']
+ assert_operator 0, :<, json_response['slot_number']
+ n = json_response['slot_number']
+ assert_equal "compute#{n}", json_response['hostname']
+ end
+
+ # assign_slot=true on update works for an existing node that has
+ # no hostname yet.
+ test "update node and assign slot" do
+ authorize_with :admin
+ node = nodes(:new_with_no_hostname)
+ post :update, {id: node.uuid, node: {}, assign_slot: true}
+ assert_response :success
+ assert_operator 0, :<, json_response['slot_number']
+ n = json_response['slot_number']
+ assert_equal "compute#{n}", json_response['hostname']
+ end
+
+ # assign_slot must not overwrite a hostname that was set
+ # explicitly on the node.
+ test "update node and assign slot, don't clobber hostname" do
+ authorize_with :admin
+ node = nodes(:new_with_custom_hostname)
+ post :update, {id: node.uuid, node: {}, assign_slot: true}
+ assert_response :success
+ assert_operator 0, :<, json_response['slot_number']
+ n = json_response['slot_number']
+ assert_equal "custom1", json_response['hostname']
end
test "ping adds node stats to info" do
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/config"
"git.curoverse.com/arvados.git/sdk/go/dispatch"
+ "git.curoverse.com/arvados.git/services/dispatchcloud"
"github.com/coreos/go-systemd/daemon"
)
-var version = "dev"
+var (
+ version = "dev"
+ defaultConfigPath = "/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml"
+)
+
+type Dispatcher struct {
+ *dispatch.Dispatcher
+ cluster *arvados.Cluster
+ sqCheck *SqueueChecker
+ slurm Slurm
-// Config used by crunch-dispatch-slurm
-type Config struct {
Client arvados.Client
SbatchArguments []string
// Minimum time between two attempts to run the same container
MinRetryPeriod arvados.Duration
-
- slurm Slurm
}
func main() {
- theConfig.slurm = &slurmCLI{}
- err := doMain()
+ disp := &Dispatcher{}
+ err := disp.Run(os.Args[0], os.Args[1:])
if err != nil {
log.Fatal(err)
}
}
-var (
- theConfig Config
- sqCheck = &SqueueChecker{}
-)
-
-const defaultConfigPath = "/etc/arvados/crunch-dispatch-slurm/crunch-dispatch-slurm.yml"
+// Run is the real entry point used by main(): load configuration
+// from flags/files, initialize private fields, then run the dispatch
+// loop until it returns an error.
+func (disp *Dispatcher) Run(prog string, args []string) error {
+ if err := disp.configure(prog, args); err != nil {
+ return err
+ }
+ disp.setup()
+ return disp.run()
+}
-func doMain() error {
- flags := flag.NewFlagSet("crunch-dispatch-slurm", flag.ExitOnError)
+// configure() loads config files. Tests skip this.
+func (disp *Dispatcher) configure(prog string, args []string) error {
+ flags := flag.NewFlagSet(prog, flag.ExitOnError)
flags.Usage = func() { usage(flags) }
configPath := flags.String(
false,
"Print version information and exit.")
// Parse args; omit the first arg which is the command name
- flags.Parse(os.Args[1:])
+ flags.Parse(args)
// Print version information if requested
if *getVersion {
log.Printf("crunch-dispatch-slurm %s started", version)
- err := readConfig(&theConfig, *configPath)
+ err := disp.readConfig(*configPath)
if err != nil {
return err
}
- if theConfig.CrunchRunCommand == nil {
- theConfig.CrunchRunCommand = []string{"crunch-run"}
+ if disp.CrunchRunCommand == nil {
+ disp.CrunchRunCommand = []string{"crunch-run"}
}
- if theConfig.PollPeriod == 0 {
- theConfig.PollPeriod = arvados.Duration(10 * time.Second)
+ if disp.PollPeriod == 0 {
+ disp.PollPeriod = arvados.Duration(10 * time.Second)
}
- if theConfig.Client.APIHost != "" || theConfig.Client.AuthToken != "" {
+ if disp.Client.APIHost != "" || disp.Client.AuthToken != "" {
// Copy real configs into env vars so [a]
// MakeArvadosClient() uses them, and [b] they get
// propagated to crunch-run via SLURM.
- os.Setenv("ARVADOS_API_HOST", theConfig.Client.APIHost)
- os.Setenv("ARVADOS_API_TOKEN", theConfig.Client.AuthToken)
+ os.Setenv("ARVADOS_API_HOST", disp.Client.APIHost)
+ os.Setenv("ARVADOS_API_TOKEN", disp.Client.AuthToken)
os.Setenv("ARVADOS_API_HOST_INSECURE", "")
- if theConfig.Client.Insecure {
+ if disp.Client.Insecure {
os.Setenv("ARVADOS_API_HOST_INSECURE", "1")
}
- os.Setenv("ARVADOS_KEEP_SERVICES", strings.Join(theConfig.Client.KeepServiceURIs, " "))
+ os.Setenv("ARVADOS_KEEP_SERVICES", strings.Join(disp.Client.KeepServiceURIs, " "))
os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
} else {
log.Printf("warning: Client credentials missing from config, so falling back on environment variables (deprecated).")
}
if *dumpConfig {
- log.Fatal(config.DumpAndExit(theConfig))
+ return config.DumpAndExit(disp)
+ }
+
+ siteConfig, err := arvados.GetConfig(arvados.DefaultConfigFile)
+ if os.IsNotExist(err) {
+ log.Printf("warning: no cluster config (%s), proceeding with no node types defined", err)
+ } else if err != nil {
+ return fmt.Errorf("error loading config: %s", err)
+ } else if disp.cluster, err = siteConfig.GetCluster(""); err != nil {
+ return fmt.Errorf("config error: %s", err)
}
+ return nil
+}
+
+// setup() initializes private fields after configure().
+func (disp *Dispatcher) setup() {
arv, err := arvadosclient.MakeArvadosClient()
if err != nil {
- log.Printf("Error making Arvados client: %v", err)
- return err
+ log.Fatalf("Error making Arvados client: %v", err)
}
arv.Retries = 25
- sqCheck = &SqueueChecker{Period: time.Duration(theConfig.PollPeriod)}
- defer sqCheck.Stop()
-
- dispatcher := &dispatch.Dispatcher{
+ disp.slurm = &slurmCLI{}
+ disp.sqCheck = &SqueueChecker{
+ Period: time.Duration(disp.PollPeriod),
+ Slurm: disp.slurm,
+ }
+ disp.Dispatcher = &dispatch.Dispatcher{
Arv: arv,
- RunContainer: run,
- PollPeriod: time.Duration(theConfig.PollPeriod),
- MinRetryPeriod: time.Duration(theConfig.MinRetryPeriod),
+ RunContainer: disp.runContainer,
+ PollPeriod: time.Duration(disp.PollPeriod),
+ MinRetryPeriod: time.Duration(disp.MinRetryPeriod),
+ }
+}
+
+func (disp *Dispatcher) run() error {
+ defer disp.sqCheck.Stop()
+
+ if disp.cluster != nil && len(disp.cluster.InstanceTypes) > 0 {
+ go dispatchcloud.SlurmNodeTypeFeatureKludge(disp.cluster)
}
if _, err := daemon.SdNotify(false, "READY=1"); err != nil {
log.Printf("Error notifying init daemon: %v", err)
}
-
- go checkSqueueForOrphans(dispatcher, sqCheck)
-
- return dispatcher.Run(context.Background())
+ go disp.checkSqueueForOrphans()
+ return disp.Dispatcher.Run(context.Background())
}
var containerUuidPattern = regexp.MustCompile(`^[a-z0-9]{5}-dz642-[a-z0-9]{15}$`)
// jobs started by a previous dispatch process that never released
// their slurm allocations even though their container states are
// Cancelled or Complete. See https://dev.arvados.org/issues/10979
-func checkSqueueForOrphans(dispatcher *dispatch.Dispatcher, sqCheck *SqueueChecker) {
- for _, uuid := range sqCheck.All() {
+func (disp *Dispatcher) checkSqueueForOrphans() {
+ for _, uuid := range disp.sqCheck.All() {
if !containerUuidPattern.MatchString(uuid) {
continue
}
- err := dispatcher.TrackContainer(uuid)
+ err := disp.TrackContainer(uuid)
if err != nil {
log.Printf("checkSqueueForOrphans: TrackContainer(%s): %s", uuid, err)
}
}
}
-func niceness(priority int) int {
+func (disp *Dispatcher) niceness(priority int) int {
if priority > 1000 {
priority = 1000
}
return (1000 - priority) * 10
}
-func sbatchArgs(container arvados.Container) []string {
+func (disp *Dispatcher) sbatchArgs(container arvados.Container) ([]string, error) {
mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM) / float64(1048576)))
var disk int64
disk = int64(math.Ceil(float64(disk) / float64(1048576)))
var sbatchArgs []string
- sbatchArgs = append(sbatchArgs, theConfig.SbatchArguments...)
+ sbatchArgs = append(sbatchArgs, disp.SbatchArguments...)
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--job-name=%s", container.UUID))
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--mem=%d", mem))
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs))
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--tmp=%d", disk))
- sbatchArgs = append(sbatchArgs, fmt.Sprintf("--nice=%d", niceness(container.Priority)))
+ sbatchArgs = append(sbatchArgs, fmt.Sprintf("--nice=%d", disp.niceness(container.Priority)))
if len(container.SchedulingParameters.Partitions) > 0 {
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.SchedulingParameters.Partitions, ",")))
}
- return sbatchArgs
+ if disp.cluster == nil {
+ // no instance types configured
+ } else if it, err := dispatchcloud.ChooseInstanceType(disp.cluster, &container); err == dispatchcloud.ErrInstanceTypesNotConfigured {
+ // ditto
+ } else if err != nil {
+ return nil, err
+ } else {
+ sbatchArgs = append(sbatchArgs, "--constraint=instancetype="+it.Name)
+ }
+
+ return sbatchArgs, nil
}
-func submit(dispatcher *dispatch.Dispatcher, container arvados.Container, crunchRunCommand []string) error {
+func (disp *Dispatcher) submit(container arvados.Container, crunchRunCommand []string) error {
// append() here avoids modifying crunchRunCommand's
// underlying array, which is shared with other goroutines.
crArgs := append([]string(nil), crunchRunCommand...)
crArgs = append(crArgs, container.UUID)
crScript := strings.NewReader(execScript(crArgs))
- sqCheck.L.Lock()
- defer sqCheck.L.Unlock()
+ disp.sqCheck.L.Lock()
+ defer disp.sqCheck.L.Unlock()
- sbArgs := sbatchArgs(container)
+ sbArgs, err := disp.sbatchArgs(container)
+ if err != nil {
+ return err
+ }
log.Printf("running sbatch %+q", sbArgs)
- return theConfig.slurm.Batch(crScript, sbArgs)
+ return disp.slurm.Batch(crScript, sbArgs)
}
// Submit a container to the slurm queue (or resume monitoring if it's
// already in the queue). Cancel the slurm job if the container's
// priority changes to zero or its state indicates it's no longer
// running.
-func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+func (disp *Dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- if ctr.State == dispatch.Locked && !sqCheck.HasUUID(ctr.UUID) {
+ if ctr.State == dispatch.Locked && !disp.sqCheck.HasUUID(ctr.UUID) {
log.Printf("Submitting container %s to slurm", ctr.UUID)
- if err := submit(disp, ctr, theConfig.CrunchRunCommand); err != nil {
- text := fmt.Sprintf("Error submitting container %s to slurm: %s", ctr.UUID, err)
+ if err := disp.submit(ctr, disp.CrunchRunCommand); err != nil {
+ var text string
+ if err == dispatchcloud.ErrConstraintsNotSatisfiable {
+ text = fmt.Sprintf("cannot run container %s: %s", ctr.UUID, err)
+ disp.UpdateState(ctr.UUID, dispatch.Cancelled)
+ } else {
+ text = fmt.Sprintf("Error submitting container %s to slurm: %s", ctr.UUID, err)
+ }
log.Print(text)
lr := arvadosclient.Dict{"log": arvadosclient.Dict{
// no point in waiting for further dispatch updates: just
// clean up and return.
go func(uuid string) {
- for ctx.Err() == nil && sqCheck.HasUUID(uuid) {
+ for ctx.Err() == nil && disp.sqCheck.HasUUID(uuid) {
}
cancel()
}(ctr.UUID)
return
case updated, ok := <-status:
if !ok {
- log.Printf("Dispatcher says container %s is done: cancel slurm job", ctr.UUID)
- scancel(ctr)
+ log.Printf("container %s is done: cancel slurm job", ctr.UUID)
+ disp.scancel(ctr)
} else if updated.Priority == 0 {
- log.Printf("Container %s has state %q, priority %d: cancel slurm job", ctr.UUID, updated.State, updated.Priority)
- scancel(ctr)
+ log.Printf("container %s has state %q, priority %d: cancel slurm job", ctr.UUID, updated.State, updated.Priority)
+ disp.scancel(ctr)
} else {
- renice(updated)
+ disp.renice(updated)
}
}
}
}
-func scancel(ctr arvados.Container) {
- sqCheck.L.Lock()
- err := theConfig.slurm.Cancel(ctr.UUID)
- sqCheck.L.Unlock()
+func (disp *Dispatcher) scancel(ctr arvados.Container) {
+ disp.sqCheck.L.Lock()
+ err := disp.slurm.Cancel(ctr.UUID)
+ disp.sqCheck.L.Unlock()
if err != nil {
log.Printf("scancel: %s", err)
time.Sleep(time.Second)
- } else if sqCheck.HasUUID(ctr.UUID) {
+ } else if disp.sqCheck.HasUUID(ctr.UUID) {
log.Printf("container %s is still in squeue after scancel", ctr.UUID)
time.Sleep(time.Second)
}
}
-func renice(ctr arvados.Container) {
- nice := niceness(ctr.Priority)
- oldnice := sqCheck.GetNiceness(ctr.UUID)
+func (disp *Dispatcher) renice(ctr arvados.Container) {
+ nice := disp.niceness(ctr.Priority)
+ oldnice := disp.sqCheck.GetNiceness(ctr.UUID)
if nice == oldnice || oldnice == -1 {
return
}
log.Printf("updating slurm nice value to %d (was %d)", nice, oldnice)
- sqCheck.L.Lock()
- err := theConfig.slurm.Renice(ctr.UUID, nice)
- sqCheck.L.Unlock()
+ disp.sqCheck.L.Lock()
+ err := disp.slurm.Renice(ctr.UUID, nice)
+ disp.sqCheck.L.Unlock()
if err != nil {
log.Printf("renice: %s", err)
time.Sleep(time.Second)
return
}
- if sqCheck.HasUUID(ctr.UUID) {
+ if disp.sqCheck.HasUUID(ctr.UUID) {
log.Printf("container %s has arvados priority %d, slurm nice %d",
- ctr.UUID, ctr.Priority, sqCheck.GetNiceness(ctr.UUID))
+ ctr.UUID, ctr.Priority, disp.sqCheck.GetNiceness(ctr.UUID))
}
}
-func readConfig(dst interface{}, path string) error {
- err := config.LoadFile(dst, path)
+func (disp *Dispatcher) readConfig(path string) error {
+ err := config.LoadFile(disp, path)
if err != nil && os.IsNotExist(err) && path == defaultConfigPath {
log.Printf("Config not specified. Continue with default configuration.")
err = nil
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
"git.curoverse.com/arvados.git/sdk/go/dispatch"
+ "git.curoverse.com/arvados.git/services/dispatchcloud"
. "gopkg.in/check.v1"
)
TestingT(t)
}
-var _ = Suite(&TestSuite{})
-var _ = Suite(&MockArvadosServerSuite{})
+var _ = Suite(&IntegrationSuite{})
+var _ = Suite(&StubbedSuite{})
-type TestSuite struct{}
-type MockArvadosServerSuite struct{}
-
-var initialArgs []string
-
-func (s *TestSuite) SetUpSuite(c *C) {
- initialArgs = os.Args
-}
-
-func (s *TestSuite) TearDownSuite(c *C) {
+type IntegrationSuite struct {
+ disp Dispatcher
+ slurm slurmFake
}
-func (s *TestSuite) SetUpTest(c *C) {
- args := []string{"crunch-dispatch-slurm"}
- os.Args = args
-
+func (s *IntegrationSuite) SetUpTest(c *C) {
arvadostest.StartAPI()
os.Setenv("ARVADOS_API_TOKEN", arvadostest.Dispatch1Token)
+ s.disp = Dispatcher{}
+ s.disp.setup()
+ s.slurm = slurmFake{}
}
-func (s *TestSuite) TearDownTest(c *C) {
- os.Args = initialArgs
+func (s *IntegrationSuite) TearDownTest(c *C) {
arvadostest.ResetEnv()
arvadostest.StopAPI()
}
-func (s *MockArvadosServerSuite) TearDownTest(c *C) {
- arvadostest.ResetEnv()
-}
-
type slurmFake struct {
didBatch [][]string
didCancel []string
return nil
}
-func (s *TestSuite) integrationTest(c *C, slurm *slurmFake,
+func (s *IntegrationSuite) integrationTest(c *C,
expectBatch [][]string,
runContainer func(*dispatch.Dispatcher, arvados.Container)) arvados.Container {
arvadostest.ResetEnv()
arv, err := arvadosclient.MakeArvadosClient()
c.Assert(err, IsNil)
- defer func(orig Slurm) {
- theConfig.slurm = orig
- }(theConfig.slurm)
- theConfig.slurm = slurm
-
// There should be one queued container
params := arvadosclient.Dict{
"filters": [][]string{{"state", "=", "Queued"}},
c.Check(err, IsNil)
c.Check(len(containers.Items), Equals, 1)
- theConfig.CrunchRunCommand = []string{"echo"}
+ s.disp.CrunchRunCommand = []string{"echo"}
ctx, cancel := context.WithCancel(context.Background())
doneRun := make(chan struct{})
- dispatcher := dispatch.Dispatcher{
+ s.disp.Dispatcher = &dispatch.Dispatcher{
Arv: arv,
PollPeriod: time.Duration(1) * time.Second,
RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
go func() {
runContainer(disp, ctr)
- slurm.queue = ""
+ s.slurm.queue = ""
doneRun <- struct{}{}
}()
- run(disp, ctr, status)
+ s.disp.runContainer(disp, ctr, status)
cancel()
},
}
- sqCheck = &SqueueChecker{Period: 500 * time.Millisecond}
+ s.disp.slurm = &s.slurm
+ s.disp.sqCheck = &SqueueChecker{Period: 500 * time.Millisecond, Slurm: s.disp.slurm}
- err = dispatcher.Run(ctx)
+ err = s.disp.Dispatcher.Run(ctx)
<-doneRun
c.Assert(err, Equals, context.Canceled)
- sqCheck.Stop()
+ s.disp.sqCheck.Stop()
- c.Check(slurm.didBatch, DeepEquals, expectBatch)
+ c.Check(s.slurm.didBatch, DeepEquals, expectBatch)
// There should be no queued containers now
err = arv.List("containers", params, &containers)
return container
}
-func (s *TestSuite) TestIntegrationNormal(c *C) {
+func (s *IntegrationSuite) TestNormal(c *C) {
+ s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
container := s.integrationTest(c,
- &slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"},
nil,
func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Running)
c.Check(container.State, Equals, arvados.ContainerStateComplete)
}
-func (s *TestSuite) TestIntegrationCancel(c *C) {
- slurm := &slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
+func (s *IntegrationSuite) TestCancel(c *C) {
+ s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
readyToCancel := make(chan bool)
- slurm.onCancel = func() { <-readyToCancel }
+ s.slurm.onCancel = func() { <-readyToCancel }
container := s.integrationTest(c,
- slurm,
nil,
func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Running)
close(readyToCancel)
})
c.Check(container.State, Equals, arvados.ContainerStateCancelled)
- c.Check(len(slurm.didCancel) > 1, Equals, true)
- c.Check(slurm.didCancel[:2], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "zzzzz-dz642-queuedcontainer"})
+ c.Check(len(s.slurm.didCancel) > 1, Equals, true)
+ c.Check(s.slurm.didCancel[:2], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "zzzzz-dz642-queuedcontainer"})
}
-func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
- container := s.integrationTest(c, &slurmFake{},
+func (s *IntegrationSuite) TestMissingFromSqueue(c *C) {
+ container := s.integrationTest(c,
[][]string{{
fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
fmt.Sprintf("--mem=%d", 11445),
c.Check(container.State, Equals, arvados.ContainerStateCancelled)
}
-func (s *TestSuite) TestSbatchFail(c *C) {
+func (s *IntegrationSuite) TestSbatchFail(c *C) {
+ s.slurm = slurmFake{errBatch: errors.New("something terrible happened")}
container := s.integrationTest(c,
- &slurmFake{errBatch: errors.New("something terrible happened")},
[][]string{{"--job-name=zzzzz-dz642-queuedcontainer", "--mem=11445", "--cpus-per-task=4", "--tmp=45777", "--nice=9990"}},
func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Running)
c.Assert(len(ll.Items), Equals, 1)
}
-func (s *MockArvadosServerSuite) TestAPIErrorGettingContainers(c *C) {
+// Lowering a running container's priority should make the dispatcher
+// renice the slurm job: niceness = (1000 - 600) * 10 = 4000.
+func (s *IntegrationSuite) TestChangePriority(c *C) {
+ s.slurm = slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
+ container := s.integrationTest(c, nil,
+ func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
+ dispatcher.UpdateState(container.UUID, dispatch.Running)
+ // Sleeps give the squeue poller time to notice each change.
+ time.Sleep(time.Second)
+ dispatcher.Arv.Update("containers", container.UUID,
+ arvadosclient.Dict{
+ "container": arvadosclient.Dict{"priority": 600}},
+ nil)
+ time.Sleep(time.Second)
+ dispatcher.UpdateState(container.UUID, dispatch.Complete)
+ })
+ c.Check(container.State, Equals, arvados.ContainerStateComplete)
+ c.Assert(len(s.slurm.didRenice), Not(Equals), 0)
+ c.Check(s.slurm.didRenice[len(s.slurm.didRenice)-1], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "4000"})
+}
+
+// StubbedSuite tests run against stubbed HTTP responses (see
+// testWithServerStub) rather than a real API server.
+type StubbedSuite struct {
+ disp Dispatcher
+}
+
+func (s *StubbedSuite) SetUpTest(c *C) {
+ // Fresh dispatcher per test; setup() initializes private fields
+ // without reading any config files.
+ s.disp = Dispatcher{}
+ s.disp.setup()
+}
+
+func (s *StubbedSuite) TestAPIErrorGettingContainers(c *C) {
apiStubResponses := make(map[string]arvadostest.StubResponse)
apiStubResponses["/arvados/v1/api_client_authorizations/current"] = arvadostest.StubResponse{200, `{"uuid":"` + arvadostest.Dispatch1AuthUUID + `"}`}
apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
- testWithServerStub(c, apiStubResponses, "echo", "Error getting list of containers")
+ s.testWithServerStub(c, apiStubResponses, "echo", "Error getting list of containers")
}
-func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
+func (s *StubbedSuite) testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
apiStub := arvadostest.ServerStub{apiStubResponses}
api := httptest.NewServer(&apiStub)
log.SetOutput(io.MultiWriter(buf, os.Stderr))
defer log.SetOutput(os.Stderr)
- theConfig.CrunchRunCommand = []string{crunchCmd}
+ s.disp.CrunchRunCommand = []string{crunchCmd}
ctx, cancel := context.WithCancel(context.Background())
dispatcher := dispatch.Dispatcher{
disp.UpdateState(ctr.UUID, dispatch.Running)
disp.UpdateState(ctr.UUID, dispatch.Complete)
}()
- run(disp, ctr, status)
+ s.disp.runContainer(disp, ctr, status)
cancel()
},
}
c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
}
-func (s *MockArvadosServerSuite) TestNoSuchConfigFile(c *C) {
- var config Config
- err := readConfig(&config, "/nosuchdir89j7879/8hjwr7ojgyy7")
+func (s *StubbedSuite) TestNoSuchConfigFile(c *C) {
+ err := s.disp.readConfig("/nosuchdir89j7879/8hjwr7ojgyy7")
c.Assert(err, NotNil)
}
-func (s *MockArvadosServerSuite) TestBadSbatchArgsConfig(c *C) {
- var config Config
-
+func (s *StubbedSuite) TestBadSbatchArgsConfig(c *C) {
tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
c.Check(err, IsNil)
defer os.Remove(tmpfile.Name())
_, err = tmpfile.Write([]byte(`{"SbatchArguments": "oops this is not a string array"}`))
c.Check(err, IsNil)
- err = readConfig(&config, tmpfile.Name())
+ err = s.disp.readConfig(tmpfile.Name())
c.Assert(err, NotNil)
}
-func (s *MockArvadosServerSuite) TestNoSuchArgInConfigIgnored(c *C) {
- var config Config
-
+func (s *StubbedSuite) TestNoSuchArgInConfigIgnored(c *C) {
tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
c.Check(err, IsNil)
defer os.Remove(tmpfile.Name())
_, err = tmpfile.Write([]byte(`{"NoSuchArg": "Nobody loves me, not one tiny hunk."}`))
c.Check(err, IsNil)
- err = readConfig(&config, tmpfile.Name())
+ err = s.disp.readConfig(tmpfile.Name())
c.Assert(err, IsNil)
- c.Check(0, Equals, len(config.SbatchArguments))
+ c.Check(0, Equals, len(s.disp.SbatchArguments))
}
-func (s *MockArvadosServerSuite) TestReadConfig(c *C) {
- var config Config
-
+func (s *StubbedSuite) TestReadConfig(c *C) {
tmpfile, err := ioutil.TempFile(os.TempDir(), "config")
c.Check(err, IsNil)
defer os.Remove(tmpfile.Name())
_, err = tmpfile.Write([]byte(argsS))
c.Check(err, IsNil)
- err = readConfig(&config, tmpfile.Name())
+ err = s.disp.readConfig(tmpfile.Name())
c.Assert(err, IsNil)
- c.Check(3, Equals, len(config.SbatchArguments))
- c.Check(args, DeepEquals, config.SbatchArguments)
+ c.Check(args, DeepEquals, s.disp.SbatchArguments)
}
-func (s *MockArvadosServerSuite) TestSbatchFuncWithNoConfigArgs(c *C) {
- testSbatchFuncWithArgs(c, nil)
-}
-
-func (s *MockArvadosServerSuite) TestSbatchFuncWithEmptyConfigArgs(c *C) {
- testSbatchFuncWithArgs(c, []string{})
-}
+func (s *StubbedSuite) TestSbatchArgs(c *C) {
+ container := arvados.Container{
+ UUID: "123",
+ RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 2},
+ Priority: 1,
+ }
-func (s *MockArvadosServerSuite) TestSbatchFuncWithConfigArgs(c *C) {
- testSbatchFuncWithArgs(c, []string{"--arg1=v1", "--arg2"})
+ for _, defaults := range [][]string{
+ nil,
+ {},
+ {"--arg1=v1", "--arg2"},
+ } {
+ c.Logf("%#v", defaults)
+ s.disp.SbatchArguments = defaults
+
+ args, err := s.disp.sbatchArgs(container)
+ c.Check(args, DeepEquals, append(defaults, "--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0", "--nice=9990"))
+ c.Check(err, IsNil)
+ }
}
-func testSbatchFuncWithArgs(c *C, args []string) {
- defer func() { theConfig.SbatchArguments = nil }()
- theConfig.SbatchArguments = append(theConfig.SbatchArguments, args...)
-
+func (s *StubbedSuite) TestSbatchInstanceTypeConstraint(c *C) {
container := arvados.Container{
UUID: "123",
RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 2},
- Priority: 1}
+ Priority: 1,
+ }
- var expected []string
- expected = append(expected, theConfig.SbatchArguments...)
- expected = append(expected, "--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0", "--nice=9990")
- c.Check(sbatchArgs(container), DeepEquals, expected)
+ for _, trial := range []struct {
+ types []arvados.InstanceType
+ sbatchArgs []string
+ err error
+ }{
+ // Choose node type => use --constraint arg
+ {
+ types: []arvados.InstanceType{
+ {Name: "a1.tiny", Price: 0.02, RAM: 128000000, VCPUs: 1},
+ {Name: "a1.small", Price: 0.04, RAM: 256000000, VCPUs: 2},
+ {Name: "a1.medium", Price: 0.08, RAM: 512000000, VCPUs: 4},
+ {Name: "a1.large", Price: 0.16, RAM: 1024000000, VCPUs: 8},
+ },
+ sbatchArgs: []string{"--constraint=instancetype=a1.medium"},
+ },
+ // No node types configured => no slurm constraint
+ {
+ types: nil,
+ sbatchArgs: nil,
+ },
+ // No node type is big enough => error
+ {
+ types: []arvados.InstanceType{
+ {Name: "a1.tiny", Price: 0.02, RAM: 128000000, VCPUs: 1},
+ },
+ err: dispatchcloud.ErrConstraintsNotSatisfiable,
+ },
+ } {
+ c.Logf("%#v", trial)
+ s.disp.cluster = &arvados.Cluster{InstanceTypes: trial.types}
+
+ args, err := s.disp.sbatchArgs(container)
+ c.Check(err, Equals, trial.err)
+ if trial.err == nil {
+ c.Check(args, DeepEquals, append([]string{"--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0", "--nice=9990"}, trial.sbatchArgs...))
+ }
+ }
}
-func (s *MockArvadosServerSuite) TestSbatchPartition(c *C) {
+func (s *StubbedSuite) TestSbatchPartition(c *C) {
container := arvados.Container{
UUID: "123",
RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 1},
SchedulingParameters: arvados.SchedulingParameters{Partitions: []string{"blurb", "b2"}},
- Priority: 1}
+ Priority: 1,
+ }
- c.Check(sbatchArgs(container), DeepEquals, []string{
+ args, err := s.disp.sbatchArgs(container)
+ c.Check(args, DeepEquals, []string{
"--job-name=123", "--mem=239", "--cpus-per-task=1", "--tmp=0", "--nice=9990",
"--partition=blurb,b2",
})
-}
-
-func (s *TestSuite) TestIntegrationChangePriority(c *C) {
- slurm := &slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
- container := s.integrationTest(c, slurm, nil,
- func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
- dispatcher.UpdateState(container.UUID, dispatch.Running)
- time.Sleep(time.Second)
- dispatcher.Arv.Update("containers", container.UUID,
- arvadosclient.Dict{
- "container": arvadosclient.Dict{"priority": 600}},
- nil)
- time.Sleep(time.Second)
- dispatcher.UpdateState(container.UUID, dispatch.Complete)
- })
- c.Check(container.State, Equals, arvados.ContainerStateComplete)
- c.Assert(len(slurm.didRenice), Not(Equals), 0)
- c.Check(slurm.didRenice[len(slurm.didRenice)-1], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "4000"})
+ c.Check(err, IsNil)
}
// command 'squeue'.
type SqueueChecker struct {
Period time.Duration
+ Slurm Slurm
uuids map[string]jobPriority
startOnce sync.Once
done chan struct{}
sqc.L.Lock()
defer sqc.L.Unlock()
- cmd := theConfig.slurm.QueueCommand([]string{"--all", "--format=%j %y %Q"})
+ cmd := sqc.Slurm.QueueCommand([]string{"--all", "--format=%j %y %Q"})
stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
cmd.Stdout, cmd.Stderr = stdout, stderr
if err := cmd.Run(); err != nil {
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+ "testing"
+
+ check "gopkg.in/check.v1"
+)
+
+// Gocheck boilerplate
+// Test hands control to gocheck's suite runner so "go test" executes
+// every registered check.Suite in this package.
+func Test(t *testing.T) {
+ check.TestingT(t)
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+ "bytes"
+ "errors"
+ "log"
+ "os/exec"
+ "strings"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+var (
+ ErrConstraintsNotSatisfiable = errors.New("constraints not satisfiable by any configured instance type")
+ ErrInstanceTypesNotConfigured = errors.New("site configuration does not list any instance types")
+ discountConfiguredRAMPercent = 5
+)
+
+// ChooseInstanceType returns the cheapest available
+// arvados.InstanceType big enough to run ctr.
+//
+// It returns ErrInstanceTypesNotConfigured if the cluster config
+// lists no instance types, and ErrConstraintsNotSatisfiable if no
+// configured type has enough RAM and VCPUs for ctr.
+func ChooseInstanceType(cc *arvados.Cluster, ctr *arvados.Container) (best arvados.InstanceType, err error) {
+ needVCPUs := ctr.RuntimeConstraints.VCPUs
+ needRAM := ctr.RuntimeConstraints.RAM + ctr.RuntimeConstraints.KeepCacheRAM
+
+ // Inflate the RAM requirement by 100/(100-discountConfiguredRAMPercent)
+ // so a type still qualifies after discountConfiguredRAMPercent% of its
+ // nominal RAM is treated as unavailable (presumably OS/kernel overhead
+ // -- confirm against the discountConfiguredRAMPercent declaration).
+ needRAM = (needRAM * 100) / int64(100-discountConfiguredRAMPercent)
+
+ if len(cc.InstanceTypes) == 0 {
+ err = ErrInstanceTypesNotConfigured
+ return
+ }
+
+ // err stays non-nil until the first type satisfies the constraints.
+ err = ErrConstraintsNotSatisfiable
+ for _, it := range cc.InstanceTypes {
+ switch {
+ case err == nil && it.Price > best.Price:
+ // Already have a cheaper satisfactory candidate.
+ case it.RAM < needRAM:
+ case it.VCPUs < needVCPUs:
+ // Too small for this container.
+ case it.Price == best.Price && (it.RAM < best.RAM || it.VCPUs < best.VCPUs):
+ // Equal price, but worse specs
+ default:
+ // Lower price || (same price && better specs)
+ best = it
+ err = nil
+ }
+ }
+ return
+}
+
+// SlurmNodeTypeFeatureKludge ensures SLURM accepts every instance
+// type name as a valid feature name, even if no instances of that
+// type have appeared yet.
+//
+// It takes advantage of some SLURM peculiarities:
+//
+// (1) A feature is valid after it has been offered by a node, even if
+// it is no longer offered by any node. So, to make a feature name
+// valid, we can add it to a dummy node ("compute0"), then remove it.
+//
+// (2) when srun is given an invalid --gres argument and an invalid
+// --constraint argument, the error message mentions "Invalid feature
+// specification". So, to test whether a feature name is valid without
+// actually submitting a job, we can call srun with the feature name
+// and an invalid --gres argument.
+//
+// SlurmNodeTypeFeatureKludge does a test-and-fix operation
+// immediately, and then periodically, in case slurm restarts and
+// forgets the list of valid features. It never returns (unless there
+// are no node types configured, in which case it returns
+// immediately), so it should generally be invoked with "go".
+func SlurmNodeTypeFeatureKludge(cc *arvados.Cluster) {
+ if len(cc.InstanceTypes) == 0 {
+ return
+ }
+ // Build one "instancetype=<name>" feature per configured type.
+ var features []string
+ for _, it := range cc.InstanceTypes {
+ features = append(features, "instancetype="+it.Name)
+ }
+ // Test-and-fix now, then once per minute, forever.
+ for {
+ slurmKludge(features)
+ time.Sleep(time.Minute)
+ }
+}
+
+var (
+ slurmDummyNode = "compute0"
+ slurmErrBadFeature = "Invalid feature"
+ slurmErrBadGres = "Invalid generic resource"
+)
+
+// slurmKludge probes slurm with a guaranteed-to-fail srun call to
+// find out whether it accepts all of the given feature names. If the
+// error indicates an unknown feature, it assigns all features to the
+// dummy node and removes them again, after which slurm treats the
+// names as valid. Any unexpected outcome is only logged.
+func slurmKludge(features []string) {
+ // --gres is deliberately invalid, so this srun never actually
+ // submits work; its error message tells us what slurm rejected.
+ cmd := exec.Command("srun", "--gres=invalid-gres-specification", "--constraint="+strings.Join(features, "&"), "true")
+ out, err := cmd.CombinedOutput()
+ switch {
+ case err == nil:
+ log.Printf("warning: guaranteed-to-fail srun command did not fail: %q %q", cmd.Path, cmd.Args)
+ log.Printf("output was: %q", out)
+
+ case bytes.Contains(out, []byte(slurmErrBadFeature)):
+ log.Printf("temporarily configuring node %q with all node type features", slurmDummyNode)
+ // Add all features to the dummy node, then clear them again;
+ // per peculiarity (1) above, this makes them valid.
+ for _, nodeFeatures := range []string{strings.Join(features, ","), ""} {
+ cmd = exec.Command("scontrol", "update", "NodeName="+slurmDummyNode, "Features="+nodeFeatures)
+ log.Printf("running: %q %q", cmd.Path, cmd.Args)
+ out, err := cmd.CombinedOutput()
+ if err != nil {
+ log.Printf("error: scontrol: %s (output was %q)", err, out)
+ }
+ }
+
+ case bytes.Contains(out, []byte(slurmErrBadGres)):
+ // Evidently our node-type feature names are all valid.
+
+ default:
+ log.Printf("warning: expected srun error %q or %q, but output was %q", slurmErrBadFeature, slurmErrBadGres, out)
+ }
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package dispatchcloud
+
+import (
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&NodeSizeSuite{})
+
+type NodeSizeSuite struct{}
+
+// An empty cluster config must yield ErrInstanceTypesNotConfigured.
+func (*NodeSizeSuite) TestChooseNotConfigured(c *check.C) {
+ _, err := ChooseInstanceType(&arvados.Cluster{}, &arvados.Container{
+ RuntimeConstraints: arvados.RuntimeConstraints{
+ RAM: 1234567890,
+ VCPUs: 2,
+ },
+ })
+ c.Check(err, check.Equals, ErrInstanceTypesNotConfigured)
+}
+
+// Constraints exceeding every configured type -- via RAM, VCPUs, or
+// RAM+KeepCacheRAM combined -- must yield ErrConstraintsNotSatisfiable.
+func (*NodeSizeSuite) TestChooseUnsatisfiable(c *check.C) {
+ for _, rc := range []arvados.RuntimeConstraints{
+ {RAM: 9876543210, VCPUs: 2},
+ {RAM: 1234567890, VCPUs: 20},
+ {RAM: 1234567890, VCPUs: 2, KeepCacheRAM: 9876543210},
+ } {
+ _, err := ChooseInstanceType(&arvados.Cluster{InstanceTypes: []arvados.InstanceType{
+ {Price: 1.1, RAM: 1000000000, VCPUs: 2, Name: "small1"},
+ {Price: 2.2, RAM: 2000000000, VCPUs: 4, Name: "small2"},
+ {Price: 4.4, RAM: 4000000000, VCPUs: 8, Name: "small4"},
+ }}, &arvados.Container{RuntimeConstraints: rc})
+ c.Check(err, check.Equals, ErrConstraintsNotSatisfiable)
+ }
+}
+
+// The cheapest adequate type must win regardless of menu order, and a
+// price tie must be resolved in favor of the better-spec type.
+func (*NodeSizeSuite) TestChoose(c *check.C) {
+ for _, menu := range [][]arvados.InstanceType{
+ {
+ {Price: 4.4, RAM: 4000000000, VCPUs: 8, Name: "costly"},
+ {Price: 2.2, RAM: 2000000000, VCPUs: 4, Name: "best"},
+ {Price: 1.1, RAM: 1000000000, VCPUs: 2, Name: "small"},
+ },
+ {
+ {Price: 4.4, RAM: 4000000000, VCPUs: 8, Name: "costly"},
+ {Price: 2.2, RAM: 2000000000, VCPUs: 4, Name: "goodenough"},
+ {Price: 2.2, RAM: 4000000000, VCPUs: 4, Name: "best"},
+ {Price: 1.1, RAM: 1000000000, VCPUs: 2, Name: "small"},
+ },
+ {
+ {Price: 1.1, RAM: 1000000000, VCPUs: 2, Name: "small"},
+ {Price: 2.2, RAM: 2000000000, VCPUs: 4, Name: "goodenough"},
+ {Price: 2.2, RAM: 4000000000, VCPUs: 4, Name: "best"},
+ {Price: 4.4, RAM: 4000000000, VCPUs: 8, Name: "costly"},
+ },
+ } {
+ best, err := ChooseInstanceType(&arvados.Cluster{InstanceTypes: menu}, &arvados.Container{
+ RuntimeConstraints: arvados.RuntimeConstraints{
+ VCPUs: 2,
+ RAM: 987654321,
+ KeepCacheRAM: 123456789,
+ },
+ })
+ c.Check(err, check.IsNil)
+ c.Check(best.Name, check.Equals, "best")
+ c.Check(best.RAM >= 1234567890, check.Equals, true)
+ c.Check(best.VCPUs >= 2, check.Equals, true)
+ }
+}
@ComputeNodeStateChangeBase._finish_on_exception
@RetryMixin._retry(config.ARVADOS_ERRORS)
def create_arvados_node(self):
+ # Create the node record and, via assign_slot, have the API
+ # server allocate a slot number and hostname in the same request.
+ self.arvados_node = self._arvados.nodes().create(
+ body={}, assign_slot=True).execute()
self._later.create_cloud_node()
@ComputeNodeStateChangeBase._finish_on_exception
@RetryMixin._retry(config.ARVADOS_ERRORS)
def prepare_arvados_node(self, node):
- self.arvados_node = self._clean_arvados_node(
- node, "Prepared by Node Manager")
+ self._clean_arvados_node(node, "Prepared by Node Manager")
+ self.arvados_node = self._arvados.nodes().update(
+ body={}, assign_slot=True).execute()
self._later.create_cloud_node()
@ComputeNodeStateChangeBase._finish_on_exception
@RetryMixin._retry()
def sync_node(self, cloud_node, arvados_node):
+ # Push an update only when the cloud node's FQDN disagrees with
+ # the Arvados node record; otherwise this is a no-op (returns None).
- return self._cloud.sync_node(cloud_node, arvados_node)
+ if self._cloud.node_fqdn(cloud_node) != arvados_node_fqdn(arvados_node):
+ return self._cloud.sync_node(cloud_node, arvados_node)
class ComputeNodeMonitorActor(config.actor_class):
for shutdown.
"""
def __init__(self, cloud_node, cloud_node_start_time, shutdown_timer,
- cloud_fqdn_func, timer_actor, update_actor, cloud_client,
+ timer_actor, update_actor, cloud_client,
arvados_node=None, poll_stale_after=600, node_stale_after=3600,
boot_fail_after=1800
):
super(ComputeNodeMonitorActor, self).__init__()
self._later = self.actor_ref.tell_proxy()
self._shutdowns = shutdown_timer
- self._cloud_node_fqdn = cloud_fqdn_func
self._timer = timer_actor
self._update = update_actor
self._cloud = cloud_client
self._later.consider_shutdown()
def update_arvados_node(self, arvados_node):
- # If the cloud node's FQDN doesn't match what's in the Arvados node
- # record, make them match.
+ """Called when the latest Arvados node record is retrieved.
+
+ Calls the updater's sync_node() method.
+
+ """
# This method is a little unusual in the way it just fires off the
# request without checking the result or retrying errors. That's
# because this update happens every time we reload the Arvados node
# the logic to throttle those effective retries when there's trouble.
if arvados_node is not None:
self.arvados_node = arvados_node
- if (self._cloud_node_fqdn(self.cloud_node) !=
- arvados_node_fqdn(self.arvados_node)):
- self._update.sync_node(self.cloud_node, self.arvados_node)
+ self._update.sync_node(self.cloud_node, self.arvados_node)
self._later.consider_shutdown()
import subprocess
import time
-from . import \
- ComputeNodeSetupActor, ComputeNodeMonitorActor
+from . import ComputeNodeMonitorActor
+from . import ComputeNodeSetupActor as SetupActorBase
from . import ComputeNodeShutdownActor as ShutdownActorBase
from . import ComputeNodeUpdateActor as UpdateActorBase
from .. import RetryMixin
'fail\n', 'fail*\n'])
SLURM_DRAIN_STATES = frozenset(['drain\n', 'drng\n'])
- def _set_node_state(self, nodename, state, *args):
- cmd = ['scontrol', 'update', 'NodeName=' + nodename,
- 'State=' + state]
- cmd.extend(args)
- subprocess.check_output(cmd)
+ def _update_slurm_node(self, nodename, updates):
+ # Run `scontrol update NodeName=<nodename> <updates...>`.
+ # Best effort: a failure is logged, not raised, so callers are
+ # not interrupted when slurm is unreachable or rejects the update.
+ cmd = ['scontrol', 'update', 'NodeName=' + nodename] + updates
+ try:
+ subprocess.check_output(cmd)
+ except:
+ self._logger.error(
+ "SLURM update %r failed", cmd, exc_info=True)
+
+ def _update_slurm_size_attrs(self, nodename, size):
+ # Advertise the node's instance type as a slurm feature, and set
+ # its scheduling weight from price (x1000, truncated to int) so
+ # cheaper node types are preferred.
+ self._update_slurm_node(nodename, [
+ 'Weight=%i' % int(size.price * 1000),
+ 'Features=instancetype=' + size.id,
+ ])
def _get_slurm_state(self, nodename):
return subprocess.check_output(['sinfo', '--noheader', '-o', '%t', '-n', nodename])
+class ComputeNodeSetupActor(SlurmMixin, SetupActorBase):
+ def create_cloud_node(self):
+ # If the Arvados record already has a hostname assigned, prime
+ # slurm with this node's weight and instancetype feature before
+ # the cloud node boots.
+ hostname = self.arvados_node.get("hostname")
+ if hostname:
+ self._update_slurm_size_attrs(hostname, self.cloud_size)
+ return super(ComputeNodeSetupActor, self).create_cloud_node()
+
+
class ComputeNodeShutdownActor(SlurmMixin, ShutdownActorBase):
def on_start(self):
arv_node = self._arvados_node()
if self._nodename:
if try_resume and self._get_slurm_state(self._nodename) in self.SLURM_DRAIN_STATES:
# Resume from "drng" or "drain"
- self._set_node_state(self._nodename, 'RESUME')
+ self._update_slurm_node(self._nodename, ['State=RESUME'])
else:
# Node is in a state such as 'idle' or 'alloc' so don't
# try to resume it because that will just raise an error.
if self.cancel_reason is not None:
return
if self._nodename:
- self._set_node_state(self._nodename, 'DRAIN', 'Reason=Node Manager shutdown')
+ self._update_slurm_node(self._nodename, [
+ 'State=DRAIN', 'Reason=Node Manager shutdown'])
self._logger.info("Waiting for SLURM node %s to drain", self._nodename)
self._later.await_slurm_drain()
else:
def _destroy_node(self):
if self._nodename:
- self._set_node_state(self._nodename, 'DOWN', 'Reason=Node Manager shutdown')
+ self._update_slurm_node(self._nodename, [
+ 'State=DOWN', 'Reason=Node Manager shutdown'])
super(ComputeNodeShutdownActor, self)._destroy_node()
-class ComputeNodeUpdateActor(UpdateActorBase):
+class ComputeNodeUpdateActor(SlurmMixin, UpdateActorBase):
def sync_node(self, cloud_node, arvados_node):
- if arvados_node.get("hostname"):
- try:
- subprocess.check_output(['scontrol', 'update', 'NodeName=' + arvados_node["hostname"], 'Weight=%i' % int(cloud_node.size.price * 1000)])
- except:
- self._logger.error("Unable to set slurm node weight.", exc_info=True)
- return super(ComputeNodeUpdateActor, self).sync_node(cloud_node, arvados_node)
+ """Keep SLURM's node properties up to date."""
+ hostname = arvados_node.get("hostname")
+ # slurm_node_features is populated from sinfo output by the
+ # Arvados node list monitor.
+ features = arvados_node.get("slurm_node_features", "").split(",")
+ sizefeature = "instancetype=" + cloud_node.size.id
+ if hostname and sizefeature not in features:
+ # This probably means SLURM has restarted and lost our
+ # dynamically configured node weights and features.
+ self._update_slurm_size_attrs(hostname, cloud_node.size)
+ return super(ComputeNodeUpdateActor, self).sync_node(
+ cloud_node, arvados_node)
super(ComputeNodeDriver, self).__init__(
auth_kwargs, list_kwargs, create_kwargs,
driver_class)
- self._sizes_by_name = {sz.name: sz for sz in self.sizes.itervalues()}
+ self._sizes_by_id = {sz.id: sz for sz in self.sizes.itervalues()}
self._disktype_links = {dt.name: self._object_link(dt)
for dt in self.real.ex_list_disktypes()}
# and monkeypatch the results when that's the case.
if nodelist and not hasattr(nodelist[0].size, 'id'):
for node in nodelist:
- node.size = self._sizes_by_name[node.size]
+ node.size = self._sizes_by_id[node.size]
return nodelist
@classmethod
cloud_node=cloud_node,
cloud_node_start_time=start_time,
shutdown_timer=shutdown_timer,
- cloud_fqdn_func=self._cloud_driver.node_fqdn,
update_actor=self._cloud_updater,
timer_actor=self._timer,
arvados_node=None,
from __future__ import absolute_import, print_function
import logging
+import re
import subprocess
import arvados.util
return fallback
def cloud_size_for_constraints(self, constraints):
+ # An explicit 'instance_type' constraint (e.g., derived from a
+ # squeue "instancetype=..." job feature) restricts the choice to
+ # that exact size id; otherwise the first size meeting the
+ # cores/ram/scratch minimums wins.
+ specified_size = constraints.get('instance_type')
want_value = lambda key: self.coerce_int(constraints.get(key), 0)
wants = {'cores': want_value('min_cores_per_node'),
'ram': want_value('min_ram_mb_per_node'),
'scratch': want_value('min_scratch_mb_per_node')}
for size in self.cloud_sizes:
- if size.meets_constraints(**wants):
- return size
+ if (size.meets_constraints(**wants) and
+ (specified_size is None or size.id == specified_size)):
+ return size
return None
def servers_for_queue(self, queue):
cloud_size = self.cloud_size_for_constraints(constraints)
if cloud_size is None:
unsatisfiable_jobs[job['uuid']] = (
- 'Requirements for a single node exceed the available '
- 'cloud node size')
+ "Constraints cannot be satisfied by any node type")
elif (want_count > self.max_nodes):
unsatisfiable_jobs[job['uuid']] = (
"Job's min_nodes constraint is greater than the configured "
queuelist = []
if self.slurm_queue:
# cpus, memory, tempory disk space, reason, job name
- squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j"])
+ squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j|%f"])
for out in squeue_out.splitlines():
try:
- cpu, ram, disk, reason, jobname = out.split("|", 4)
- if ("ReqNodeNotAvail" in reason) or ("Resources" in reason) or ("Priority" in reason):
- queuelist.append({
- "uuid": jobname,
- "runtime_constraints": {
- "min_cores_per_node": cpu,
- "min_ram_mb_per_node": self.coerce_to_mb(ram),
- "min_scratch_mb_per_node": self.coerce_to_mb(disk)
- }
- })
+ cpu, ram, disk, reason, jobname, features = out.split("|", 5)
except ValueError:
- pass
+ self._logger.warning("ignored malformed line in squeue output: %r", out)
+ continue
+ if '-dz642-' not in jobname:
+ continue
+ if not re.search(r'ReqNodeNotAvail|Resources|Priority', reason):
+ continue
+
+ for feature in features.split(','):
+ m = re.match(r'instancetype=(.*)', feature)
+ if not m:
+ continue
+ instance_type = m.group(1)
+ # Ignore cpu/ram/scratch requirements, bring up
+ # the requested node type.
+ queuelist.append({
+ "uuid": jobname,
+ "runtime_constraints": {
+ "instance_type": instance_type,
+ }
+ })
+ break
+ else:
+ # No instance type specified. Choose a node type
+ # to suit cpu/ram/scratch requirements.
+ queuelist.append({
+ "uuid": jobname,
+ "runtime_constraints": {
+ "min_cores_per_node": cpu,
+ "min_ram_mb_per_node": self.coerce_to_mb(ram),
+ "min_scratch_mb_per_node": self.coerce_to_mb(disk)
+ }
+ })
if self.jobs_queue:
queuelist.extend(self._client.jobs().queue().execute()['items'])
class ArvadosNodeListMonitorActor(clientactor.RemotePollLoopActor):
"""Actor to poll the Arvados node list.
- This actor regularly polls the list of Arvados node records, and
- sends it to subscribers.
+ This actor regularly polls the list of Arvados node records,
+ augments it with the latest SLURM node info (`sinfo`), and sends
+ it to subscribers.
"""
def is_common_error(self, exception):
nodelist = arvados.util.list_all(self._client.nodes().list)
# node hostname, state
- sinfo_out = subprocess.check_output(["sinfo", "--noheader", "--format=%n %t"])
+ sinfo_out = subprocess.check_output(["sinfo", "--noheader", "--format=%n|%t|%f"])
nodestates = {}
+ nodefeatures = {}
for out in sinfo_out.splitlines():
try:
- nodename, state = out.split(" ", 2)
- if state in ('alloc', 'alloc*',
- 'comp', 'comp*',
- 'mix', 'mix*',
- 'drng', 'drng*'):
- nodestates[nodename] = 'busy'
- elif state in ('idle', 'fail'):
- nodestates[nodename] = state
- else:
- nodestates[nodename] = 'down'
+ nodename, state, features = out.split("|", 3)
except ValueError:
- pass
+ continue
+ if state in ('alloc', 'alloc*',
+ 'comp', 'comp*',
+ 'mix', 'mix*',
+ 'drng', 'drng*'):
+ nodestates[nodename] = 'busy'
+ elif state in ('idle', 'fail'):
+ nodestates[nodename] = state
+ else:
+ nodestates[nodename] = 'down'
+ if features != "(null)":
+ nodefeatures[nodename] = features
for n in nodelist:
if n["slot_number"] and n["hostname"] and n["hostname"] in nodestates:
n["crunch_worker_state"] = nodestates[n["hostname"]]
else:
n["crunch_worker_state"] = 'down'
+ n["slurm_node_features"] = nodefeatures.get(n["hostname"], "")
return nodelist
import arvados
import StringIO
+formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
+
+handler = logging.StreamHandler(sys.stderr)
+handler.setFormatter(formatter)
logger = logging.getLogger("logger")
logger.setLevel(logging.INFO)
-logger.addHandler(logging.StreamHandler(sys.stderr))
+logger.addHandler(handler)
detail = logging.getLogger("detail")
detail.setLevel(logging.INFO)
detail_content = sys.stderr
else:
detail_content = StringIO.StringIO()
-detail.addHandler(logging.StreamHandler(detail_content))
+handler = logging.StreamHandler(detail_content)
+handler.setFormatter(formatter)
+detail.addHandler(handler)
fake_slurm = None
compute_nodes = None
def set_squeue(g):
global all_jobs
+ # Fake squeue output fields: cpu|ram|disk|reason|jobname|features,
+ # with "(null)" meaning no features requested.
update_script(os.path.join(fake_slurm, "squeue"), "#!/bin/sh\n" +
- "\n".join("echo '1|100|100|%s|%s'" % (v, k) for k,v in all_jobs.items()))
+ "\n".join("echo '1|100|100|%s|%s|(null)'" % (v, k) for k,v in all_jobs.items()))
return 0
def set_queue_unsatisfiable(g):
global all_jobs, unsatisfiable_job_scancelled
# Simulate a job requesting a 99 core node.
update_script(os.path.join(fake_slurm, "squeue"), "#!/bin/sh\n" +
- "\n".join("echo '99|100|100|%s|%s'" % (v, k) for k,v in all_jobs.items()))
+ "\n".join("echo '99|100|100|%s|%s|(null)'" % (v, k) for k,v in all_jobs.items()))
+ # The scancel stub records its invocation by touching a sentinel file.
update_script(os.path.join(fake_slurm, "scancel"), "#!/bin/sh\n" +
"\ntouch %s" % unsatisfiable_job_scancelled)
return 0
['event_type', '=', 'stderr'],
]).execute()['items'][0]
if not re.match(
- r"Requirements for a single node exceed the available cloud node size",
+ r"Constraints cannot be satisfied",
log_entry['properties']['text']):
return 1
return 0
compute_nodes[g.group(1)] = g.group(3)
update_script(os.path.join(fake_slurm, "sinfo"), "#!/bin/sh\n" +
- "\n".join("echo '%s alloc'" % (v) for k,v in compute_nodes.items()))
+ "\n".join("echo '%s|alloc|(null)'" % (v) for k,v in compute_nodes.items()))
for k,v in all_jobs.items():
if v == "ReqNodeNotAvail":
def remaining_jobs(g):
update_script(os.path.join(fake_slurm, "sinfo"), "#!/bin/sh\n" +
- "\n".join("echo '%s alloc'" % (v) for k,v in compute_nodes.items()))
+ "\n".join("echo '%s|alloc|(null)'" % (v) for k,v in compute_nodes.items()))
for k,v in all_jobs.items():
all_jobs[k] = "Running"
def node_busy(g):
update_script(os.path.join(fake_slurm, "sinfo"), "#!/bin/sh\n" +
- "\n".join("echo '%s idle'" % (v) for k,v in compute_nodes.items()))
+ "\n".join("echo '%s|idle|(null)'" % (v) for k,v in compute_nodes.items()))
return 0
def node_shutdown(g):
from . import testutil
class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
+ ACTOR_CLASS = dispatch.ComputeNodeSetupActor
+
def make_mocks(self, arvados_effect=None):
if arvados_effect is None:
- arvados_effect = [testutil.arvados_node_mock()]
+ arvados_effect = [testutil.arvados_node_mock(
+ slot_number=None,
+ hostname=None,
+ first_ping_at=None,
+ last_ping_at=None,
+ )]
self.arvados_effect = arvados_effect
self.timer = testutil.MockTimer()
self.api_client = mock.MagicMock(name='api_client')
def make_actor(self, arv_node=None):
if not hasattr(self, 'timer'):
self.make_mocks(arvados_effect=[arv_node] if arv_node else None)
- self.setup_actor = dispatch.ComputeNodeSetupActor.start(
+ self.setup_actor = self.ACTOR_CLASS.start(
self.timer, self.api_client, self.cloud_client,
testutil.MockSize(1), arv_node).proxy()
self.assertEqual(self.arvados_effect[-1],
self.setup_actor.arvados_node.get(self.TIMEOUT))
assert(finished.wait(self.TIMEOUT))
+ self.api_client.nodes().create.called_with(body={}, assign_slot=True)
self.assertEqual(1, self.api_client.nodes().create().execute.call_count)
self.assertEqual(1, self.api_client.nodes().update().execute.call_count)
self.assert_node_properties_updated()
self.setup_actor.arvados_node.get(self.TIMEOUT))
assert(finished.wait(self.TIMEOUT))
self.assert_node_properties_updated()
- self.assertEqual(2, self.api_client.nodes().update().execute.call_count)
+ self.api_client.nodes().create.called_with(body={}, assign_slot=True)
+ self.assertEqual(3, self.api_client.nodes().update().execute.call_count)
self.assertEqual(self.cloud_client.create_node(),
self.setup_actor.cloud_node.get(self.TIMEOUT))
start_time = time.time()
monitor_actor = dispatch.ComputeNodeMonitorActor.start(
self.cloud_node, start_time, self.shutdowns,
- testutil.cloud_node_fqdn, self.timer, self.updates, self.cloud_client,
+ self.timer, self.updates, self.cloud_client,
self.arvados_node)
self.shutdown_actor = self.ACTOR_CLASS.start(
self.timer, self.cloud_client, self.arvados_client, monitor_actor,
start_time = time.time()
self.node_actor = dispatch.ComputeNodeMonitorActor.start(
self.cloud_mock, start_time, self.shutdowns,
- testutil.cloud_node_fqdn, self.timer, self.updates, self.cloud_client,
+ self.timer, self.updates, self.cloud_client,
arv_node, boot_fail_after=300).proxy()
self.node_actor.subscribe(self.subscriber).get(self.TIMEOUT)
self.assertEqual(testutil.ip_address_mock(4),
current_arvados['ip_address'])
- def test_update_arvados_node_syncs_when_fqdn_mismatch(self):
+ def test_update_arvados_node_calls_sync_node(self):
self.make_mocks(5)
self.cloud_mock.extra['testname'] = 'cloudfqdn.zzzzz.arvadosapi.com'
self.make_actor()
arv_node = testutil.arvados_node_mock(5)
self.node_actor.update_arvados_node(arv_node).get(self.TIMEOUT)
self.assertEqual(1, self.updates.sync_node.call_count)
-
- def test_update_arvados_node_skips_sync_when_fqdn_match(self):
- self.make_mocks(6)
- arv_node = testutil.arvados_node_mock(6)
- self.cloud_mock.extra['testname'] ='{n[hostname]}.{n[domain]}'.format(
- n=arv_node)
- self.make_actor()
- self.node_actor.update_arvados_node(arv_node).get(self.TIMEOUT)
- self.assertEqual(0, self.updates.sync_node.call_count)
import arvnodeman.computenode.dispatch.slurm as slurm_dispatch
from . import testutil
-from .test_computenode_dispatch import ComputeNodeShutdownActorMixin, ComputeNodeUpdateActorTestCase
+from .test_computenode_dispatch import \
+ ComputeNodeShutdownActorMixin, \
+ ComputeNodeSetupActorTestCase, \
+ ComputeNodeUpdateActorTestCase
@mock.patch('subprocess.check_output')
class SLURMComputeNodeShutdownActorTestCase(ComputeNodeShutdownActorMixin,
cloud_node = testutil.cloud_node_mock()
arv_node = testutil.arvados_node_mock()
self.updater.sync_node(cloud_node, arv_node).get(self.TIMEOUT)
- check_output.assert_called_with(['scontrol', 'update', 'NodeName=compute99', 'Weight=99000'])
+ check_output.assert_called_with(['scontrol', 'update', 'NodeName=compute99', 'Weight=99000', 'Features=instancetype=z99.test'])
+
+class SLURMComputeNodeSetupActorTestCase(ComputeNodeSetupActorTestCase):
+ # Reruns the whole base setup-actor suite against the SLURM subclass.
+ ACTOR_CLASS = slurm_dispatch.ComputeNodeSetupActor
+
+ @mock.patch('subprocess.check_output')
+ def test_update_node_features(self, check_output):
+ # `scontrol update` happens only if the Arvados node record
+ # has a hostname. ComputeNodeSetupActorTestCase.make_mocks
+ # uses mocks with scrubbed hostnames, so we override with the
+ # default testutil.arvados_node_mock.
+ self.make_mocks(arvados_effect=[testutil.arvados_node_mock()])
+ self.make_actor()
+ self.wait_for_assignment(self.setup_actor, 'cloud_node')
+ check_output.assert_called_with(['scontrol', 'update', 'NodeName=compute99', 'Weight=1000', 'Features=instancetype=z1.test'])
# patches that up in listings.
size = testutil.MockSize(2)
node = testutil.cloud_node_mock(size=size)
- node.size = size.name
+ node.size = size.id
self.driver_mock().list_sizes.return_value = [size]
self.driver_mock().list_nodes.return_value = [node]
driver = self.new_driver()
unittest.TestCase):
def busywait(self, f):
+ # Poll f up to 200 times, sleeping 0.1s and pinging the daemon
+ # between attempts; fail the test if f never returns truthy.
- n = 0
- while not f() and n < 200:
+ for n in xrange(200):
+ ok = f()
+ if ok:
+ return
time.sleep(.1)
self.daemon.ping().get(self.TIMEOUT)
- n += 1
- self.assertTrue(f())
+ self.assertTrue(ok) # always falsy, but not necessarily False
def mock_node_start(self, **kwargs):
# Make sure that every time the daemon starts a setup actor,
def test_unsatisfiable_jobs(self, mock_squeue, mock_scancel):
job_uuid = 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'
container_uuid = 'yyyyy-dz642-yyyyyyyyyyyyyyy'
- mock_squeue.return_value = "1|1024|0|Resources|" + container_uuid + "\n"
+ mock_squeue.return_value = "1|1024|0|(Resources)|" + container_uuid + "|\n"
self.build_monitor([{'items': [{'uuid': job_uuid}]}],
self.MockCalculatorUnsatisfiableJobs(), True, True)
@mock.patch("subprocess.check_output")
def test_squeue_server_list(self, mock_squeue):
- mock_squeue.return_value = """1|1024|0|Resources|zzzzz-zzzzz-zzzzzzzzzzzzzzy
-2|1024|0|Resources|zzzzz-zzzzz-zzzzzzzzzzzzzzz
+ mock_squeue.return_value = """1|1024|0|(Resources)|zzzzz-dz642-zzzzzzzzzzzzzzy|(null)
+2|1024|0|(Resources)|zzzzz-dz642-zzzzzzzzzzzzzzz|(null)
"""
super(JobQueueMonitorActorTestCase, self).build_monitor(jobqueue.ServerCalculator(
@mock.patch("subprocess.check_output")
def test_squeue_server_list_suffix(self, mock_squeue):
- mock_squeue.return_value = """1|1024M|0|ReqNodeNotAvail, UnavailableNod|zzzzz-zzzzz-zzzzzzzzzzzzzzy
-1|2G|0|ReqNodeNotAvail, UnavailableNod|zzzzz-zzzzz-zzzzzzzzzzzzzzz
+ mock_squeue.return_value = """1|1024M|0|(ReqNodeNotAvail, UnavailableNodes:compute123)|zzzzz-dz642-zzzzzzzzzzzzzzy|(null)
+1|2G|0|(ReqNodeNotAvail)|zzzzz-dz642-zzzzzzzzzzzzzzz|(null)
"""
super(JobQueueMonitorActorTestCase, self).build_monitor(jobqueue.ServerCalculator(
self.subscriber.assert_called_with([testutil.MockSize(1),
testutil.MockSize(2)])
+ @mock.patch("subprocess.check_output")
+ def test_squeue_server_list_instancetype_constraint(self, mock_squeue):
+ # A pending job with an "instancetype=z2.test" feature must be
+ # matched to that exact size id, not merely the smallest fit.
+ mock_squeue.return_value = """1|1024|0|(Resources)|zzzzz-dz642-zzzzzzzzzzzzzzy|instancetype=z2.test\n"""
+ super(JobQueueMonitorActorTestCase, self).build_monitor(jobqueue.ServerCalculator(
+ [(testutil.MockSize(n), {'cores': n, 'ram': n*1024, 'scratch': n}) for n in range(1, 3)]),
+ True, True)
+ self.monitor.subscribe(self.subscriber).get(self.TIMEOUT)
+ self.stop_proxy(self.monitor)
+ self.subscriber.assert_called_with([testutil.MockSize(2)])
+
def test_coerce_to_mb(self):
self.assertEqual(1, jobqueue.JobQueueMonitorActor.coerce_to_mb("1"))
self.assertEqual(512, jobqueue.JobQueueMonitorActor.coerce_to_mb("512"))
@mock.patch("subprocess.check_output")
def test_update_from_sinfo(self, sinfo_mock):
+ # Fake sinfo fields: nodename|state|features ("(null)" = none).
- sinfo_mock.return_value = "compute99 alloc"
- node = testutil.arvados_node_mock()
+ sinfo_mock.return_value = """compute1|idle|instancetype=a1.test
+compute2|alloc|(null)
+notarvados12345|idle|(null)
+"""
+ nodeIdle = testutil.arvados_node_mock(node_num=1)
+ nodeBusy = testutil.arvados_node_mock(node_num=2)
+ nodeMissing = testutil.arvados_node_mock(node_num=99)
self.build_monitor([{
- 'items': [node],
+ 'items': [nodeIdle, nodeBusy, nodeMissing],
'items_available': 1,
'offset': 0
}, {
'items_available': 1,
'offset': 1
}])
- self.monitor.subscribe_to(node['uuid'],
+ self.monitor.subscribe_to(nodeMissing['uuid'],
self.subscriber).get(self.TIMEOUT)
self.stop_proxy(self.monitor)
- self.subscriber.assert_called_with(node)
- self.assertEqual("busy", node["crunch_worker_state"])
+ self.subscriber.assert_called_with(nodeMissing)
+
+ # compute99 is absent from sinfo output, so it must be "down"
+ # and carry no slurm features.
+ self.assertEqual("idle", nodeIdle["crunch_worker_state"])
+ self.assertEqual("busy", nodeBusy["crunch_worker_state"])
+ self.assertEqual("down", nodeMissing["crunch_worker_state"])
+
+ self.assertEqual("instancetype=a1.test", nodeIdle["slurm_node_features"])
+ self.assertEqual("", nodeBusy["slurm_node_features"])
+ self.assertEqual("", nodeMissing["slurm_node_features"])
class CloudNodeListMonitorActorTestCase(testutil.RemotePollLoopActorTestMixin,
def cloud_node_fqdn(node):
# We intentionally put the FQDN somewhere goofy to make sure tested code is
# using this function for lookups.
+ # The fallback is derived from node.name so each mock gets a unique FQDN.
- return node.extra.get('testname', 'NoTestName')
+ return node.extra.get('testname', node.name+'.NoTestName.invalid')
def ip_address_mock(last_octet):
return '10.20.30.{}'.format(last_octet)
class MockSize(object):
def __init__(self, factor):
self.id = 'z{}.test'.format(factor)
- self.name = self.id
+ self.name = 'test size '+self.id
self.ram = 128 * factor
self.disk = factor # GB
self.scratch = 1000 * factor # MB