add Real Time Genomics pipeline template
[arvados.git] / services / api / script / crunch-dispatch.rb
1 #!/usr/bin/env ruby
2
include Process

# Global flag table. The TERM and INT handlers below both set
# $signal[:term]; the dispatcher's main loop polls that flag.
$signal = {}
['TERM', 'INT'].each do |signame|
  Signal.trap(signame) do
    $stderr.puts "Received #{signame} signal"
    $signal[:term] = true
  end
end

# Optional single-instance guard: when CRUNCH_DISPATCH_LOCKFILE is set,
# take a non-blocking exclusive lock on that file or abort. The variable
# is removed from this process's environment (presumably so that child
# processes do not inherit it -- confirm). The File object stays
# referenced by a top-level local, so it is not closed for the life of
# the script.
lockfilename = ENV.delete("CRUNCH_DISPATCH_LOCKFILE")
if lockfilename
  lockfile = File.open(lockfilename, File::RDWR | File::CREAT, 0644)
  got_lock = lockfile.flock(File::LOCK_EX | File::LOCK_NB)
  abort "Lock unavailable on #{lockfilename} - exit" unless got_lock
end

# The Rails environment comes from the first command line argument, then
# from the existing RAILS_ENV, then defaults to "development".
ENV["RAILS_ENV"] = ARGV.first || ENV.fetch("RAILS_ENV", "development")

require File.dirname(__FILE__) + '/../config/boot'
require File.dirname(__FILE__) + '/../config/environment'
require 'open3'

