class Dispatcher
include ApplicationHelper
+ include DbCurrentTime
def initialize
@crunch_job_bin = (ENV['CRUNCH_JOB_BIN'] || `which arv-crunch-job`.strip)
nodelist = nodes_available_for_job_now(job)
if nodelist.nil? and not did_recently(:wait_for_available_nodes, 3600)
$stderr.puts "dispatch: waiting for nodes for #{job.uuid}"
- @node_wait_deadline = Time.now + 5.minutes
+ @node_wait_deadline = db_current_time + 5.minutes
end
nodelist
end
# We already made a token for this job, but we need a new one
# because modified_by_user_uuid has changed (the job will run
# as a different user).
- @authorizations[job.uuid].update_attributes expires_at: Time.now
+ @authorizations[job.uuid].update_attributes expires_at: db_current_time
@authorizations[job.uuid] = nil
end
if not @authorizations[job.uuid]
when :slurm_immediate
nodelist = nodes_available_for_job(job)
if nodelist.nil?
- if Time.now < @node_wait_deadline
+ if db_current_time < @node_wait_deadline
break
else
next
bytes_logged: 0,
events_logged: 0,
log_throttle_is_open: true,
- log_throttle_reset_time: Time.now + Rails.configuration.crunch_log_throttle_period,
+ log_throttle_reset_time: db_current_time + Rails.configuration.crunch_log_throttle_period,
log_throttle_bytes_so_far: 0,
log_throttle_lines_so_far: 0,
log_throttle_bytes_skipped: 0,
if (running_job[:bytes_logged] >
Rails.configuration.crunch_limit_log_bytes_per_job)
message = "Exceeded log limit #{Rails.configuration.crunch_limit_log_bytes_per_job} bytes (crunch_limit_log_bytes_per_job). Log will be truncated."
- running_job[:log_throttle_reset_time] = Time.now + 100.years
+ running_job[:log_throttle_reset_time] = db_current_time + 100.years
running_job[:log_throttle_is_open] = false
elsif (running_job[:log_throttle_bytes_so_far] >
Rails.configuration.crunch_log_throttle_bytes)
- remaining_time = running_job[:log_throttle_reset_time] - Time.now
+ remaining_time = running_job[:log_throttle_reset_time] - db_current_time
message = "Exceeded rate #{Rails.configuration.crunch_log_throttle_bytes} bytes per #{Rails.configuration.crunch_log_throttle_period} seconds (crunch_log_throttle_bytes). Logging will be silenced for the next #{remaining_time.round} seconds.\n"
running_job[:log_throttle_is_open] = false
elsif (running_job[:log_throttle_lines_so_far] >
Rails.configuration.crunch_log_throttle_lines)
- remaining_time = running_job[:log_throttle_reset_time] - Time.now
+ remaining_time = running_job[:log_throttle_reset_time] - db_current_time
message = "Exceeded rate #{Rails.configuration.crunch_log_throttle_lines} lines per #{Rails.configuration.crunch_log_throttle_period} seconds (crunch_log_throttle_lines), logging will be silenced for the next #{remaining_time.round} seconds.\n"
running_job[:log_throttle_is_open] = false
end
@running.each do |job_uuid, j|
job = j[:job]
- now = Time.now
+ now = db_current_time
if now > j[:log_throttle_reset_time]
# It has been more than throttle_period seconds since the last
# checkpoint so reset the throttle
# Invalidate the per-job auth token, unless the job is still queued and we
# might want to try it again.
if jobrecord.state != "Queued"
- j_done[:job_auth].update_attributes expires_at: Time.now
+ j_done[:job_auth].update_attributes expires_at: db_current_time
end
@running.delete job_done.uuid
end
expire_tokens.each do |k, v|
- v.update_attributes expires_at: Time.now
+ v.update_attributes expires_at: db_current_time
@pipe_auth_tokens.delete k
end
end
protected
def did_recently(thing, min_interval)
- if !@did_recently[thing] or @did_recently[thing] < Time.now - min_interval
- @did_recently[thing] = Time.now
+ current_time = db_current_time
+ if !@did_recently[thing] or @did_recently[thing] < current_time - min_interval
+ @did_recently[thing] = current_time
false
else
true
# it has been at least crunch_log_seconds_between_events seconds since
# the last flush.
if running_job[:stderr_buf_to_flush].size > Rails.configuration.crunch_log_bytes_per_event or
- (Time.now - running_job[:stderr_flushed_at]) >= Rails.configuration.crunch_log_seconds_between_events
+ (db_current_time - running_job[:stderr_flushed_at]) >= Rails.configuration.crunch_log_seconds_between_events
begin
log = Log.new(object_uuid: running_job[:job].uuid,
event_type: 'stderr',
$stderr.puts exception.backtrace
end
running_job[:stderr_buf_to_flush] = ''
- running_job[:stderr_flushed_at] = Time.now
+ running_job[:stderr_flushed_at] = db_current_time
end
end
end