4573f2b5db22603247dd6c84b47cede38e14ec1d
[arvados.git] / services / api / script / crunch-dispatch.rb
1 #!/usr/bin/env ruby
2
3 include Process
4
5 $options = {}
6 (ARGV.any? ? ARGV : ['--jobs', '--pipelines']).each do |arg|
7   case arg
8   when '--jobs'
9     $options[:jobs] = true
10   when '--pipelines'
11     $options[:pipelines] = true
12   else
13     abort "Unrecognized command line option '#{arg}'"
14   end
15 end
16 if not ($options[:jobs] or $options[:pipelines])
17   abort "Nothing to do. Please specify at least one of: --jobs, --pipelines."
18 end
19
20 ARGV.reject! { |a| a =~ /--jobs|--pipelines/ }
21
22 $warned = {}
23 $signal = {}
24 %w{TERM INT}.each do |sig|
25   signame = sig
26   Signal.trap(sig) do
27     $stderr.puts "Received #{signame} signal"
28     $signal[:term] = true
29   end
30 end
31
32 if ENV["CRUNCH_DISPATCH_LOCKFILE"]
33   lockfilename = ENV.delete "CRUNCH_DISPATCH_LOCKFILE"
34   lockfile = File.open(lockfilename, File::RDWR|File::CREAT, 0644)
35   unless lockfile.flock File::LOCK_EX|File::LOCK_NB
36     abort "Lock unavailable on #{lockfilename} - exit"
37   end
38 end
39
40 ENV["RAILS_ENV"] = ARGV[0] || ENV["RAILS_ENV"] || "development"
41
42 require File.dirname(__FILE__) + '/../config/boot'
43 require File.dirname(__FILE__) + '/../config/environment'
44 require 'open3'
45
46 class Dispatcher
47   include ApplicationHelper
48
49   def sysuser
50     return act_as_system_user
51   end
52
53   def refresh_todo
54     @todo = []
55     if $options[:jobs]
56       @todo = Job.queue.select(&:repository)
57     end
58     @todo_pipelines = []
59     if $options[:pipelines]
60       @todo_pipelines = PipelineInstance.queue
61     end
62   end
63
64   def slurm_status
65     slurm_nodes = {}
66     `sinfo --noheader -o %n:%t`.each_line do |sinfo_line|
67       hostname, state = sinfo_line.chomp.split(":", 2)
68       state.sub!(/\W+$/, "")
69       state = "down" unless %w(idle alloc down).include? state
70       slurm_nodes[hostname] = {state: state, job: nil}
71     end
72     `squeue --noheader -o %n:%j`.each_line do |squeue_line|
73       hostname, job_uuid = squeue_line.chomp.split(":", 2)
74       next unless slurm_nodes[hostname]
75       slurm_nodes[hostname][:job] = job_uuid
76     end
77     slurm_nodes
78   end
79
80   def update_node_status
81     return unless Server::Application.config.crunch_job_wrapper.to_s.match /^slurm/
82     @node_state ||= {}
83     slurm_status.each_pair do |hostname, slurmdata|
84       next if @node_state[hostname] == slurmdata
85       begin
86         node = Node.where('hostname=?', hostname).order(:last_ping_at).last
87         if node
88           $stderr.puts "dispatch: update #{hostname} state to #{slurmdata}"
89           node.info["slurm_state"] = slurmdata[:state]
90           node.job_uuid = slurmdata[:job]
91           if node.save
92             @node_state[hostname] = slurmdata
93           else
94             $stderr.puts "dispatch: failed to update #{node.uuid}: #{node.errors.messages}"
95           end
96         elsif slurmdata[:state] != 'down'
97           $stderr.puts "dispatch: SLURM reports '#{hostname}' is not down, but no node has that name"
98         end
99       rescue => error
100         $stderr.puts "dispatch: error updating #{hostname} node status: #{error}"
101       end
102     end
103   end
104
105   def positive_int(raw_value, default=nil)
106     value = begin raw_value.to_i rescue 0 end
107     if value > 0
108       value
109     else
110       default
111     end
112   end
113
114   NODE_CONSTRAINT_MAP = {
115     # Map Job runtime_constraints keys to the corresponding Node info key.
116     'min_ram_mb_per_node' => 'total_ram_mb',
117     'min_scratch_mb_per_node' => 'total_scratch_mb',
118     'min_cores_per_node' => 'total_cpu_cores',
119   }
120
121   def nodes_available_for_job_now(job)
122     # Find Nodes that satisfy a Job's runtime constraints (by building
123     # a list of Procs and using them to test each Node).  If there
124     # enough to run the Job, return an array of usable node objects.
125     # Otherwise, return nil.
126     need_procs = NODE_CONSTRAINT_MAP.each_pair.map do |job_key, node_key|
127       Proc.new do |node|
128         positive_int(node.info[node_key], 0) >=
129           positive_int(job.runtime_constraints[job_key], 0)
130       end
131     end
132     min_node_count = positive_int(job.runtime_constraints['min_nodes'], 1)
133     usable_nodes = []
134     Node.find_each do |node|
135       good_node = (node.info['slurm_state'] == 'idle')
136       need_procs.each { |node_test| good_node &&= node_test.call(node) }
137       if good_node
138         usable_nodes << node
139         if usable_nodes.count >= min_node_count
140           return usable_nodes
141         end
142       end
143     end
144     nil
145   end
146
147   def nodes_available_for_job(job)
148     # Check if there are enough idle nodes with the Job's minimum
149     # hardware requirements to run it.  If so, return an array of
150     # their names.  If not, up to once per hour, signal start_jobs to
151     # hold off launching Jobs.  This delay is meant to give the Node
152     # Manager an opportunity to make new resources available for new
153     # Jobs.
154     #
155     # The exact timing parameters here might need to be adjusted for
156     # the best balance between helping the longest-waiting Jobs run,
157     # and making efficient use of immediately available resources.
158     # These are all just first efforts until we have more data to work
159     # with.
160     nodelist = nodes_available_for_job_now(job)
161     if nodelist.nil? and not did_recently(:wait_for_available_nodes, 3600)
162       $stderr.puts "dispatch: waiting for nodes for #{job.uuid}"
163       @node_wait_deadline = Time.now + 5.minutes
164     end
165     nodelist
166   end
167
168   def assign_job_to_nodes(assigned_job, nodes)
169     nodes.each do |node|
170       node.job = assigned_job
171       if not node.save
172         $stderr.puts("dispatch: failed to save #{node.uuid} assignment to " +
173                      "job #{assigned_job.uuid}: #{node.errors.messages}")
174       end
175     end
176   end
177
178   def start_jobs
179     @todo.each do |job|
180       next if @running[job.uuid]
181
182       cmd_args = nil
183       case Server::Application.config.crunch_job_wrapper
184       when :none
185         if @running.size > 0
186             # Don't run more than one at a time.
187             return
188         end
189         cmd_args = []
190       when :slurm_immediate
191         nodelist = nodes_available_for_job(job)
192         if nodelist.nil?
193           if Time.now < @node_wait_deadline
194             break
195           else
196             next
197           end
198         end
199         cmd_args = ["salloc",
200                     "--chdir=/",
201                     "--immediate",
202                     "--exclusive",
203                     "--no-kill",
204                     "--job-name=#{job.uuid}",
205                     "--nodelist=#{nodelist.map(&:hostname).join(',')}"]
206       else
207         raise "Unknown crunch_job_wrapper: #{Server::Application.config.crunch_job_wrapper}"
208       end
209
210       next if !take(job)
211
212       if Server::Application.config.crunch_job_user
213         cmd_args.unshift("sudo", "-E", "-u",
214                          Server::Application.config.crunch_job_user,
215                          "PATH=#{ENV['PATH']}",
216                          "PERLLIB=#{ENV['PERLLIB']}",
217                          "PYTHONPATH=#{ENV['PYTHONPATH']}",
218                          "RUBYLIB=#{ENV['RUBYLIB']}",
219                          "GEM_PATH=#{ENV['GEM_PATH']}")
220       end
221
222       job_auth = ApiClientAuthorization.
223         new(user: User.where('uuid=?', job.modified_by_user_uuid).first,
224             api_client_id: 0)
225       job_auth.save
226
227       crunch_job_bin = (ENV['CRUNCH_JOB_BIN'] || `which arv-crunch-job`.strip)
228       if crunch_job_bin == ''
229         raise "No CRUNCH_JOB_BIN env var, and crunch-job not in path."
230       end
231
232       require 'shellwords'
233
234       arvados_internal = Rails.configuration.git_internal_dir
235       if not File.exists? arvados_internal
236         $stderr.puts `mkdir -p #{arvados_internal.shellescape} && cd #{arvados_internal.shellescape} && git init --bare`
237       end
238
239       repo_root = Rails.configuration.git_repositories_dir
240       src_repo = File.join(repo_root, job.repository + '.git')
241       if not File.exists? src_repo
242         src_repo = File.join(repo_root, job.repository, '.git')
243         if not File.exists? src_repo
244           $stderr.puts "dispatch: No #{job.repository}.git or #{job.repository}/.git at #{repo_root}"
245           sleep 1
246           untake job
247           next
248         end
249       end
250
251       $stderr.puts `cd #{arvados_internal.shellescape} && git fetch-pack --all #{src_repo.shellescape} && git tag #{job.uuid.shellescape} #{job.script_version.shellescape}`
252       unless $? == 0
253         $stderr.puts "dispatch: git fetch-pack && tag failed"
254         sleep 1
255         untake job
256         next
257       end
258
259       cmd_args << crunch_job_bin
260       cmd_args << '--job-api-token'
261       cmd_args << job_auth.api_token
262       cmd_args << '--job'
263       cmd_args << job.uuid
264       cmd_args << '--git-dir'
265       cmd_args << arvados_internal
266
267       $stderr.puts "dispatch: #{cmd_args.join ' '}"
268
269       begin
270         i, o, e, t = Open3.popen3(*cmd_args)
271       rescue
272         $stderr.puts "dispatch: popen3: #{$!}"
273         sleep 1
274         untake(job)
275         next
276       end
277
278       $stderr.puts "dispatch: job #{job.uuid}"
279       start_banner = "dispatch: child #{t.pid} start #{Time.now.ctime.to_s}"
280       $stderr.puts start_banner
281
282       @running[job.uuid] = {
283         stdin: i,
284         stdout: o,
285         stderr: e,
286         wait_thr: t,
287         job: job,
288         stderr_buf: '',
289         started: false,
290         sent_int: 0,
291         job_auth: job_auth,
292         stderr_buf_to_flush: '',
293         stderr_flushed_at: 0,
294         bytes_logged: 0,
295         events_logged: 0,
296         log_truncated: false
297       }
298       i.close
299       update_node_status
300     end
301   end
302
303   def take(job)
304     # no-op -- let crunch-job take care of locking.
305     true
306   end
307
308   def untake(job)
309     # no-op -- let crunch-job take care of locking.
310     true
311   end
312
313   def read_pipes
314     @running.each do |job_uuid, j|
315       job = j[:job]
316
317       # Throw away child stdout
318       begin
319         j[:stdout].read_nonblock(2**20)
320       rescue Errno::EAGAIN, EOFError
321       end
322
323       # Read whatever is available from child stderr
324       stderr_buf = false
325       begin
326         stderr_buf = j[:stderr].read_nonblock(2**20)
327       rescue Errno::EAGAIN, EOFError
328       end
329
330       if stderr_buf
331         j[:stderr_buf] << stderr_buf
332         if j[:stderr_buf].index "\n"
333           lines = j[:stderr_buf].lines("\n").to_a
334           if j[:stderr_buf][-1] == "\n"
335             j[:stderr_buf] = ''
336           else
337             j[:stderr_buf] = lines.pop
338           end
339           lines.each do |line|
340             $stderr.print "#{job_uuid} ! " unless line.index(job_uuid)
341             $stderr.puts line
342             pub_msg = "#{Time.now.ctime.to_s} #{line.strip} \n"
343             if not j[:log_truncated]
344               j[:stderr_buf_to_flush] << pub_msg
345             end
346           end
347
348           if not j[:log_truncated]
349             if (Rails.configuration.crunch_log_bytes_per_event < j[:stderr_buf_to_flush].size or
350                 (j[:stderr_flushed_at] + Rails.configuration.crunch_log_seconds_between_events < Time.now.to_i))
351               write_log j
352             end
353           end
354         end
355       end
356     end
357   end
358
359   def reap_children
360     return if 0 == @running.size
361     pid_done = nil
362     j_done = nil
363
364     if false
365       begin
366         pid_done = waitpid(-1, Process::WNOHANG | Process::WUNTRACED)
367         if pid_done
368           j_done = @running.values.
369             select { |j| j[:wait_thr].pid == pid_done }.
370             first
371         end
372       rescue SystemCallError
373         # I have @running processes but system reports I have no
374         # children. This is likely to happen repeatedly if it happens at
375         # all; I will log this no more than once per child process I
376         # start.
377         if 0 < @running.select { |uuid,j| j[:warned_waitpid_error].nil? }.size
378           children = @running.values.collect { |j| j[:wait_thr].pid }.join ' '
379           $stderr.puts "dispatch: IPC bug: waitpid() error (#{$!}), but I have children #{children}"
380         end
381         @running.each do |uuid,j| j[:warned_waitpid_error] = true end
382       end
383     else
384       @running.each do |uuid, j|
385         if j[:wait_thr].status == false
386           pid_done = j[:wait_thr].pid
387           j_done = j
388         end
389       end
390     end
391
392     return if !pid_done
393
394     job_done = j_done[:job]
395     $stderr.puts "dispatch: child #{pid_done} exit"
396     $stderr.puts "dispatch: job #{job_done.uuid} end"
397
398     # Ensure every last drop of stdout and stderr is consumed
399     read_pipes
400     write_log j_done # write any remaining logs
401
402     if j_done[:stderr_buf] and j_done[:stderr_buf] != ''
403       $stderr.puts j_done[:stderr_buf] + "\n"
404     end
405
406     # Wait the thread (returns a Process::Status)
407     exit_status = j_done[:wait_thr].value
408
409     jobrecord = Job.find_by_uuid(job_done.uuid)
410     if exit_status.to_i != 75 and jobrecord.started_at
411       # Clean up state fields in case crunch-job exited without
412       # putting the job in a suitable "finished" state.
413       jobrecord.running = false
414       jobrecord.finished_at ||= Time.now
415       if jobrecord.success.nil?
416         jobrecord.success = false
417       end
418       jobrecord.save!
419     else
420       # Don't fail the job if crunch-job didn't even get as far as
421       # starting it. If the job failed to run due to an infrastructure
422       # issue with crunch-job or slurm, we want the job to stay in the
423       # queue. If crunch-job exited after losing a race to another
424       # crunch-job process, it exits 75 and we should leave the job
425       # record alone so the winner of the race do its thing.
426       #
427       # There is still an unhandled race condition: If our crunch-job
428       # process is about to lose a race with another crunch-job
429       # process, but crashes before getting to its "exit 75" (for
430       # example, "cannot fork" or "cannot reach API server") then we
431       # will assume incorrectly that it's our process's fault
432       # jobrecord.started_at is non-nil, and mark the job as failed
433       # even though the winner of the race is probably still doing
434       # fine.
435     end
436
437     # Invalidate the per-job auth token
438     j_done[:job_auth].update_attributes expires_at: Time.now
439
440     @running.delete job_done.uuid
441   end
442
443   def update_pipelines
444     expire_tokens = @pipe_auth_tokens.dup
445     @todo_pipelines.each do |p|
446       pipe_auth = (@pipe_auth_tokens[p.uuid] ||= ApiClientAuthorization.
447                    create(user: User.where('uuid=?', p.modified_by_user_uuid).first,
448                           api_client_id: 0))
449       puts `export ARVADOS_API_TOKEN=#{pipe_auth.api_token} && arv-run-pipeline-instance --run-pipeline-here --no-wait --instance #{p.uuid}`
450       expire_tokens.delete p.uuid
451     end
452
453     expire_tokens.each do |k, v|
454       v.update_attributes expires_at: Time.now
455       @pipe_auth_tokens.delete k
456     end
457   end
458
459   def run
460     act_as_system_user
461     @running ||= {}
462     @pipe_auth_tokens ||= { }
463     $stderr.puts "dispatch: ready"
464     while !$signal[:term] or @running.size > 0
465       read_pipes
466       if $signal[:term]
467         @running.each do |uuid, j|
468           if !j[:started] and j[:sent_int] < 2
469             begin
470               Process.kill 'INT', j[:wait_thr].pid
471             rescue Errno::ESRCH
472               # No such pid = race condition + desired result is
473               # already achieved
474             end
475             j[:sent_int] += 1
476           end
477         end
478       else
479         refresh_todo unless did_recently(:refresh_todo, 1.0)
480         update_node_status
481         unless @todo.empty? or did_recently(:start_jobs, 1.0) or $signal[:term]
482           start_jobs
483         end
484         unless (@todo_pipelines.empty? and @pipe_auth_tokens.empty?) or did_recently(:update_pipelines, 5.0)
485           update_pipelines
486         end
487       end
488       reap_children
489       select(@running.values.collect { |j| [j[:stdout], j[:stderr]] }.flatten,
490              [], [], 1)
491     end
492   end
493
494   protected
495
496   def too_many_bytes_logged_for_job(j)
497     return (j[:bytes_logged] + j[:stderr_buf_to_flush].size >
498             Rails.configuration.crunch_limit_log_event_bytes_per_job)
499   end
500
501   def too_many_events_logged_for_job(j)
502     return (j[:events_logged] >= Rails.configuration.crunch_limit_log_events_per_job)
503   end
504
505   def did_recently(thing, min_interval)
506     @did_recently ||= {}
507     if !@did_recently[thing] or @did_recently[thing] < Time.now - min_interval
508       @did_recently[thing] = Time.now
509       false
510     else
511       true
512     end
513   end
514
515   # send message to log table. we want these records to be transient
516   def write_log running_job
517     return if running_job[:log_truncated]
518     return if running_job[:stderr_buf_to_flush] == ''
519     begin
520       # Truncate logs if they exceed crunch_limit_log_event_bytes_per_job
521       # or crunch_limit_log_events_per_job.
522       if (too_many_bytes_logged_for_job(running_job))
523         running_job[:log_truncated] = true
524         running_job[:stderr_buf_to_flush] =
525           "Server configured limit reached (crunch_limit_log_event_bytes_per_job: #{Rails.configuration.crunch_limit_log_event_bytes_per_job}). Subsequent logs truncated"
526       elsif (too_many_events_logged_for_job(running_job))
527         running_job[:log_truncated] = true
528         running_job[:stderr_buf_to_flush] =
529           "Server configured limit reached (crunch_limit_log_events_per_job: #{Rails.configuration.crunch_limit_log_events_per_job}). Subsequent logs truncated"
530       end
531       log = Log.new(object_uuid: running_job[:job].uuid,
532                     event_type: 'stderr',
533                     owner_uuid: running_job[:job].owner_uuid,
534                     properties: {"text" => running_job[:stderr_buf_to_flush]})
535       log.save!
536       running_job[:bytes_logged] += running_job[:stderr_buf_to_flush].size
537       running_job[:events_logged] += 1
538     rescue
539       running_job[:stderr_buf] = "Failed to write logs\n" + running_job[:stderr_buf]
540     end
541     running_job[:stderr_buf_to_flush] = ''
542     running_job[:stderr_flushed_at] = Time.now.to_i
543   end
544
545 end
546
547 # This is how crunch-job child procs know where the "refresh" trigger file is
548 ENV["CRUNCH_REFRESH_TRIGGER"] = Rails.configuration.crunch_refresh_trigger
549
550 Dispatcher.new.run