from .arvjob import ArvadosJob
from .arvcontainer import ArvadosContainer
from .pathmapper import ArvPathMapper
-from .context import ClusterTarget
from functools import partial
from schema_salad.sourceline import SourceLine
from cwltool.errors import WorkflowException
toolpath_object, # type: Dict[Text, Any]
pos, # type: int
loadingContext, # type: LoadingContext
+ arvrunner,
*argc,
**argv
): # type: (...) -> None
super(ArvadosWorkflowStep, self).__init__(toolpath_object, pos, loadingContext, *argc, **argv)
self.tool["class"] = "WorkflowStep"
+ self.arvrunner = arvrunner
def job(self, joborder, output_callback, runtimeContext):
builder = self._init_job({shortname(k): v for k,v in joborder.items()}, runtimeContext)
**argv
):
# (...) -> WorkflowStep
- return ArvadosWorkflowStep(toolpath_object, pos, loadingContext, *argc, **argv)
+ return ArvadosWorkflowStep(toolpath_object, pos, loadingContext, self.arvrunner, *argc, **argv)
def __init__(self, kwargs=None):
super(ArvLoadingContext, self).__init__(kwargs)
-ClusterTarget = namedtuple("ClusterTarget", ("instance", "cluster_id", "owner_uuid"))
-
class ArvRuntimeContext(RuntimeContext):
def __init__(self, kwargs=None):
self.work_api = None