add script/dispatch_jobs.rb and related Job features
author Tom Clegg <tom@clinicalfuture.com>
Sat, 16 Mar 2013 21:39:49 +0000 (14:39 -0700)
committer Tom Clegg <tom@clinicalfuture.com>
Sat, 16 Mar 2013 21:39:49 +0000 (14:39 -0700)
app/models/job.rb
config/environments/development.rb.example
config/environments/production.rb
config/environments/test.rb
db/migrate/20130315155820_add_is_locked_by_to_jobs.rb [new file with mode: 0644]
db/migrate/20130315183626_add_log_to_jobs.rb [new file with mode: 0644]
db/migrate/20130315213205_add_tasks_summary_to_jobs.rb [new file with mode: 0644]
db/schema.rb
lib/assign_uuid.rb
script/dispatch_jobs.rb [new file with mode: 0755]

index 9d7b3a081e5a052aed798811c6f8c02cce519033..d0173ee4dcbb79a2d87dac5a9b8517bfade41019 100644 (file)
@@ -3,6 +3,7 @@ class Job < OrvosModel
   include KindAndEtag
   include CommonApiTemplate
   serialize :command_parameters, Hash
+  serialize :tasks_summary, Hash
   before_create :ensure_unique_submit_id
 
   class SubmitIdReused < StandardError
@@ -21,9 +22,18 @@ class Job < OrvosModel
     t.add :finished_at
     t.add :success
     t.add :running
+    t.add :is_locked_by
+    t.add :log
+    t.add :tasks_summary
     t.add :dependencies
   end
 
+  def assert_finished
+    update_attributes(finished_at: finished_at || Time.now,
+                      success: success.nil? ? false : success,
+                      running: false)
+  end
+
   protected
 
   def ensure_unique_submit_id
@@ -45,4 +55,42 @@ class Job < OrvosModel
     end
     deps.keys
   end
+
+  def permission_to_update
+    if is_locked_by_was and !(current_user and
+                              current_user.uuid == is_locked_by_was)
+      if command_changed? or
+          command_parameters_changed? or
+          command_version_changed? or
+          cancelled_by_client_changed? or
+          cancelled_by_user_changed? or
+          cancelled_at_changed? or
+          started_at_changed? or
+          finished_at_changed? or
+          running_changed? or
+          success_changed? or
+          output_changed? or
+          log_changed? or
+          tasks_summary_changed?
+        logger.warn "User #{current_user.uuid if current_user} tried to change protected job attributes on locked #{self.class.to_s} #{uuid_was}"
+        return false
+      end
+    end
+    if !is_locked_by_changed?
+      super
+    else
+      if !current_user
+        logger.warn "Anonymous user tried to change lock on #{self.class.to_s} #{uuid_was}"
+        false
+      elsif is_locked_by_was and is_locked_by_was != current_user.uuid
+        logger.warn "User #{current_user.uuid} tried to steal lock on #{self.class.to_s} #{uuid_was} from #{is_locked_by_was}"
+        false
+      elsif !is_locked_by.nil? and is_locked_by != current_user.uuid
+        logger.warn "User #{current_user.uuid} tried to lock #{self.class.to_s} #{uuid_was} with uuid #{is_locked_by}"
+        false
+      else
+        super
+      end
+    end
+  end
 end
index 44927a7dbbb994ea1f7b2244fd02d72f82462178..2d1ece5b082f005bc48ea21078690735f7dfd450 100644 (file)
@@ -30,6 +30,8 @@ Server::Application.configure do
 
   config.force_ssl = false
 
+  config.whjobmanager_wrapper = :none
+
   # config.dnsmasq_conf_dir = '/etc/dnsmasq.d'
 
   # config.compute_node_ami = 'ami-cbca41a2'
index 004c5ed66103d92662b0ba6ae48445ca0b580886..182c6a3aacf48ac30d5ac8728483723441d6397a 100644 (file)
@@ -58,6 +58,8 @@ Server::Application.configure do
   # Send deprecation notices to registered listeners
   config.active_support.deprecation = :notify
 
+  config.whjobmanager_wrapper = :slurm_immediate
+
   # config.dnsmasq_conf_dir = '/etc/dnsmasq.d'
 
   # config.compute_node_ami = 'ami-cbca41a2'
index b68f3d4247ec1c535ab3acdc1e2fd257363ea622..2fe43d1e02075a9b2725a2657480cce6199e8e1a 100644 (file)
@@ -37,6 +37,8 @@ Server::Application.configure do
   # Print deprecation notices to the stderr
   config.active_support.deprecation = :stderr
 
+  config.whjobmanager_wrapper = :slurm_immediate
+
   # config.dnsmasq_conf_dir = '/etc/dnsmasq.d'
 
   # config.compute_node_ami = 'ami-cbca41a2'
