X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/f8af0c6c331d5b52deab50abf7afa8c7881cddfb..9bd3b2729a61f62ddbab10ac65fd9f7de837a10d:/services/api/app/models/job.rb diff --git a/services/api/app/models/job.rb b/services/api/app/models/job.rb index ec5f7dd5cc..248d16a4ef 100644 --- a/services/api/app/models/job.rb +++ b/services/api/app/models/job.rb @@ -2,6 +2,7 @@ class Job < ArvadosModel include HasUuid include KindAndEtag include CommonApiTemplate + serialize :components, Hash attr_protected :arvados_sdk_version, :docker_image_locator serialize :script_parameters, Hash serialize :runtime_constraints, Hash @@ -10,12 +11,14 @@ class Job < ArvadosModel after_commit :trigger_crunch_dispatch_if_cancelled, :on => :update before_validation :set_priority before_validation :update_state_from_old_state_attrs + before_validation :update_script_parameters_digest validate :ensure_script_version_is_commit - validate :find_arvados_sdk_version validate :find_docker_image_locator + validate :find_arvados_sdk_version validate :validate_status validate :validate_state_change validate :ensure_no_collection_uuids_in_script_params + before_save :tag_version_in_internal_repository before_save :update_timestamps_when_state_changes has_many :commit_ancestors, :foreign_key => :descendant, :primary_key => :script_version @@ -43,7 +46,6 @@ class Job < ArvadosModel t.add :log t.add :runtime_constraints t.add :tasks_summary -# t.add :dependencies t.add :nondeterministic t.add :repository t.add :supplied_script_version @@ -52,6 +54,7 @@ class Job < ArvadosModel t.add :queue_position t.add :node_uuids t.add :description + t.add :components end # Supported states for a job @@ -64,7 +67,7 @@ class Job < ArvadosModel ] def assert_finished - update_attributes(finished_at: finished_at || Time.now, + update_attributes(finished_at: finished_at || db_current_time, success: success.nil? ? false : success, running: false) end @@ -78,12 +81,13 @@ class Job < ArvadosModel end def queue_position - Job::queue.each_with_index do |job, index| - if job[:uuid] == self.uuid - return index - end - end - nil + # We used to report this accurately, but the implementation made queue + # API requests O(n**2) for the size of the queue. See #8800. + # We've soft-disabled it because it's not clear we even want this + # functionality: now that we have Node Manager with support for multiple + # node sizes, "queue position" tells you very little about when a job will + # run. + state == Queued ? 0 : nil end def self.running @@ -92,8 +96,7 @@ class Job < ArvadosModel end def lock locked_by_uuid - transaction do - self.reload + with_lock do unless self.state == Queued and self.is_locked_by_uuid.nil? raise AlreadyLockedError end @@ -103,8 +106,188 @@ class Job < ArvadosModel end end + def update_script_parameters_digest + self.script_parameters_digest = self.class.sorted_hash_digest(script_parameters) + end + + def self.searchable_columns operator + super - ["script_parameters_digest"] + end + + def self.load_job_specific_filters attrs, orig_filters, read_users + # Convert Job-specific @filters entries into general SQL filters. + script_info = {"repository" => nil, "script" => nil} + git_filters = Hash.new do |hash, key| + hash[key] = {"max_version" => "HEAD", "exclude_versions" => []} + end + filters = [] + orig_filters.each do |attr, operator, operand| + if (script_info.has_key? attr) and (operator == "=") + if script_info[attr].nil? + script_info[attr] = operand + elsif script_info[attr] != operand + raise ArgumentError.new("incompatible #{attr} filters") + end + end + case operator + when "in git" + git_filters[attr]["min_version"] = operand + when "not in git" + git_filters[attr]["exclude_versions"] += Array.wrap(operand) + when "in docker", "not in docker" + image_hashes = Array.wrap(operand).flat_map do |search_term| + image_search, image_tag = search_term.split(':', 2) + Collection. + find_all_for_docker_image(image_search, image_tag, read_users). + map(&:portable_data_hash) + end + filters << [attr, operator.sub(/ docker$/, ""), image_hashes] + else + filters << [attr, operator, operand] + end + end + + # Build a real script_version filter from any "not? in git" filters. + git_filters.each_pair do |attr, filter| + case attr + when "script_version" + script_info.each_pair do |key, value| + if value.nil? + raise ArgumentError.new("script_version filter needs #{key} filter") + end + end + filter["repository"] = script_info["repository"] + if attrs[:script_version] + filter["max_version"] = attrs[:script_version] + else + # Using HEAD, set earlier by the hash default, is fine. + end + when "arvados_sdk_version" + filter["repository"] = "arvados" + else + raise ArgumentError.new("unknown attribute for git filter: #{attr}") + end + revisions = Commit.find_commit_range(filter["repository"], + filter["min_version"], + filter["max_version"], + filter["exclude_versions"]) + if revisions.empty? + raise ArgumentError. + new("error searching #{filter['repository']} from " + + "'#{filter['min_version']}' to '#{filter['max_version']}', " + + "excluding #{filter['exclude_versions']}") + end + filters.append([attr, "in", revisions]) + end + + filters + end + + def self.find_reusable attrs, params, filters, read_users + if filters.empty? # Translate older creation parameters into filters. + filters = + [["repository", "=", attrs[:repository]], + ["script", "=", attrs[:script]], + ["script_version", "not in git", params[:exclude_script_versions]], + ].reject { |filter| filter.last.nil? or filter.last.empty? } + if !params[:minimum_script_version].blank? + filters << ["script_version", "in git", + params[:minimum_script_version]] + else + filters += default_git_filters("script_version", attrs[:repository], + attrs[:script_version]) + end + if image_search = attrs[:runtime_constraints].andand["docker_image"] + if image_tag = attrs[:runtime_constraints]["docker_image_tag"] + image_search += ":#{image_tag}" + end + image_locator = Collection. + for_latest_docker_image(image_search).andand.portable_data_hash + else + image_locator = nil + end + filters << ["docker_image_locator", "=", image_locator] + if sdk_version = attrs[:runtime_constraints].andand["arvados_sdk_version"] + filters += default_git_filters("arvados_sdk_version", "arvados", sdk_version) + end + filters = load_job_specific_filters(attrs, filters, read_users) + end + + # Check specified filters for some reasonableness. + filter_names = filters.map { |f| f.first }.uniq + ["repository", "script"].each do |req_filter| + if not filter_names.include?(req_filter) + return send_error("#{req_filter} filter required") + end + end + + # Search for a reusable Job, and return it if found. + candidates = Job. + readable_by(current_user). + where('state = ? or (owner_uuid = ? and state in (?))', + Job::Complete, current_user.uuid, [Job::Queued, Job::Running]). + where('script_parameters_digest = ?', Job.sorted_hash_digest(attrs[:script_parameters])). + where('nondeterministic is distinct from ?', true). + order('state desc, created_at') # prefer Running jobs over Queued + candidates = apply_filters candidates, filters + chosen = nil + incomplete_job = nil + candidates.each do |j| + if j.state != Job::Complete + # We'll use this if we don't find a job that has completed + incomplete_job ||= j + next + end + + if chosen == false + # We have already decided not to reuse any completed job + next + elsif chosen + if chosen.output != j.output + # If two matching jobs produced different outputs, run a new + # job (or use one that's already running/queued) instead of + # choosing one arbitrarily. + chosen = false + end + # ...and that's the only thing we need to do once we've chosen + # a job to reuse. + elsif !Collection.readable_by(current_user).find_by_portable_data_hash(j.output) + # As soon as the output we will end up returning (if any) is + # decided, check whether it will be visible to the user; if + # not, any further investigation of reusable jobs is futile. + chosen = false + else + chosen = j + end + end + chosen || incomplete_job + end + + def self.default_git_filters(attr_name, repo_name, refspec) + # Add a filter to @filters for `attr_name` = the latest commit available + # in `repo_name` at `refspec`. No filter is added if refspec can't be + # resolved. + commits = Commit.find_commit_range(repo_name, nil, refspec, nil) + if commit_hash = commits.first + [[attr_name, "=", commit_hash]] + else + [] + end + end + protected + def self.sorted_hash_digest h + Digest::MD5.hexdigest(Oj.dump(deep_sort_hash(h))) + end + + def self.deep_sort_hash h + return h unless h.is_a? Hash + h.sort.collect do |k, v| + [k, deep_sort_hash(v)] + end.to_h + end + def foreign_key_attributes super + %w(output log) end @@ -125,21 +308,43 @@ class Job < ArvadosModel end def ensure_script_version_is_commit - if self.state == Running + if state == Running # Apparently client has already decided to go for it. This is # needed to run a local job using a local working directory # instead of a commit-ish. return true end - if new_record? or script_version_changed? - sha1 = Commit.find_commit_range(current_user, self.repository, nil, self.script_version, nil)[0] rescue nil - if sha1 - self.supplied_script_version = self.script_version if self.supplied_script_version.nil? or self.supplied_script_version.empty? - self.script_version = sha1 - else - self.errors.add :script_version, "#{self.script_version} does not resolve to a commit" + if new_record? or repository_changed? or script_version_changed? + sha1 = Commit.find_commit_range(repository, + nil, script_version, nil).first + if not sha1 + errors.add :script_version, "#{script_version} does not resolve to a commit" return false end + if supplied_script_version.nil? or supplied_script_version.empty? + self.supplied_script_version = script_version + end + self.script_version = sha1 + end + true + end + + def tag_version_in_internal_repository + if state == Running + # No point now. See ensure_script_version_is_commit. + true + elsif errors.any? + # Won't be saved, and script_version might not even be valid. + true + elsif new_record? or repository_changed? or script_version_changed? + uuid_was = uuid + begin + assign_uuid + Commit.tag_in_internal_repository repository, script_version, uuid + rescue + uuid = uuid_was + raise + end end end @@ -170,9 +375,9 @@ class Job < ArvadosModel def find_arvados_sdk_version resolve_runtime_constraint("arvados_sdk_version", :arvados_sdk_version) do |git_search| - commits = Commit.find_commit_range(current_user, "arvados", + commits = Commit.find_commit_range("arvados", nil, git_search, nil) - if commits.nil? or commits.empty? + if commits.empty? [false, "#{git_search} does not resolve to a commit"] elsif not runtime_constraints["docker_image"] [false, "cannot be specified without a Docker image constraint"] @@ -183,6 +388,10 @@ class Job < ArvadosModel end def find_docker_image_locator + runtime_constraints['docker_image'] = + Rails.configuration.default_docker_image_for_jobs if ((runtime_constraints.is_a? Hash) and + (runtime_constraints['docker_image']).nil? and + Rails.configuration.default_docker_image_for_jobs) resolve_runtime_constraint("docker_image", :docker_image_locator) do |image_search| image_tag = runtime_constraints['docker_image_tag'] @@ -194,24 +403,6 @@ class Job < ArvadosModel end end - # def dependencies - # deps = {} - # queue = self.script_parameters.values - # while not queue.empty? - # queue = queue.flatten.compact.collect do |v| - # if v.is_a? Hash - # v.values - # elsif v.is_a? String - # v.match(/^(([0-9a-f]{32})\b(\+[^,]+)?,?)*$/) do |locator| - # deps[locator.to_s] = true - # end - # nil - # end - # end - # end - # deps.keys - # end - def permission_to_update if is_locked_by_uuid_was and !(current_user and (current_user.uuid == is_locked_by_uuid_was or @@ -230,7 +421,8 @@ class Job < ArvadosModel output_changed? or log_changed? or tasks_summary_changed? or - state_changed? + state_changed? or + components_changed? logger.warn "User #{current_user.uuid if current_user} tried to change protected job attributes on locked #{self.class.to_s} #{uuid_was}" return false end @@ -258,7 +450,7 @@ class Job < ArvadosModel # Ensure cancelled_at cannot be set to arbitrary non-now times, # or changed once it is set. if self.cancelled_at and not self.cancelled_at_was - self.cancelled_at = Time.now + self.cancelled_at = db_current_time self.cancelled_by_user_uuid = current_user.uuid self.cancelled_by_client_uuid = current_api_client.andand.uuid @need_crunch_dispatch_trigger = true @@ -284,11 +476,11 @@ class Job < ArvadosModel case state when Running - self.started_at ||= Time.now + self.started_at ||= db_current_time when Failed, Complete - self.finished_at ||= Time.now + self.finished_at ||= db_current_time when Cancelled - self.cancelled_at ||= Time.now + self.cancelled_at ||= db_current_time end # TODO: Remove the following case block when old "success" and