Merge branch '12573-crunch2-slurm-priority' closes #12573
authorPeter Amstutz <pamstutz@veritasgenetics.com>
Mon, 4 Dec 2017 21:32:04 +0000 (16:32 -0500)
committerPeter Amstutz <pamstutz@veritasgenetics.com>
Mon, 4 Dec 2017 21:32:11 +0000 (16:32 -0500)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>

doc/api/methods/container_requests.html.textile.liquid
doc/api/methods/containers.html.textile.liquid
services/api/app/models/container.rb
services/api/app/models/container_request.rb
services/api/test/fixtures/containers.yml
services/api/test/unit/container_request_test.rb
services/api/test/unit/container_test.rb
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
services/crunch-dispatch-slurm/squeue.go

index 7491fecc37614de1fe7ae3c70133654803abea70..1c2550f723f5d8d96241ff12b9d5c09cf136e512 100644 (file)
@@ -50,13 +50,27 @@ table(table table-bordered table-condensed).
 |output_path|string|Path to a directory or file inside the container that should be preserved as container's output when it finishes. This path must be one of the mount targets. For best performance, point output_path to a writable collection mount.  See "Pre-populate output using Mount points":#pre-populate-output for details regarding optional output pre-population using mount points and "Symlinks in output":#symlinks-in-output for additional details.|Required.|
 |output_name|string|Desired name for the output collection. If null, a name will be assigned automatically.||
 |output_ttl|integer|Desired lifetime for the output collection, in seconds. If zero, the output collection will not be deleted automatically.||
-|priority|integer|Higher value means spend more resources on this container_request, i.e., go ahead of other queued containers, bring up more nodes etc.|Priority 0 means a container should not be run on behalf of this request. Clients are expected to submit container requests with zero priority in order to preview the container that will be used to satisfy it. Priority can be null if and only if state!="Committed".|
+|priority|integer|Range 0-1000.  Indicates scheduling order preference.|Clients are expected to submit container requests with zero priority in order to preview the container that will be used to satisfy it. Priority can be null if and only if state!="Committed".  See "below for more details":#priority .|
 |expires_at|datetime|After this time, priority is considered to be zero.|Not yet implemented.|
 |use_existing|boolean|If possible, use an existing (non-failed) container to satisfy the request instead of creating a new one.|Default is true|
 |log_uuid|string|Log collection containing log messages provided by the scheduler and crunch processes.|Null if the container has not yet completed.|
 |output_uuid|string|Output collection created when the container finished successfully.|Null if the container has failed or not yet completed.|
 |filters|string|Additional constraints for satisfying the container_request, given in the same form as the filters parameter accepted by the container_requests.list API.|
 