diff --git a/db/migrate/20130315155820_add_is_locked_by_to_jobs.rb b/db/migrate/20130315155820_add_is_locked_by_to_jobs.rb
new file mode 100644 (file)
index 0000000..02e6f34
--- /dev/null
@@ -0,0 +1,5 @@
+# Adds a string column, is_locked_by, to the jobs table.
+class AddIsLockedByToJobs < ActiveRecord::Migration
+  def change
+    add_column :jobs, :is_locked_by, :string
+  end
+end
diff --git a/db/migrate/20130315183626_add_log_to_jobs.rb b/db/migrate/20130315183626_add_log_to_jobs.rb
new file mode 100644 (file)
index 0000000..d9dbeff
--- /dev/null
@@ -0,0 +1,5 @@
+# Adds a string column, log, to the jobs table.
+class AddLogToJobs < ActiveRecord::Migration
+  def change
+    add_column :jobs, :log, :string
+  end
+end
diff --git a/db/migrate/20130315213205_add_tasks_summary_to_jobs.rb b/db/migrate/20130315213205_add_tasks_summary_to_jobs.rb
new file mode 100644 (file)
index 0000000..ea30854
--- /dev/null
@@ -0,0 +1,5 @@
+# Adds a text column, tasks_summary, to the jobs table.
+class AddTasksSummaryToJobs < ActiveRecord::Migration
+  def change
+    add_column :jobs, :tasks_summary, :text
+  end
+end
index 715c793ad81c0b954cfaed95171799ffed23b401..c5720ea54c6f752900b19190aadcab2bb10cf377 100644 (file)
@@ -11,7 +11,7 @@
 #
 # It's strongly recommended to check this file into your version control system.
 
-ActiveRecord::Schema.define(:version => 20130313175417) do
+ActiveRecord::Schema.define(:version => 20130315213205) do
 
   create_table "api_client_authorizations", :force => true do |t|
     t.string   "api_token",               :null => false
@@ -128,6 +128,9 @@ ActiveRecord::Schema.define(:version => 20130313175417) do
     t.datetime "created_at"
     t.datetime "updated_at"
     t.string   "priority"
+    t.string   "is_locked_by"
+    t.string   "log"
+    t.text     "tasks_summary"
   end
 
   add_index "jobs", ["command"], :name => "index_jobs_on_command"
index ba761e12e770b2aa02df7700269d84b114d5d5e6..1c4ca81a7ec0f9446dbcaf03760159651ff7dac6 100644 (file)
@@ -19,6 +19,7 @@ module AssignUuid
 
   def assign_uuid
     return true if !self.respond_to_uuid?
+    return true if uuid and current_user and current_user.is_admin
     self.uuid = [Server::Application.config.uuid_prefix,
                  self.class.uuid_prefix,
                  rand(2**256).to_s(36)[-15..-1]].