# Reuse an already-assigned $redis if there is one.
$redis = Redis.new unless $redis
29
# Runs queued jobs as child processes.
#
# The main loop (#run) repeatedly refreshes the list of queued jobs,
# starts a crunch-job child for each job that is not already running,
# relays each child's stderr to this process's stderr and to a Redis
# channel named after the job uuid, and cleans up after children that
# have exited.
class Dispatcher
  include ApplicationHelper

  # Returns the result of act_as_system_user (not defined in this file;
  # presumably provided by ApplicationHelper).
  def sysuser
    return act_as_system_user
  end

  # Reloads @todo from Job.queue.
  def refresh_todo
    @todo = Job.queue
  end

  # Starts a child process for every job in @todo that is not already in
  # @running and (when a slurm wrapper is configured) for which enough
  # idle nodes were reported by sinfo.
  #
  # Raises RuntimeError if the configured crunch_job_wrapper is neither
  # :none nor :slurm_immediate.
  def start_jobs
    if Server::Application.config.crunch_job_wrapper.to_s.match(/^slurm/)
      refresh_idle_slurm_nodes
    end

    @todo.each do |job|
      min_nodes = min_nodes_for(job)
      # @idle_slurm_nodes is only assigned when a slurm wrapper is
      # configured; otherwise this check is skipped.
      next if @idle_slurm_nodes and @idle_slurm_nodes < min_nodes

      next if @running[job.uuid]
      next if !take(job)

      cmd_args = wrapper_args(job, min_nodes)

      if Server::Application.config.crunch_job_user
        cmd_args.unshift("sudo", "-E", "-u",
                         Server::Application.config.crunch_job_user,
                         "PERLLIB=#{ENV['PERLLIB']}")
      end

      # Per-job API token, handed to the child on its command line and
      # expired again when the child exits (see reap_children).
      job_owner = User.where('uuid=?', job.modified_by_user_uuid).first
      job_auth = ApiClientAuthorization.new(user: job_owner,
                                            api_client_id: 0)
      job_auth.save

      cmd_args << (ENV['CRUNCH_JOB_BIN'] || `which crunch-job`.strip)
      cmd_args << '--job-api-token' << job_auth.api_token
      cmd_args << '--job' << job.uuid

      commit = Commit.where(sha1: job.script_version).first
      cmd_args << '--git-dir' << git_dir_for(commit) if commit

      # NOTE(review): this line writes the per-job API token to stderr as
      # part of the command line -- confirm that is acceptable.
      $stderr.puts "dispatch: #{cmd_args.join ' '}"

      begin
        i, o, e, t = Open3.popen3(*cmd_args)
      rescue => err
        $stderr.puts "dispatch: popen3: #{err}"
        # No child will ever use the token created above, so expire it
        # now instead of leaving it valid.
        job_auth.update_attributes expires_at: Time.now
        sleep 1
        untake(job)
        next
      end
      $stderr.puts "dispatch: job #{job.uuid} start"
      $stderr.puts "dispatch: child #{t.pid} start"
      @running[job.uuid] = {
        stdin: i,
        stdout: o,
        stderr: e,
        wait_thr: t,
        job: job,
        stderr_buf: '',
        started: false,
        sent_int: 0,
        job_auth: job_auth
      }
      # Nothing is ever written to the child's stdin.
      i.close
    end
  end

  def take(job)
    # no-op -- let crunch-job take care of locking.
    true
  end

  def untake(job)
    # no-op -- let crunch-job take care of locking.
    true
  end

  # Performs one non-blocking read (up to 2**20 bytes) from the stdout
  # and stderr of every running child. Stdout data is discarded. Complete
  # stderr lines are echoed to $stderr (prefixed with "<uuid> ! " when
  # the line does not already contain the job uuid) and published to the
  # Redis channel named after the job uuid; a trailing partial line is
  # kept in the job's :stderr_buf until more data arrives.
  def read_pipes
    @running.each do |job_uuid, j|
      # Throw away child stdout
      begin
        j[:stdout].read_nonblock(2**20)
      rescue Errno::EAGAIN, EOFError
      end

      # Read whatever is available from child stderr
      chunk = nil
      begin
        chunk = j[:stderr].read_nonblock(2**20)
      rescue Errno::EAGAIN, EOFError
      end
      next unless chunk

      j[:stderr_buf] << chunk
      next unless j[:stderr_buf].index "\n"

      lines = j[:stderr_buf].lines("\n").to_a
      if j[:stderr_buf].end_with?("\n")
        j[:stderr_buf] = ''
      else
        # The last element is an unterminated line: hold it back.
        j[:stderr_buf] = lines.pop
      end
      lines.each do |line|
        $stderr.print "#{job_uuid} ! " unless line.index(job_uuid)
        $stderr.puts line
        $redis.publish job_uuid, "#{Time.now.ctime.to_s} #{line.strip}"
      end
    end
  end

  # Cleans up after at most one child whose wait thread has finished:
  # publishes "end" on the job's Redis channel, does a final read of the
  # pipes, prints any buffered partial stderr line, closes the child's
  # stdout and stderr pipes, expires the per-job API token and removes
  # the job from @running.
  def reap_children
    return if 0 == @running.size

    # Thread#status is false once the thread has terminated normally.
    # If several children are done, the last one iterated is handled
    # now and the others on later calls.
    j_done = nil
    @running.each_value do |j|
      j_done = j if j[:wait_thr].status == false
    end
    return if !j_done

    pid_done = j_done[:wait_thr].pid
    job_done = j_done[:job]
    $stderr.puts "dispatch: child #{pid_done} exit"
    $stderr.puts "dispatch: job #{job_done.uuid} end"
    $redis.publish job_done.uuid, "end"

    # Read whatever is still available on stdout and stderr
    read_pipes
    if j_done[:stderr_buf] and j_done[:stderr_buf] != ''
      $stderr.puts j_done[:stderr_buf] + "\n"
    end

    # popen3 was called without a block, so the pipes must be closed
    # explicitly; otherwise their descriptors stay open until the IO
    # objects happen to be garbage collected.
    [j_done[:stdout], j_done[:stderr]].each do |pipe|
      pipe.close unless pipe.closed?
    end

    # Wait the thread
    j_done[:wait_thr].value

    # Invalidate the per-job auth token
    j_done[:job_auth].update_attributes expires_at: Time.now

    @running.delete job_done.uuid
  end

  # Main loop. Runs until $signal[:term] is set AND no children remain.
  # While the flag is unset, the queue is refreshed and jobs are started
  # (each at most once per second). Once the flag is set, no new jobs are
  # started and each remaining child is sent INT on up to two loop
  # iterations.
  def run
    act_as_system_user
    @running ||= {}
    $stderr.puts "dispatch: ready"
    until $signal[:term] && @running.empty?
      read_pipes
      if $signal[:term]
        @running.each do |uuid, j|
          next unless !j[:started] and j[:sent_int] < 2
          begin
            Process.kill 'INT', j[:wait_thr].pid
          rescue Errno::ESRCH
            # No such pid = race condition + desired result is
            # already achieved
          end
          j[:sent_int] += 1
        end
      else
        refresh_todo unless did_recently(:refresh_todo, 1.0)
        start_jobs unless @todo.empty? || did_recently(:start_jobs, 1.0)
      end
      reap_children
      # Block for up to one second, or until some child pipe is readable.
      pipes = @running.values.collect { |j| [j[:stdout], j[:stderr]] }.flatten
      IO.select(pipes, [], [], 1)
    end
  end

  protected

  # Returns true if this method was already called for +thing+ within
  # the last +min_interval+ seconds; otherwise records the current time
  # for +thing+ and returns false.
  def did_recently(thing, min_interval)
    @did_recently ||= {}
    last_time = @did_recently[thing]
    return true if last_time and last_time >= Time.now - min_interval
    @did_recently[thing] = Time.now
    false
  end

  # Sets @idle_slurm_nodes from the output of `sinfo`: the number
  # preceding "idle" on the last line that matches, or 0 if no line
  # matches. Errors from running or parsing sinfo are deliberately
  # ignored (best effort).
  def refresh_idle_slurm_nodes
    @idle_slurm_nodes = 0
    begin
      `sinfo`.split("\n").each do |line|
        found = line.match(/(\d+) +idle/)
        @idle_slurm_nodes = found[1].to_i if found
      end
    rescue
    end
  end

  # Returns the job's resource_limits['min_nodes'] as an integer, or 1
  # when that entry is unset or cannot be converted.
  def min_nodes_for(job)
    requested = job.resource_limits['min_nodes']
    return 1 unless requested
    begin
      requested.to_i
    rescue
      1
    end
  end

  # Returns the command line prefix for the configured crunch_job_wrapper.
  # Raises RuntimeError for an unrecognised wrapper.
  def wrapper_args(job, min_nodes)
    wrapper = Server::Application.config.crunch_job_wrapper
    case wrapper
    when :none
      []
    when :slurm_immediate
      ["salloc",
       "--chdir=/",
       "--immediate",
       "--exclusive",
       "--no-kill",
       "--job-name=#{job.uuid}",
       "--nodes=#{min_nodes}"]
    else
      raise "Unknown crunch_job_wrapper: #{wrapper}"
    end
  end

  # Returns "<git_repositories_dir>/<repository_name>.git" if that path
  # exists, else "<git_repositories_dir>/<repository_name>/.git".
  def git_dir_for(commit)
    repos_dir = Rails.configuration.git_repositories_dir
    bare_repo = File.join(repos_dir, commit.repository_name + '.git')
    return bare_repo if File.exist?(bare_repo)
    File.join(repos_dir, commit.repository_name, '.git')
  end
end
285
# Entry point: build a dispatcher and run its main loop, which returns
# only after a TERM/INT signal has been received and no children remain.
dispatcher = Dispatcher.new
dispatcher.run