+h2(#priority). Priority
+
+The @priority@ field has a range of 0-1000.
+
+Priority 0 means no container should run on behalf of this request, and containers already running will be terminated (setting container priority to 0 is the cancel operation).
+
+Priority 1 is the lowest priority.
+
+Priority 1000 is the highest priority.
+
+The actual order that containers execute is determined by the underlying scheduling software (e.g. SLURM) and may be based on a combination of container priority, submission time, available resources, and other factors.
+
+In the current implementation, the magnitude of difference in priority between two containers affects the weight of priority vs age in determining scheduling order.  If two containers have only a small difference in priority (for example, 500 and 501) and the lower priority container has a longer queue time, the lower priority container may be scheduled before the higher priority container.  Use a greater magnitude difference (for example, 500 and 600) to give higher weight to priority over queue time.
+
 h2(#mount_types). {% include 'mount_types' %}
 
 h2(#runtime_constraints). {% include 'container_runtime_constraints' %}
index a8dd8abdc6d8f4d5b007461e4e71e71b95d26c49..30ec055a6bbb61fb1e0c2051f50b4c4f41b8a836 100644 (file)
@@ -47,7 +47,7 @@ Generally this will contain additional keys that are not present in any correspo
 |output|string|Portable data hash of the output collection.|Null if the container is not yet finished.|
 |container_image|string|Portable data hash of a collection containing the docker image used to run the container.||
 |progress|number|A number between 0.0 and 1.0 describing the fraction of work done.||
-|priority|integer|Priority assigned by the system, taking into account the priorities of all associated ContainerRequests.||
+|priority|integer|Range 0-1000.  Indicates scheduling order preference.|Currently assigned by the system as the max() of the priorities of all associated ContainerRequests.  See "container request priority":container_requests.html#priority .|
 |exit_code|integer|Process exit code.|Null if state!="Complete"|
 |auth_uuid|string|UUID of a token to be passed into the container itself, used to access Keep-backed mounts, etc.|Null if state∉{"Locked","Running"}|
 |locked_by_uuid|string|UUID of a token, indicating which dispatch process changed state to Locked. If null, any token can be used to lock. If not null, only the indicated token can modify this container.|Null if state∉{"Locked","Running"}|
index 466adc0eefb573a08b18dff2d5280102ef448210..b3739da9cfde9bc00a8a0e6e135b914398814e30 100644 (file)
@@ -24,6 +24,7 @@ class Container < ArvadosModel
   before_validation :fill_field_defaults, :if => :new_record?
   before_validation :set_timestamps
   validates :command, :container_image, :output_path, :cwd, :priority, :presence => true
+  validates :priority, numericality: { only_integer: true, greater_than_or_equal_to: 0, less_than_or_equal_to: 1000 }
   validate :validate_state_change
   validate :validate_change
   validate :validate_lock
@@ -333,7 +334,7 @@ class Container < ArvadosModel
     self.runtime_constraints ||= {}
     self.mounts ||= {}
     self.cwd ||= "."
-    self.priority ||= 1
+    self.priority ||= 0
     self.scheduling_parameters ||= {}
   end
 
index 94e4e1f9ddd4289bc8bbe23c0f61d54d06213a18..25becb24a62682fa6e0ae9fe115099e7e22fceba 100644 (file)
@@ -23,6 +23,7 @@ class ContainerRequest < ArvadosModel
   before_validation :set_container
   validates :command, :container_image, :output_path, :cwd, :presence => true
   validates :output_ttl, numericality: { only_integer: true, greater_than_or_equal_to: 0 }
+  validates :priority, numericality: { only_integer: true, greater_than_or_equal_to: 0, less_than_or_equal_to: 1000 }
   validate :validate_state_change
   validate :check_update_whitelist
   after_save :update_priority
@@ -158,6 +159,7 @@ class ContainerRequest < ArvadosModel
     self.container_count_max ||= Rails.configuration.container_count_max
     self.scheduling_parameters ||= {}
     self.output_ttl ||= 0
+    self.priority ||= 0
   end
 
   def set_container
@@ -272,7 +274,10 @@ class ContainerRequest < ArvadosModel
 
     token_uuid = current_api_client_authorization.andand.uuid
     container = Container.where('auth_uuid=?', token_uuid).order('created_at desc').first
-    self.requesting_container_uuid = container.uuid if container
+    if container
+      self.requesting_container_uuid = container.uuid
+      self.priority = container.priority
+    end
     true
   end
 end
index a7ad6f0378d90679cb1ac183f69233c34fddc88a..7e2de6d8ca6207bc45738d4513507cae874dae58 100644 (file)
@@ -28,7 +28,7 @@ running:
   uuid: zzzzz-dz642-runningcontainr
   owner_uuid: zzzzz-tpzed-000000000000000
   state: Running
-  priority: 1
+  priority: 12
   created_at: <%= 1.minute.ago.to_s(:db) %>
   updated_at: <%= 1.minute.ago.to_s(:db) %>
   started_at: <%= 1.minute.ago.to_s(:db) %>
index cecf7b818e7004f74fb4544033d1a6e636e55069..0edc0f45938cc07924150c365a77585ade65fc55 100644 (file)
@@ -41,7 +41,7 @@ class ContainerRequestTest < ActiveSupport::TestCase
     cr = create_minimal_req!
 
     assert_nil cr.container_uuid
-    assert_nil cr.priority
+    assert_equal 0, cr.priority
 
     check_bogus_states cr
 
@@ -108,7 +108,8 @@ class ContainerRequestTest < ActiveSupport::TestCase
 
   test "Container request priority must be non-nil" do
     set_user_from_auth :active
-    cr = create_minimal_req!(priority: nil)
+    cr = create_minimal_req!
+    cr.priority = nil
     cr.state = "Committed"
     assert_raises(ActiveRecord::RecordInvalid) do
       cr.save!
@@ -323,14 +324,15 @@ class ContainerRequestTest < ActiveSupport::TestCase
   end
 
   [
-    ['running_container_auth', 'zzzzz-dz642-runningcontainr'],
-    ['active_no_prefs', nil],
-  ].each do |token, expected|
+    ['running_container_auth', 'zzzzz-dz642-runningcontainr', 12],
+    ['active_no_prefs', nil, 0],
+  ].each do |token, expected, expected_priority|
     test "create as #{token} and expect requesting_container_uuid to be #{expected}" do
       set_user_from_auth token
       cr = ContainerRequest.create(container_image: "img", output_path: "/tmp", command: ["echo", "foo"])
       assert_not_nil cr.uuid, 'uuid should be set for newly created container_request'
       assert_equal expected, cr.requesting_container_uuid
+      assert_equal expected_priority, cr.priority
     end
   end
 
@@ -803,4 +805,35 @@ class ContainerRequestTest < ActiveSupport::TestCase
       assert_nothing_raised {cr.destroy}
     end
   end
+
+  test "Container request valid priority" do
+    set_user_from_auth :active
+    cr = create_minimal_req!
+
+    assert_raises(ActiveRecord::RecordInvalid) do
+      cr.priority = -1
+      cr.save!
+    end
+
+    cr.priority = 0
+    cr.save!
+
+    cr.priority = 1
+    cr.save!
+
+    cr.priority = 500
+    cr.save!
+
+    cr.priority = 999
+    cr.save!
+
+    cr.priority = 1000
+    cr.save!
+
+    assert_raises(ActiveRecord::RecordInvalid) do
+      cr.priority = 1001
+      cr.save!
+    end
+  end
+
 end
index 09373fdc05588ea593a0ff33906484d47be082ef..eb4f35fea3d293c922d07fc6d16981f81b5092ee 100644 (file)
@@ -95,6 +95,42 @@ class ContainerTest < ActiveSupport::TestCase
     end
   end
 
+  test "Container valid priority" do
+    act_as_system_user do
+      c, _ = minimal_new(environment: {},
+                      mounts: {"BAR" => "FOO"},
+                      output_path: "/tmp",
+                      priority: 1,
+                      runtime_constraints: {"vcpus" => 1, "ram" => 1})
+
+      assert_raises(ActiveRecord::RecordInvalid) do
+        c.priority = -1
+        c.save!
+      end
+
+      c.priority = 0
+      c.save!
+
+      c.priority = 1
+      c.save!
+
+      c.priority = 500
+      c.save!
+
+      c.priority = 999
+      c.save!
+
+      c.priority = 1000
+      c.save!
+
+      assert_raises(ActiveRecord::RecordInvalid) do
+        c.priority = 1001
+        c.save!
+      end
+    end
+  end
+
+
   test "Container serialized hash attributes sorted before save" do
     env = {"C" => 3, "B" => 2, "A" => 1}
     m = {"F" => {"kind" => 3}, "E" => {"kind" => 2}, "D" => {"kind" => 1}}
index d322b0f3f6dcf53735df8e920f07d0329397bbbf..3d094c7f6e3a68f745f02be1950cc76eb8fdecd2 100644 (file)
@@ -164,6 +164,17 @@ func checkSqueueForOrphans(dispatcher *dispatch.Dispatcher, sqCheck *SqueueCheck
        }
 }
 
+func niceness(priority int) int {
+       if priority > 1000 {
+               priority = 1000
+       }
+       if priority < 0 {
+               priority = 0
+       }
+       // Niceness range 0-10000: priority is clamped to 0-1000, so
+       // (1000 - priority) * 10 yields 0 (highest priority) to 10000 (lowest).
+       return (1000 - priority) * 10
+}
+
 // sbatchCmd
 func sbatchFunc(container arvados.Container) *exec.Cmd {
        mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM) / float64(1048576)))