diff --git a/script/dispatch_jobs.rb b/script/dispatch_jobs.rb
new file mode 100755 (executable)
index 0000000..246491d
--- /dev/null
@@ -0,0 +1,299 @@
+#!/usr/bin/env ruby
+
+# Mix the Process module's methods into the top-level object.
+include Process
+
+# Flags set by the signal handlers below and polled by Dispatcher#run.
+$signal = {}
+%w{TERM INT}.each do |sig|
+  signame = sig
+  Signal.trap(sig) do
+    # The handler only reports the signal and raises the :term flag;
+    # the main loop decides what to do about it.
+    $stderr.puts "Received #{signame} signal"
+    $signal[:term] = true
+  end
+end
+
+# The first command-line argument names the Rails environment
+# (default: "development"). It is set before the application loads.
+ENV["RAILS_ENV"] = ARGV[0] || "development"
+
+# Load the Rails application, then the stdlib helper used to spawn
+# child processes.
+require File.dirname(__FILE__) + '/../config/boot'
+require File.dirname(__FILE__) + '/../config/environment'
+require 'open3'
+
+class Dispatcher
+
+  # Return the system ("root") user record, looking it up or creating
+  # it on the first call and memoizing it in @sysuser.
+  #
+  # Side effects on the first call: Thread.current[:user] is set to the
+  # system user, a new ApiClientAuthorization is created for it, and
+  # the user's uuid and the new API token are printed to stderr.
+  # Raises whatever save! raises if either record cannot be saved.
+  def sysuser
+    return @sysuser if @sysuser
+    # Act as a throwaway, unsaved admin while the real record is looked
+    # up or created. NOTE(review): presumably current_user is read from
+    # Thread.current[:user], and AssignUuid#assign_uuid only keeps a
+    # preset uuid when the current user is an admin -- confirm.
+    Thread.current[:user] = User.new(is_admin: true)
+    # Fixed, well-known uuid: site prefix, User prefix, fifteen zeros.
+    sysuser_id = [Server::Application.config.uuid_prefix,
+                  User.uuid_prefix,
+                  '000000000000000'].join('-')
+    @sysuser = User.where('uuid=?', sysuser_id).first
+    if !@sysuser
+      @sysuser = User.new(uuid: sysuser_id,
+                          is_admin: true,
+                          email: 'root',
+                          first_name: 'root',
+                          last_name: '')
+      @sysuser.save!
+      @sysuser.reload
+    end
+    # From here on, act as the persisted system user.
+    Thread.current[:user] = @sysuser
+
+    # A fresh authorization is created on every first call (i.e. once
+    # per Dispatcher instance); the token is printed to stderr below.
+    auth = ApiClientAuthorization.new(api_client_id: 0,
+                                      user_id: @sysuser.id)
+    auth.save!
+    auth_token = auth.api_token
+    $stderr.puts "dispatch: sysuser.uuid = #{@sysuser.uuid}"
+    $stderr.puts "dispatch: api_client_authorization.api_token = #{auth_token}"
+    @sysuser
+  end
+
+  def refresh_todo
+    @todo = Job.
+      where('started_at is ? and is_locked_by is ?', nil, nil).
+      order('priority desc, created_at')
+  end
+
+  def start_jobs
+    @todo.each do |job|
+
+      next if @running[job.uuid]
+      next if !take(job)
+
+      cmd_args = nil
+      case Server::Application.config.whjobmanager_wrapper
+      when :none
+        cmd_args = []
+      when :slurm_immediate
+        cmd_args = ["salloc",
+                    "--immediate",
+                    "--exclusive",
+                    "--no-kill",
+                    "--job-name=#{job.uuid}",
+                    "--nodes=1"]
+      else
+        raise "Unknown whjobmanager_wrapper: #{Server::Application.config.whjobmanager_wrapper}"
+      end
+
+      cmd_args << 'whjobmanager'
+      cmd_args << "id=#{job.uuid}"
+      cmd_args << "mrfunction=#{job.command}"
+      job.command_parameters.each do |k,v|
+        k = 'inputkey' if k == 'input'
+        cmd_args << "#{k}=#{v}"
+      end
+      cmd_args << "revision=#{job.command_version}"
+      begin
+        i, o, e, t = Open3.popen3(*cmd_args)
+      rescue
+        $stderr.puts "dispatch: popen3: #{$!}"
+        sleep 1
+        untake(job)
+        next
+      end
+      $stderr.puts "dispatch: job #{job.uuid} start"
+      $stderr.puts "dispatch: child #{t.pid} start"
+      @running[job.uuid] = {
+        stdin: i,
+        stdout: o,
+        stderr: e,
+        wait_thr: t,
+        job: job,
+        stderr_buf: '',
+        started: false,
+        sent_int: 0
+      }
+      i.close
+    end
+  end
+
+  def take(job)
+    lock_ok = false
+    ActiveRecord::Base.transaction do
+      job.reload
+      if job.is_locked_by.nil? and
+          job.update_attributes(is_locked_by: sysuser.uuid)
+        lock_ok = true
+      end
+    end
+    lock_ok
+  end
+
+  def untake(job)
+    job.reload
+    job.update_attributes is_locked_by: nil
+  end
+
+  def read_pipes
+    @running.each do |job_uuid, j|
+      job = j[:job]
+
+      # Throw away child stdout
+      begin
+        j[:stdout].read_nonblock(2**20)
+      rescue Errno::EAGAIN, EOFError
+      end
+
+      # Read whatever is available from child stderr
+      stderr_buf = false
+      begin
+        stderr_buf = j[:stderr].read_nonblock(2**20)
+      rescue Errno::EAGAIN, EOFError
+      end
+
+      if stderr_buf
+        j[:stderr_buf] << stderr_buf
+        if j[:stderr_buf].index "\n"
+          lines = j[:stderr_buf].lines "\n"
+          if j[:stderr_buf][-1] == "\n"
+            j[:stderr_buf] = ''
+          else
+            j[:stderr_buf] = lines.pop
+          end
+          lines.each do |line|
+            $stderr.print "#{job_uuid} ! " unless line.index(job_uuid)
+            $stderr.puts line
+            line.chomp!
+            if (re = line.match(/#{job_uuid} (\d+) (\S*) (.*)/))
+              ignorethis, whjmpid, taskid, message = re.to_a
+              if taskid == '' and message == 'start'
+                $stderr.puts "dispatch: noticed #{job_uuid} started"
+                j[:started] = true
+                ActiveRecord::Base.transaction do
+                  j[:job].reload
+                  j[:job].update_attributes running: true
+                end
+              elsif taskid == '' and message.match /^status: .* 0 todo/
+                $stderr.puts "dispatch: noticed #{job_uuid} succeeded"
+                j[:success] = true
+              elsif taskid == '' and (re = message.match /^outputkey (\S+)$/)
+                $stderr.puts "dispatch: noticed #{job_uuid} output #{re[1]}"
+                j[:output] = re[1]
+              elsif taskid == '' and (re = message.match /^meta key is (\S+)$/)
+                $stderr.puts "dispatch: noticed #{job_uuid} log #{re[1]}"
+                j[:log] = re[1]
+                ActiveRecord::Base.transaction do
+                  j[:job].reload
+                  j[:job].update_attributes log: j[:log]
+                end
+              elsif taskid.match(/^\d+/) and (re = message.match /^failure /)
+                $stderr.puts "dispatch: noticed #{job_uuid} task fail"
+                ActiveRecord::Base.transaction do
+                  j[:job].reload
+                  j[:job].tasks_summary ||= {}
+                  j[:job].tasks_summary[:failed] ||= 0
+                  j[:job].tasks_summary[:failed] += 1
+                  j[:job].save
+                end
+              elsif (re = message.match(/^status: (\d+) done, (\d+) running, (\d+) todo/))
+                $stderr.puts "dispatch: noticed #{job_uuid} #{message}"
+                ActiveRecord::Base.transaction do
+                  j[:job].reload
+                  j[:job].tasks_summary ||= {}
+                  j[:job].tasks_summary[:done] = re[1].to_i
+                  j[:job].tasks_summary[:running] = re[2].to_i
+                  j[:job].tasks_summary[:todo] = re[3].to_i
+                  j[:job].save
+                end
+              end
+            end
+          end
+        end
+      end
+    end
+  end
+
+  def reap_children
+    return if 0 == @running.size
+    pid_done = nil
+    j_done = nil
+
+    if false
+      begin
+        pid_done = waitpid(-1, Process::WNOHANG | Process::WUNTRACED)
+        if pid_done
+          j_done = @running.values.
+            select { |j| j[:wait_thr].pid == pid_done }.
+            first
+        end
+      rescue SystemCallError
+        # I have @running processes but system reports I have no
+        # children. This is likely to happen repeatedly if it happens at
+        # all; I will log this no more than once per child process I
+        # start.
+        if 0 < @running.select { |uuid,j| j[:warned_waitpid_error].nil? }.size
+          children = @running.values.collect { |j| j[:wait_thr].pid }.join ' '
+          $stderr.puts "dispatch: IPC bug: waitpid() error (#{$!}), but I have children #{children}"
+        end
+        @running.each do |uuid,j| j[:warned_waitpid_error] = true end
+      end
+    else
+      @running.each do |uuid, j|
+        if j[:wait_thr].status == false
+          pid_done = j[:wait_thr].pid
+          j_done = j
+        end
+      end
+    end
+
+    return if !pid_done
+
+    job_done = j_done[:job]
+    $stderr.puts "dispatch: child #{pid_done} exit"
+    $stderr.puts "dispatch: job #{job_done.uuid} end"
+
+    # Ensure every last drop of stdout and stderr is consumed
+    read_pipes
+    if j_done[:stderr_buf] and j_done[:stderr_buf] != ''
+      $stderr.puts j_done[:stderr_buf] + "\n"
+    end
+
+    j_done[:wait_thr].value          # wait the thread
+
+    if !j_done[:started]
+      # If the job never really started (due to a scheduling
+      # failure), just put it back in the queue
+      untake(job_done)
+      $stderr.puts "dispatch: job #{job_done.uuid} requeued"
+    else
+      # Otherwise, mark the job as finished
+      ActiveRecord::Base.transaction do
+        job_done.reload
+        job_done.log = j_done[:log]
+        job_done.output = j_done[:output]
+        job_done.success = j_done[:success]
+        job_done.assert_finished
+      end
+    end
+    @running.delete job_done.uuid
+  end
+
+  def run
+    sysuser
+    @running ||= {}
+    $stderr.puts "dispatch: ready"
+    while !$signal[:term] or @running.size > 0
+      read_pipes
+      if $signal[:term]
+        @running.each do |uuid, j|
+          if !j[:started] and j[:sent_int] < 2
+            begin
+              Process.kill 'INT', j[:wait_thr].pid
+            rescue Errno::ESRCH
+              # No such pid = race condition + desired result is
+              # already achieved
+            end
+            j[:sent_int] += 1
+          end
+        end
+      else
+        refresh_todo
+        start_jobs
+      end
+      reap_children
+      select(@running.values.collect { |j| [j[:stdout], j[:stderr]] }.flatten,
+             [], [], 1)
+    end
+  end
+  
+end
+
+Dispatcher.new.run