class ArvadosWorkflowStep(WorkflowStep):
    """WorkflowStep subclass that keeps a reference to the Arvados runner.

    The runner passed to the constructor is stored on the instance and is
    handed to ``set_cluster_target`` every time ``job`` is invoked.
    """

    def __init__(
            self,
            toolpath_object,  # type: Dict[Text, Any]
            pos,              # type: int
            loadingContext,   # type: LoadingContext
            arvrunner,
            *argc,
            **argv
    ):  # type: (...) -> None
        """Initialise the parent step, then record the runner.

        ``toolpath_object``, ``pos``, ``loadingContext`` and any extra
        positional/keyword arguments are forwarded unchanged to the parent
        constructor; ``arvrunner`` is consumed here and not forwarded.
        """
        parent = super(ArvadosWorkflowStep, self)
        parent.__init__(toolpath_object, pos, loadingContext, *argc, **argv)

        # Overwrite whatever class the tool document declared.
        self.tool["class"] = "WorkflowStep"
        self.arvrunner = arvrunner

    def job(self, joborder, output_callback, runtimeContext):
        """Delegate to the parent ``job`` with an adjusted runtime context.

        The caller's ``runtimeContext`` is not modified: a copy is made,
        marked as top level, passed through ``set_cluster_target`` and the
        result of that call is what the parent implementation receives.
        Returns whatever the parent ``job`` returns.
        """
        step_context = runtimeContext.copy()
        step_context.toplevel = True  # Preserve behavior for #13365

        # Re-key the job order by the short form of each input name.
        short_inputs = {}
        for full_name, value in viewitems(joborder):
            short_inputs[shortname(full_name)] = value

        step_builder = make_builder(
            short_inputs,
            self.hints,
            self.requirements,
            step_context,
            self.metadata,
        )
        step_context = set_cluster_target(
            self.tool, self.arvrunner, step_builder, step_context
        )

        parent = super(ArvadosWorkflowStep, self)
        return parent.job(joborder, output_callback, step_context)
+
+