@@ -182,6 +193,7 @@ func sbatchFunc(container arvados.Container) *exec.Cmd {
        sbatchArgs = append(sbatchArgs, fmt.Sprintf("--mem=%d", mem))
        sbatchArgs = append(sbatchArgs, fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs))
        sbatchArgs = append(sbatchArgs, fmt.Sprintf("--tmp=%d", disk))
+       sbatchArgs = append(sbatchArgs, fmt.Sprintf("--nice=%d", niceness(container.Priority)))
        if len(container.SchedulingParameters.Partitions) > 0 {
                sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.SchedulingParameters.Partitions, ",")))
        }
@@ -194,9 +206,15 @@ func scancelFunc(container arvados.Container) *exec.Cmd {
        return exec.Command("scancel", "--name="+container.UUID)
 }
 
+// scontrolCmd
+func scontrolFunc(container arvados.Container) *exec.Cmd {
+       return exec.Command("scontrol", "update", "JobName="+container.UUID, fmt.Sprintf("Nice=%d", niceness(container.Priority)))
+}
+
 // Wrap these so that they can be overridden by tests
 var sbatchCmd = sbatchFunc
 var scancelCmd = scancelFunc
+var scontrolCmd = scontrolFunc
 
 // Submit job to slurm using sbatch.
 func submit(dispatcher *dispatch.Dispatcher, container arvados.Container, crunchRunCommand []string) error {
@@ -290,6 +308,10 @@ func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados
                        } else if updated.Priority == 0 {
                                log.Printf("Container %s has state %q, priority %d: cancel slurm job", ctr.UUID, updated.State, updated.Priority)
                                scancel(ctr)
+                       } else if niceness(updated.Priority) != sqCheck.GetNiceness(ctr.UUID) && sqCheck.GetNiceness(ctr.UUID) != -1 {
+                               // dynamically adjust priority
+                               log.Printf("Container priority %v != %v", niceness(updated.Priority), sqCheck.GetNiceness(ctr.UUID))
+                               scontrolUpdate(updated)
                        }
                }
        }
