port crunch dispatcher from whjobmanager to crunch-job
[arvados.git] / services / api / script / crunch-dispatch.rb
similarity index 56%
rename from services/api/script/dispatch_jobs.rb
rename to services/api/script/crunch-dispatch.rb
index f9377473033ed6087524ef7448f14d0da739c3bd..f749f22a77ceef47b6c85833ea9175a804ad565f 100755 (executable)
@@ -29,7 +29,7 @@ class Dispatcher
   end
 
   def start_jobs
-    if Server::Application.config.whjobmanager_wrapper.to_s.match /^slurm/
+    if Server::Application.config.crunch_job_wrapper.to_s.match /^slurm/
       @idle_slurm_nodes = 0
       begin
         `sinfo`.
@@ -56,7 +56,7 @@ class Dispatcher
       next if !take(job)
 
       cmd_args = nil
-      case Server::Application.config.whjobmanager_wrapper
+      case Server::Application.config.crunch_job_wrapper
       when :none
         cmd_args = []
       when :slurm_immediate
@@ -67,27 +67,27 @@ class Dispatcher
                     "--job-name=#{job.uuid}",
                     "--nodes=#{min_nodes}"]
       else
-        raise "Unknown whjobmanager_wrapper: #{Server::Application.config.whjobmanager_wrapper}"
+        raise "Unknown crunch_job_wrapper: #{Server::Application.config.crunch_job_wrapper}"
       end
 
-      cmd_args << 'whjobmanager'
-      cmd_args << "id=#{job.uuid}"
-      cmd_args << "mrfunction=#{job.script}"
-      job.script_parameters.each do |k,v|
-        k = k.to_s
-        if k == 'input'
-          k = 'inputkey'
-        else
-          k = k.upcase
-        end
-        cmd_args << "#{k}=#{v}"
-      end
-      cmd_args << "revision=#{job.script_version}"
-
-      begin
-        cmd_args << "stepspernode=#{job.resource_limits['max_tasks_per_node'].to_i}"
-      rescue
-        # OK if limit is not specified. OK to ignore if not integer.
+      job_auth = ApiClientAuthorization.
+        new(user: User.where('uuid=?', job.modified_by_user).first,
+            api_client_id: 0)
+      job_auth.save
+
+      cmd_args << 'crunch-job'
+      cmd_args << '--job-api-token'
+      cmd_args << job_auth.api_token
+      cmd_args << '--job'
+      cmd_args << job.uuid
+
+      commit = Commit.where(sha1: job.script_version).first
+      if commit
+        cmd_args << '--git-dir'
+        cmd_args << File.
+          join(Rails.configuration.git_repositories_dir,
+               commit.repository_name,
+               '.git')
       end
 
       $stderr.puts "dispatch: #{cmd_args.join ' '}"
@@ -110,27 +110,21 @@ class Dispatcher
         job: job,
         stderr_buf: '',
         started: false,
-        sent_int: 0
+        sent_int: 0,
+        job_auth: job_auth
       }
       i.close
     end
   end
 
   def take(job)
-    lock_ok = false
-    ActiveRecord::Base.transaction do
-      job.reload
-      if job.is_locked_by.nil? and
-          job.update_attributes(is_locked_by: sysuser.uuid)
-        lock_ok = true
-      end
-    end
-    lock_ok
+    # No-op: job locking is left to crunch-job, so every job is reported as taken.
+    true
   end
 
   def untake(job)
-    job.reload
-    job.update_attributes is_locked_by: nil
+    # No-op: crunch-job manages the job lock, so the dispatcher has nothing to release.
+    true
   end
 
   def read_pipes
@@ -153,7 +147,7 @@ class Dispatcher
       if stderr_buf
         j[:stderr_buf] << stderr_buf
         if j[:stderr_buf].index "\n"
-          lines = j[:stderr_buf].lines "\n"
+          lines = j[:stderr_buf].lines("\n").to_a
           if j[:stderr_buf][-1] == "\n"
             j[:stderr_buf] = ''
           else
@@ -162,58 +156,6 @@ class Dispatcher
           lines.each do |line|
             $stderr.print "#{job_uuid} ! " unless line.index(job_uuid)
             $stderr.puts line
-            line.chomp!
-            if (re = line.match(/#{job_uuid} (\d+) (\S*) (.*)/))
-              ignorethis, whjmpid, taskid, message = re.to_a
-              if taskid == '' and message == 'start'
-                $stderr.puts "dispatch: noticed #{job_uuid} started"
-                j[:started] = true
-                ActiveRecord::Base.transaction do
-                  j[:job].reload
-                  j[:job].update_attributes running: true, started_at: Time.now
-                end
-              elsif taskid == '' and (re = message.match /^revision (\S+)$/)
-                $stderr.puts "dispatch: noticed #{job_uuid} version #{re[1]}"
-                ActiveRecord::Base.transaction do
-                  j[:job].reload
-                  j[:job].script_version = re[1]
-                  j[:job].save
-                end
-              elsif taskid == '' and (re = message.match /^outputkey (\S+)$/)
-                $stderr.puts "dispatch: noticed #{job_uuid} output #{re[1]}"
-                j[:output] = re[1]
-              elsif taskid == '' and (re = message.match /^meta key is (\S+)$/)
-                $stderr.puts "dispatch: noticed #{job_uuid} log #{re[1]}"
-                j[:log] = re[1]
-                ActiveRecord::Base.transaction do
-                  j[:job].reload
-                  j[:job].update_attributes log: j[:log]
-                end
-              elsif taskid.match(/^\d+/) and (re = message.match /^failure /)
-                $stderr.puts "dispatch: noticed #{job_uuid} task fail"
-                ActiveRecord::Base.transaction do
-                  j[:job].reload
-                  j[:job].tasks_summary ||= {}
-                  j[:job].tasks_summary[:failed] ||= 0
-                  j[:job].tasks_summary[:failed] += 1
-                  j[:job].save
-                end
-              elsif (re = message.match(/^status: (\d+) done, (\d+) running, (\d+) todo/))
-                $stderr.puts "dispatch: noticed #{job_uuid} #{message}"
-                ActiveRecord::Base.transaction do
-                  j[:job].reload
-                  j[:job].tasks_summary ||= {}
-                  j[:job].tasks_summary[:done] = re[1].to_i
-                  j[:job].tasks_summary[:running] = re[2].to_i
-                  j[:job].tasks_summary[:todo] = re[3].to_i
-                  j[:job].save
-                end
-                if re[2].to_i == 0 and re[3].to_i == 0
-                  $stderr.puts "dispatch: noticed #{job_uuid} succeeded"
-                  j[:success] = true
-                end
-              end
-            end
           end
         end
       end
@@ -265,23 +207,12 @@ class Dispatcher
       $stderr.puts j_done[:stderr_buf] + "\n"
     end
 
-    j_done[:wait_thr].value          # wait the thread
+    # Wait the thread
+    j_done[:wait_thr].value
+
+    # Invalidate the per-job auth token
+    j_done[:job_auth].update_attributes expires_at: Time.now
 
-    if !j_done[:started]
-      # If the job never really started (due to a scheduling
-      # failure), just put it back in the queue
-      untake(job_done)
-      $stderr.puts "dispatch: job #{job_done.uuid} requeued"
-    else
-      # Otherwise, mark the job as finished
-      ActiveRecord::Base.transaction do
-        job_done.reload
-        job_done.log = j_done[:log]
-        job_done.output = j_done[:output]
-        job_done.success = j_done[:success]
-        job_done.assert_finished
-      end
-    end
     @running.delete job_done.uuid
   end