@@ -310,6 +332,21 @@ func scancel(ctr arvados.Container) {
        }
 }
 
+func scontrolUpdate(ctr arvados.Container) {
+       sqCheck.L.Lock()
+       cmd := scontrolCmd(ctr)
+       msg, err := cmd.CombinedOutput()
+       sqCheck.L.Unlock()
+
+       if err != nil {
+               log.Printf("%q %q: %s %q", cmd.Path, cmd.Args, err, msg)
+               time.Sleep(time.Second)
+       } else if sqCheck.HasUUID(ctr.UUID) {
+               log.Printf("Container %s priority is now %v, niceness is now %v",
+                       ctr.UUID, ctr.Priority, sqCheck.GetNiceness(ctr.UUID))
+       }
+}
+
 func readConfig(dst interface{}, path string) error {
        err := config.LoadFile(dst, path)
        if err != nil && os.IsNotExist(err) && path == defaultConfigPath {
index 5879f84d8385ece8176f3e0b14c1a68a159c0c87..a823755379574155420e465b2ec2b72f234a3024 100644 (file)
@@ -64,6 +64,101 @@ func (s *MockArvadosServerSuite) TearDownTest(c *C) {
        arvadostest.ResetEnv()
 }
 
+func (s *TestSuite) integrationTest(c *C,
+       newSqueueCmd func() *exec.Cmd,
+       newScancelCmd func(arvados.Container) *exec.Cmd,
+       newSbatchCmd func(arvados.Container) *exec.Cmd,
+       newScontrolCmd func(arvados.Container) *exec.Cmd,
+       sbatchCmdComps []string,
+       runContainer func(*dispatch.Dispatcher, arvados.Container)) arvados.Container {
+       arvadostest.ResetEnv()
+
+       arv, err := arvadosclient.MakeArvadosClient()
+       c.Assert(err, IsNil)
+
+       var sbatchCmdLine []string
+
+       // Override sbatchCmd
+       defer func(orig func(arvados.Container) *exec.Cmd) {
+               sbatchCmd = orig
+       }(sbatchCmd)
+
+       if newSbatchCmd != nil {
+               sbatchCmd = newSbatchCmd
+       } else {
+               sbatchCmd = func(container arvados.Container) *exec.Cmd {
+                       sbatchCmdLine = sbatchFunc(container).Args
+                       return exec.Command("sh")
+               }
+       }
+
+       // Override squeueCmd
+       defer func(orig func() *exec.Cmd) {
+               squeueCmd = orig
+       }(squeueCmd)
+       squeueCmd = newSqueueCmd
+
+       // Override scancel
+       defer func(orig func(arvados.Container) *exec.Cmd) {
+               scancelCmd = orig
+       }(scancelCmd)
+       scancelCmd = newScancelCmd
+
+       // Override scontrol
+       defer func(orig func(arvados.Container) *exec.Cmd) {
+               scontrolCmd = orig
+       }(scontrolCmd)
+       scontrolCmd = newScontrolCmd
+
+       // There should be one queued container
+       params := arvadosclient.Dict{
+               "filters": [][]string{{"state", "=", "Queued"}},
+       }
+       var containers arvados.ContainerList
+       err = arv.List("containers", params, &containers)
+       c.Check(err, IsNil)
+       c.Check(len(containers.Items), Equals, 1)
+
+       theConfig.CrunchRunCommand = []string{"echo"}
+
+       ctx, cancel := context.WithCancel(context.Background())
+       doneRun := make(chan struct{})
+
+       dispatcher := dispatch.Dispatcher{
+               Arv:        arv,
+               PollPeriod: time.Duration(1) * time.Second,
+               RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
+                       go func() {
+                               runContainer(disp, ctr)
+                               doneRun <- struct{}{}
+                       }()
+                       run(disp, ctr, status)
+                       cancel()
+               },
+       }
+
+       sqCheck = &SqueueChecker{Period: 500 * time.Millisecond}
+
+       err = dispatcher.Run(ctx)
+       <-doneRun
+       c.Assert(err, Equals, context.Canceled)
+
+       sqCheck.Stop()
+
+       c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
+
+       // There should be no queued containers now
+       err = arv.List("containers", params, &containers)
+       c.Check(err, IsNil)
+       c.Check(len(containers.Items), Equals, 0)
+
+       // Previously "Queued" container should now be in "Complete" state
+       var container arvados.Container
+       err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
+       c.Check(err, IsNil)
+       return container
+}
+
 func (s *TestSuite) TestIntegrationNormal(c *C) {
        done := false
        container := s.integrationTest(c,
@@ -71,11 +166,12 @@ func (s *TestSuite) TestIntegrationNormal(c *C) {
                        if done {
                                return exec.Command("true")
                        } else {
-                               return exec.Command("echo", "zzzzz-dz642-queuedcontainer")
+                               return exec.Command("echo", "zzzzz-dz642-queuedcontainer 9990 100")
                        }
                },
                nil,
                nil,
+               nil,
                []string(nil),
                func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
                        dispatcher.UpdateState(container.UUID, dispatch.Running)
@@ -96,7 +192,7 @@ func (s *TestSuite) TestIntegrationCancel(c *C) {
                        if cmd != nil && cmd.ProcessState != nil {
                                return exec.Command("true")
                        } else {
-                               return exec.Command("echo", "zzzzz-dz642-queuedcontainer")
+                               return exec.Command("echo", "zzzzz-dz642-queuedcontainer 9990 100")
                        }
                },
                func(container arvados.Container) *exec.Cmd {
@@ -109,6 +205,7 @@ func (s *TestSuite) TestIntegrationCancel(c *C) {
                        }
                },
                nil,
+               nil,
                []string(nil),
                func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
                        dispatcher.UpdateState(container.UUID, dispatch.Running)
@@ -127,11 +224,13 @@ func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
                func() *exec.Cmd { return exec.Command("echo") },
                nil,
                nil,
+               nil,
                []string{"sbatch",
                        fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
                        fmt.Sprintf("--mem=%d", 11445),
                        fmt.Sprintf("--cpus-per-task=%d", 4),
-                       fmt.Sprintf("--tmp=%d", 45777)},
+                       fmt.Sprintf("--tmp=%d", 45777),
+                       fmt.Sprintf("--nice=%d", 9990)},
                func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
                        dispatcher.UpdateState(container.UUID, dispatch.Running)
                        time.Sleep(3 * time.Second)
@@ -147,6 +246,7 @@ func (s *TestSuite) TestSbatchFail(c *C) {
                func(container arvados.Container) *exec.Cmd {
                        return exec.Command("false")
                },
+               nil,
                []string(nil),
                func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
                        dispatcher.UpdateState(container.UUID, dispatch.Running)
@@ -165,94 +265,6 @@ func (s *TestSuite) TestSbatchFail(c *C) {
        c.Assert(len(ll.Items), Equals, 1)
 }
 
-func (s *TestSuite) integrationTest(c *C,
-       newSqueueCmd func() *exec.Cmd,
-       newScancelCmd func(arvados.Container) *exec.Cmd,
-       newSbatchCmd func(arvados.Container) *exec.Cmd,
-       sbatchCmdComps []string,
-       runContainer func(*dispatch.Dispatcher, arvados.Container)) arvados.Container {
-       arvadostest.ResetEnv()
-
-       arv, err := arvadosclient.MakeArvadosClient()
-       c.Assert(err, IsNil)
-
-       var sbatchCmdLine []string
-
-       // Override sbatchCmd
-       defer func(orig func(arvados.Container) *exec.Cmd) {
-               sbatchCmd = orig
-       }(sbatchCmd)
-
-       if newSbatchCmd != nil {
-               sbatchCmd = newSbatchCmd
-       } else {
-               sbatchCmd = func(container arvados.Container) *exec.Cmd {
-                       sbatchCmdLine = sbatchFunc(container).Args
-                       return exec.Command("sh")
-               }
-       }
-
-       // Override squeueCmd
-       defer func(orig func() *exec.Cmd) {
-               squeueCmd = orig
-       }(squeueCmd)
-       squeueCmd = newSqueueCmd
-
-       // Override scancel
-       defer func(orig func(arvados.Container) *exec.Cmd) {
-               scancelCmd = orig
-       }(scancelCmd)
-       scancelCmd = newScancelCmd
-
-       // There should be one queued container
-       params := arvadosclient.Dict{
-               "filters": [][]string{{"state", "=", "Queued"}},
-       }
-       var containers arvados.ContainerList
-       err = arv.List("containers", params, &containers)
-       c.Check(err, IsNil)
-       c.Check(len(containers.Items), Equals, 1)
-
-       theConfig.CrunchRunCommand = []string{"echo"}
-
-       ctx, cancel := context.WithCancel(context.Background())
-       doneRun := make(chan struct{})
-
-       dispatcher := dispatch.Dispatcher{
-               Arv:        arv,
-               PollPeriod: time.Duration(1) * time.Second,
-               RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
-                       go func() {
-                               runContainer(disp, ctr)
-                               doneRun <- struct{}{}
-                       }()
-                       run(disp, ctr, status)
-                       cancel()
-               },
-       }
-
-       sqCheck = &SqueueChecker{Period: 500 * time.Millisecond}
-
-       err = dispatcher.Run(ctx)
-       <-doneRun
-       c.Assert(err, Equals, context.Canceled)
-
-       sqCheck.Stop()
-
-       c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
-
-       // There should be no queued containers now
-       err = arv.List("containers", params, &containers)
-       c.Check(err, IsNil)
-       c.Check(len(containers.Items), Equals, 0)
-
-       // Previously "Queued" container should now be in "Complete" state
-       var container arvados.Container
-       err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
-       c.Check(err, IsNil)
-       return container
-}
-
 func (s *MockArvadosServerSuite) TestAPIErrorGettingContainers(c *C) {
        apiStubResponses := make(map[string]arvadostest.StubResponse)
        apiStubResponses["/arvados/v1/api_client_authorizations/current"] = arvadostest.StubResponse{200, `{"uuid":"` + arvadostest.Dispatch1AuthUUID + `"}`}
@@ -377,25 +389,69 @@ func (s *MockArvadosServerSuite) TestSbatchFuncWithConfigArgs(c *C) {
 func testSbatchFuncWithArgs(c *C, args []string) {
        theConfig.SbatchArguments = append(theConfig.SbatchArguments, args...)
 
-       container := arvados.Container{UUID: "123", RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 2}}
+       container := arvados.Container{
+               UUID:               "123",
+               RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 2},
+               Priority:           1}
        sbatchCmd := sbatchFunc(container)
 
        var expected []string
        expected = append(expected, "sbatch")
        expected = append(expected, theConfig.SbatchArguments...)
-       expected = append(expected, "--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0")
+       expected = append(expected, "--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0", "--nice=9990")
 
        c.Check(sbatchCmd.Args, DeepEquals, expected)
 }
 
 func (s *MockArvadosServerSuite) TestSbatchPartition(c *C) {
        theConfig.SbatchArguments = nil
-       container := arvados.Container{UUID: "123", RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 1}, SchedulingParameters: arvados.SchedulingParameters{Partitions: []string{"blurb", "b2"}}}
+       container := arvados.Container{
+               UUID:                 "123",
+               RuntimeConstraints:   arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 1},
+               SchedulingParameters: arvados.SchedulingParameters{Partitions: []string{"blurb", "b2"}},
+               Priority:             1}
        sbatchCmd := sbatchFunc(container)
 
        var expected []string
        expected = append(expected, "sbatch")
-       expected = append(expected, "--job-name=123", "--mem=239", "--cpus-per-task=1", "--tmp=0", "--partition=blurb,b2")
+       expected = append(expected, "--job-name=123", "--mem=239", "--cpus-per-task=1", "--tmp=0", "--nice=9990", "--partition=blurb,b2")
 
        c.Check(sbatchCmd.Args, DeepEquals, expected)
 }
+
+func (s *TestSuite) TestIntegrationChangePriority(c *C) {
+       var scontrolCmdLine []string
+       step := 0
+
+       container := s.integrationTest(c,
+               func() *exec.Cmd {
+                       if step == 0 {
+                               return exec.Command("echo", "zzzzz-dz642-queuedcontainer 9990 100")
+                       } else if step == 1 {
+                               return exec.Command("echo", "zzzzz-dz642-queuedcontainer 4000 100")
+                       } else {
+                               return exec.Command("echo")
+                       }
+               },
+               func(arvados.Container) *exec.Cmd { return exec.Command("true") },
+               nil,
+               func(container arvados.Container) *exec.Cmd {
+                       scontrolCmdLine = scontrolFunc(container).Args
+                       step = 1
+                       return exec.Command("true")
+               },
+               []string(nil),
+               func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
+                       dispatcher.UpdateState(container.UUID, dispatch.Running)
+                       time.Sleep(1 * time.Second)
+                       dispatcher.Arv.Update("containers", container.UUID,
+                               arvadosclient.Dict{
+                                       "container": arvadosclient.Dict{"priority": 600}},
+                               nil)
+                       time.Sleep(1 * time.Second)
+                       step = 2
+                       dispatcher.UpdateState(container.UUID, dispatch.Complete)
+               })
+       c.Check(container.State, Equals, arvados.ContainerStateComplete)
+       c.Check(scontrolCmdLine, DeepEquals, []string{"scontrol", "update", "JobName=zzzzz-dz642-queuedcontainer", "Nice=4000"})
+}
index d22e02597b75d5d5d7a0b74599cc4d142d002345..819c2d2510f992ab75c18249d8b46d6c13828d6d 100644 (file)
@@ -6,6 +6,7 @@ package main
 
 import (
        "bytes"
+       "fmt"
        "log"
        "os/exec"
        "strings"
@@ -13,18 +14,23 @@ import (
        "time"
 )
 
+type jobPriority struct {
+       niceness        int
+       currentPriority int
+}
+
 // Squeue implements asynchronous polling monitor of the SLURM queue using the
 // command 'squeue'.
 type SqueueChecker struct {
        Period    time.Duration
-       uuids     map[string]bool
+       uuids     map[string]jobPriority
        startOnce sync.Once
        done      chan struct{}
        sync.Cond
 }
 
 func squeueFunc() *exec.Cmd {
-       return exec.Command("squeue", "--all", "--format=%j")
+       return exec.Command("squeue", "--all", "--format=%j %y %Q")
 }
 
 var squeueCmd = squeueFunc
@@ -40,7 +46,23 @@ func (sqc *SqueueChecker) HasUUID(uuid string) bool {
 
        // block until next squeue broadcast signaling an update.
        sqc.Wait()
-       return sqc.uuids[uuid]
+       _, exists := sqc.uuids[uuid]
+       return exists
+}
+
+// GetNiceness returns the niceness of a given uuid, or -1 if it doesn't exist.
+func (sqc *SqueueChecker) GetNiceness(uuid string) int {
+       sqc.startOnce.Do(sqc.start)
+
+       sqc.L.Lock()
+       defer sqc.L.Unlock()
+
+       n, exists := sqc.uuids[uuid]
+       if exists {
+               return n.niceness
+       } else {
+               return -1
+       }
 }
 
 // Stop stops the squeue monitoring goroutine. Do not call HasUUID
@@ -70,10 +92,16 @@ func (sqc *SqueueChecker) check() {
                return
        }
 
-       uuids := strings.Split(stdout.String(), "\n")
-       sqc.uuids = make(map[string]bool, len(uuids))
-       for _, uuid := range uuids {
-               sqc.uuids[uuid] = true
+       lines := strings.Split(stdout.String(), "\n")
+       sqc.uuids = make(map[string]jobPriority, len(lines))
+       for _, line := range lines {
+               var uuid string
+               var nice int
+               var prio int
+               fmt.Sscan(line, &uuid, &nice, &prio)
+               if uuid != "" {
+                       sqc.uuids[uuid] = jobPriority{nice, prio}
+               }
        }
        sqc.Broadcast()
 }