
Interfaces


Abstract classes for interfacing with various services.

SchedulerScriptAdapter

Bases: ScriptAdapter

Abstract class representing the interface for scheduling scripts.

This class handles the construction of scripts (as required by the ScriptAdapter base class) and also includes the necessary methods for constructing parallel commands. The adapter substitutes parallelized commands and defines how to schedule jobs and check job status.
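
The launcher token comes in three spellings: the bare token $(LAUNCHER), the legacy bracketed form $(LAUNCHER)[nodes, procs], and the newer suffix form using "Nn" and "Pp" inside the brackets. Below is a minimal, standalone sketch of how the class-level regexes (reproduced from the source below) pick these apart; the sample commands are illustrative:

import re

launcher_var = "$(LAUNCHER)"
launcher_regex = re.compile(re.escape(launcher_var) + r"\[(?P<alloc>.*)\]")
legacy_alloc = r"(?P<nodes>[0-9]+),\s*(?P<procs>[0-9]+)"  # "nodes, procs"
task_alloc = r"(?P<procs>[0-9]+)p"                        # procs only
node_alloc = r"(?P<nodes>[0-9]+)n"                        # nodes only

for cmd in ("$(LAUNCHER)[2, 16] ./sim",    # legacy: 2 nodes, 16 procs
            "$(LAUNCHER)[2n, 16p] ./sim",  # new style: nodes and procs
            "$(LAUNCHER)[16p] ./sim"):     # new style: procs only
    alloc = launcher_regex.search(cmd).group("alloc")
    legacy = re.search(legacy_alloc, alloc)
    if legacy:
        print(cmd, "->", legacy.groupdict())
    else:
        nodes = re.search(node_alloc, alloc)
        procs = re.search(task_alloc, alloc)
        print(cmd, "->", {"nodes": nodes and nodes.group("nodes"),
                          "procs": procs and procs.group("procs")})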

Source code in maestrowf/abstracts/interfaces/schedulerscriptadapter.py
@six.add_metaclass(ABCMeta)
class SchedulerScriptAdapter(ScriptAdapter):
    """
    Abstract class representing the interface for scheduling scripts.

    This class handles both the construction of scripts (as required by the
    ScriptAdapter base class) but also includes the necessary methods for
    constructing parallel commands. The adapter will substitute parallelized
    commands but also defines how to schedule and check job status.
    """

    # The var tag to look for to replace for parallelized commands.
    launcher_var = "$(LAUNCHER)"
    # Allocation regex and compilation
    # Keeping this one here for legacy.
    launcher_regex = re.compile(
        re.escape(launcher_var) + r"\[(?P<alloc>.*)\]")

    # We can have multiple requested submission properties.
    # Legacy allocation of nodes and procs.
    legacy_alloc = r"(?P<nodes>[0-9]+),\s*(?P<procs>[0-9]+)"
    # Just allocate based on tasks.
    task_alloc = r"(?P<procs>[0-9]+)p"
    # Just allocate based on nodes.
    node_alloc = r"(?P<nodes>[0-9]+)n"

    def __init__(self, **kwargs):
        """
        Initialize an empty ScriptAdapter object.

        :param kwargs: Key-value dictionary of arguments.

        Currently we only support the "shell" keyword.
        """
        # NOTE: The _batch member should be used to store persistent batching
        # parameters. The entries in this dictionary are meant to capture
        # the base settings for submission to a batch. This member variable
        # should never be used publicly outside of an instance.

        # Call super to set self._exec
        super(SchedulerScriptAdapter, self).__init__(**kwargs)
        self._batch = {}

    def add_batch_parameter(self, name, value):
        """
        Add a parameter to the ScriptAdapter instance.

        :param name: String name of the parameter that's being added.
        :param value: Value associated with the parameter name (should have a
            str method).
        """
        self._batch[name] = value

    @abstractmethod
    def get_header(self, step):
        """
        Generate the header present at the top of execution scripts.

        :param step: A StudyStep instance.
        :returns: A string of the header based on internal batch parameters and
            the parameter step.
        """
        pass

    @abstractmethod
    def get_parallelize_command(self, procs, nodes, **kwargs):
        """
        Generate the parallelization segment of the command line.

        :param procs: Number of processors to allocate to the parallel call.
        :param nodes: Number of nodes to allocate to the parallel call
            (default = 1).
        :returns: A string of the parallelize command configured using nodes
            and procs.
        """
        pass

    def _substitute_parallel_command(self, step_cmd, **kwargs):
        """
        Substitute parallelized segments into a specified command.

        :param step_cmd: Command string to parallelize.
        :param nodes: Total number of requested nodes.
        :param procs: Total number of requested processors.
        :returns: The new command with all allocations substituted.
        """
        err_msg = "{} attempting to allocate {} {} for a parallel call with" \
                  " a maximum allocation of {}"

        nodes = kwargs.get("nodes")
        procs = kwargs.get("procs")
        addl_args = dict(kwargs)
        addl_args.pop("nodes")
        addl_args.pop("procs")

        LOGGER.debug("nodes=%s; procs=%s", nodes, procs)
        # See if the command contains a launcher token in it.
        alloc_search = list(re.finditer(self.launcher_regex, step_cmd))
        if alloc_search:
            # If we find that launcher nomenclature.
            total_nodes = 0     # Total nodes we've allocated so far.
            total_procs = 0     # Total processors we've allocated so far.
            cmd = step_cmd      # The step command we'll substitute into.
            for match in alloc_search:
                LOGGER.debug("Found a match: %s", match.group())
                _nodes = None
                _procs = None
                # Look for the allocation information in the match.
                _alloc = match.group("alloc")
                # Search for the legacy format.
                _legacy = re.search(self.legacy_alloc, _alloc)
                if _legacy:
                    # nodes, procs legacy notation.
                    _ = _alloc.split(",")
                    _nodes = _[0]
                    _procs = _[1]
                    LOGGER.debug(
                        "Legacy setup detected. (nodes=%s, procs=%s)",
                        _nodes,
                        _procs
                    )
                else:
                    # We're dealing with the new style.
                    # Make sure we only have at most one proc and node
                    # allocation specified.
                    if _alloc.count("p") > 1 or _alloc.count("n") > 1:
                        msg = "cmd: {}\n Invalid allocations specified ({})." \
                              " Number of nodes and/or procs must only be " \
                              "specified once." \
                              .format(step_cmd, _alloc)
                        LOGGER.error(msg)
                        raise ValueError(msg)

                    if _alloc.count("p") < 1:
                        msg = "cmd: {}\n Invalid allocations specified ({})." \
                              " Processors/tasks must be specified." \
                              .format(step_cmd, _alloc)
                        LOGGER.error(msg)
                        raise ValueError(msg)

                    _nodes = re.search(self.node_alloc, _alloc)
                    if _nodes:
                        _nodes = _nodes.group("nodes")
                    _procs = re.search(self.task_alloc, _alloc)
                    if _procs:
                        _procs = _procs.group("procs")

                    LOGGER.debug(
                        "New setup detected. (nodes=%s, procs=%s)",
                        _nodes,
                        _procs
                    )

                msg = []
                # Check that the requested nodes are within range.
                if _nodes:
                    _ = int(_nodes)
                    total_nodes += _
                    if _ > nodes:
                        msg.append(
                            err_msg.format(
                                match.group(), _nodes, "nodes", nodes
                            )
                        )
                # Check that the requested processors is within range.
                if _procs:
                    _ = int(_procs)
                    total_procs += _
                    if _ > procs:
                        msg.append(
                            err_msg.format(
                                match.group(), _procs, "procs", procs
                            )
                        )
                # If we have constructed a message, raise an exception.
                if msg:
                    LOGGER.error(msg)
                    raise ValueError(msg)

                pcmd = self.get_parallelize_command(
                    _procs, _nodes, **addl_args
                )
                cmd = cmd.replace(match.group(), pcmd)

            # Verify that the total nodes/procs used is within maximum.
            if total_procs > procs:
                msg = "Total processors ({}) requested exceeds the " \
                      "maximum requested ({})".format(total_procs, procs)
                LOGGER.error(msg)
                raise ValueError(msg)

            if nodes and total_nodes > nodes:
                msg = "Total nodes ({}) requested exceeds the " \
                      "maximum requested ({})".format(total_nodes, nodes)
                LOGGER.error(msg)
                raise ValueError(msg)

            return cmd
        else:
            # Two smaller cases here. If we see the launcher token WITHOUT
            # any parameters, replace it there with full nodes and procs.
            # Otherwise, just return the command. A user may simply want to run
            # an unparallelized code in a submission.
            pcmd = self.get_parallelize_command(procs, nodes, **addl_args)
            # Catch the case where the launcher token appears on its own
            if self.launcher_var in step_cmd:
                LOGGER.debug(
                    "'%s' found in cmd. Substituting", self.launcher_var)
                return step_cmd.replace(self.launcher_var, pcmd)
            else:
                LOGGER.debug("The command did not specify an MPI command.")
                return step_cmd

    def get_scheduler_command(self, step):
        """
        Generate the full parallelized command for use in a batch script.

        :param step: A StudyStep instance.
        :returns:
            1. A Boolean value - True if command is to be scheduled, False
            otherwise.
            2. A string representing the parallelized batch command for the
            specified step command.
            3. A string representing the parallelized batch command for the
            specified step restart command.
        """
        # We should never get a study step that doesn't have a run entry; but
        # better to be safe.
        if not step.run:
            msg = "Malformed StudyStep. A StudyStep requires a run entry."
            LOGGER.error(msg)
            raise ValueError(msg)

        # If the user is requesting nodes, we need to request the nodes and
        # set up the command with scheduling.
        _nodes = step.run.get("nodes", 0)
        _procs = step.run.get("procs", 0)
        if _nodes or _procs:
            to_be_scheduled = True
            cmd = self._substitute_parallel_command(
                step.run["cmd"],
                **step.run
            )
            LOGGER.debug("Scheduling command: %s", cmd)

            # Also check for the restart command and parallelize it too.
            restart = ""
            if step.run["restart"]:
                restart = self._substitute_parallel_command(
                    step.run["restart"],
                    **step.run
                )
                LOGGER.debug("Restart command: %s", cmd)
            LOGGER.info("Scheduling workflow step '%s'.", step.name)
        # Otherwise, just return the command. It doesn't need scheduling.
        else:
            LOGGER.info("Running workflow step '%s' locally.", step.name)
            to_be_scheduled = False
            cmd = step.run["cmd"]
            restart = step.run["restart"]

        return to_be_scheduled, cmd, restart

    @abstractmethod
    def _write_script(self, ws_path, step):
        """
        Write a script to the workspace of a workflow step.

        The job_map optional parameter is a map of workflow step names to job
        identifiers. This parameter so far is only planned to be used when a
        study is configured to be launched in one go (more or less a script
        chain using a scheduler's dependency setting). The functionality of
        the parameter may change depending on both future intended use and
        derived classes.

        :param ws_path: Path to the workspace directory of the step.
        :param step: An instance of a StudyStep.
        :returns: Boolean value (True if the workflow step is to be scheduled,
            False otherwise) and the path to the written script.
        """
        pass

    @abstractmethod
    def _state(self, job_state):
        """
        Map a scheduler specific job state to a Study.State enum.

        :param job_state: String representation of scheduler job status.
        :returns: A Study.State enum corresponding to parameter job_state.
        """
        pass

    def get_priority(self, priority):
        """
        Map a fixed enumeration or floating point priority to a batch priority.

        :param priority: Float or StepPriority enum representing priority.
        :returns: A string, integer, or float value representing the mapped
            priority to the batch scheduler.
        """
        raise NotImplementedError()
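
Concrete adapters subclass this interface and fill in the abstract pieces. The sketch below is hypothetical, not one of the adapters that ship with Maestro: the class name, key, extension, and launcher syntax are illustrative, and the method bodies are stubs (implementations for get_header and get_parallelize_command are sketched under their sections below):

from maestrowf.abstracts.interfaces import SchedulerScriptAdapter

class MyScriptAdapter(SchedulerScriptAdapter):
    """Hypothetical adapter showing the surface a subclass must provide."""

    key = "my_scheduler"   # registry name for the adapter (illustrative)
    extension = ".sh"      # extension for generated scripts

    def get_header(self, step):
        raise NotImplementedError   # see the sketch under get_header below

    def get_parallelize_command(self, procs, nodes=None, **kwargs):
        raise NotImplementedError   # see get_parallelize_command below

    def submit(self, step, path, cwd, job_map=None, env=None):
        raise NotImplementedError   # invoke the scheduler's submit command

    def check_jobs(self, joblist):
        raise NotImplementedError   # query the scheduler for job states

    def cancel_jobs(self, joblist):
        raise NotImplementedError   # cancel each job identifier

    def _write_script(self, ws_path, step):
        raise NotImplementedError   # write the batch script into ws_path

    def _state(self, job_state):
        raise NotImplementedError   # map scheduler states to Study.State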

__init__(**kwargs)

Initialize an empty ScriptAdapter object.

Parameters:

- kwargs: Key-value dictionary of arguments. Currently we only support the "shell" keyword. (default: {})
Source code in maestrowf/abstracts/interfaces/schedulerscriptadapter.py
def __init__(self, **kwargs):
    """
    Initialize an empty ScriptAdapter object.

    :param kwargs: Key-value dictionary of arguments.

    Currently we only support the "shell" keyword.
    """
    # NOTE: The _batch member should be used to store persistent batching
    # parameters. The entries in this dictionary are meant to capture
    # the base settings for submission to a batch. This member variable
    # should never be used publicly outside of an instance.

    # Call super to set self._exec
    super(SchedulerScriptAdapter, self).__init__(**kwargs)
    self._batch = {}

add_batch_parameter(name, value)

Add a parameter to the ScriptAdapter instance.

Parameters:

- name: String name of the parameter that's being added. (required)
- value: Value associated with the parameter name (should have a str method). (required)
Source code in maestrowf/abstracts/interfaces/schedulerscriptadapter.py
def add_batch_parameter(self, name, value):
    """
    Add a parameter to the ScriptAdapter instance.

    :param name: String name of the parameter that's being added.
    :param value: Value associated with the parameter name (should have a
        str method).
    """
    self._batch[name] = value
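
A concrete adapter typically calls this from its own __init__ to capture persistent submission settings. A minimal sketch continuing the hypothetical MyScriptAdapter above; the "bank" and "queue" keywords are illustrative:

def __init__(self, **kwargs):
    super(MyScriptAdapter, self).__init__(**kwargs)
    # Persist per-study submission settings; get_header can later render
    # them as scheduler directives.
    self.add_batch_parameter("bank", kwargs.pop("bank", ""))
    self.add_batch_parameter("queue", kwargs.pop("queue", ""))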

get_header(step) abstractmethod

Generate the header present at the top of execution scripts.

Parameters:

- step: A StudyStep instance. (required)

Returns:

A string of the header based on internal batch parameters and the parameter step.

Source code in maestrowf/abstracts/interfaces/schedulerscriptadapter.py
@abstractmethod
def get_header(self, step):
    """
    Generate the header present at the top of execution scripts.

    :param step: A StudyStep instance.
    :returns: A string of the header based on internal batch parameters and
        the parameter step.
    """
    pass
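
An override typically combines self._batch with the step's run settings to emit scheduler directives. A hedged, SLURM-flavored sketch for the hypothetical MyScriptAdapter; the directive names and batch keys are illustrative:

def get_header(self, step):
    lines = ["#!{}".format(self._exec)]  # shell set in ScriptAdapter.__init__
    if self._batch.get("queue"):
        lines.append("#SBATCH --partition={}".format(self._batch["queue"]))
    lines.append("#SBATCH --nodes={}".format(step.run.get("nodes", 1)))
    lines.append("#SBATCH --job-name={}".format(step.name))
    return "\n".join(lines)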

get_parallelize_command(procs, nodes, **kwargs) abstractmethod

Generate the parallelization segment of the command line.

Parameters:

- procs: Number of processors to allocate to the parallel call. (required)
- nodes: Number of nodes to allocate to the parallel call (default = 1). (required)

Returns:

A string of the parallelize command configured using nodes and procs.

Source code in maestrowf/abstracts/interfaces/schedulerscriptadapter.py
@abstractmethod
def get_parallelize_command(self, procs, nodes, **kwargs):
    """
    Generate the parallelization segment of the command line.

    :param procs: Number of processors to allocate to the parallel call.
    :param nodes: Number of nodes to allocate to the parallel call
        (default = 1).
    :returns: A string of the parallelize command configured using nodes
        and procs.
    """
    pass
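
When invoked through _substitute_parallel_command, procs and nodes arrive as the strings captured by the allocation regexes (nodes may be None), and **kwargs carries the remaining step.run entries. A hedged sketch for the hypothetical MyScriptAdapter; the srun syntax is illustrative:

def get_parallelize_command(self, procs, nodes=None, **kwargs):
    # Emit a launcher prefix; omit the node count when none was given.
    if nodes:
        return "srun -N {} -n {}".format(nodes, procs)
    return "srun -n {}".format(procs)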

get_priority(priority)

Map a fixed enumeration or floating point priority to a batch priority.

Parameters:

- priority: Float or StepPriority enum representing priority. (required)

Returns:

A string, integer, or float value representing the mapped priority to the batch scheduler.

Source code in maestrowf/abstracts/interfaces/schedulerscriptadapter.py
def get_priority(self, priority):
    """
    Map a fixed enumeration or floating point priority to a batch priority.

    :param priority: Float or StepPriority enum representing priority.
    :returns: A string, integer, or float value representing the mapped
        priority to the batch scheduler.
    """
    raise NotImplementedError()

get_scheduler_command(step)

Generate the full parallelized command for use in a batch script.

Parameters:

- step: A StudyStep instance. (required)

Returns:

1. A Boolean value - True if the command is to be scheduled, False otherwise.
2. A string representing the parallelized batch command for the specified step command.
3. A string representing the parallelized batch command for the specified step restart command.
Source code in maestrowf/abstracts/interfaces/schedulerscriptadapter.py
def get_scheduler_command(self, step):
    """
    Generate the full parallelized command for use in a batch script.

    :param step: A StudyStep instance.
    :returns:
        1. A Boolean value - True if command is to be scheduled, False
        otherwise.
        2. A string representing the parallelized batch command for the
        specified step command.
        3. A string representing the parallelized batch command for the
        specified step restart command.
    """
    # We should never get a study step that doesn't have a run entry; but
    # better to be safe.
    if not step.run:
        msg = "Malformed StudyStep. A StudyStep requires a run entry."
        LOGGER.error(msg)
        raise ValueError(msg)

    # If the user is requesting nodes, we need to request the nodes and
    # set up the command with scheduling.
    _nodes = step.run.get("nodes", 0)
    _procs = step.run.get("procs", 0)
    if _nodes or _procs:
        to_be_scheduled = True
        cmd = self._substitute_parallel_command(
            step.run["cmd"],
            **step.run
        )
        LOGGER.debug("Scheduling command: %s", cmd)

        # Also check for the restart command and parallelize it too.
        restart = ""
        if step.run["restart"]:
            restart = self._substitute_parallel_command(
                step.run["restart"],
                **step.run
            )
            LOGGER.debug("Restart command: %s", cmd)
        LOGGER.info("Scheduling workflow step '%s'.", step.name)
    # Otherwise, just return the command. It doesn't need scheduling.
    else:
        LOGGER.info("Running workflow step '%s' locally.", step.name)
        to_be_scheduled = False
        cmd = step.run["cmd"]
        restart = step.run["restart"]

    return to_be_scheduled, cmd, restart
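
A sketch of the data flow, assuming an adapter instance whose get_parallelize_command emits srun-style prefixes as in the sketch above; the step stand-in and outputs are illustrative:

class _Step(object):  # stand-in for a StudyStep, for illustration only
    name = "run-sim"
    run = {"cmd": "$(LAUNCHER)[2n, 16p] ./sim input.yaml",
           "restart": "", "nodes": 2, "procs": 16}

scheduled, cmd, restart = adapter.get_scheduler_command(_Step())
# scheduled -> True
# cmd       -> "srun -N 2 -n 16 ./sim input.yaml"
# restart   -> "" (the run entry has no restart command)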

ScriptAdapter

Bases: object

Abstract class representing the interface for constructing scripts.

The ScriptAdapter abstract class is meant to provide a consistent high level interface to generate scripts automatically based on an ExecutionDAG. Adapters as a whole should only interface with the ExecutionDAG because it is ultimately the DAG that manages the state of tasks. Adapters attempt to bridge the 'how' in an abstract way such that the interface is refined to methods such as:

- Generating a script with the proper syntax to submit.
- Submitting a script using the proper command.
- Checking job status.

Source code in maestrowf/abstracts/interfaces/scriptadapter.py
@six.add_metaclass(ABCMeta)
class ScriptAdapter(object):
    """
    Abstract class representing the interface for constructing scripts.

    The ScriptAdapter abstract class is meant to provide a consistent high
    level interface to generate scripts automatically based on an ExecutionDAG.
    Adapters as a whole should only interface with the ExecutionDAG because it
    is ultimately the DAG that manages the state of tasks. Adapters attempt to
    bridge the 'how' in an abstract way such that the interface is refined to
    methods such as:
    - Generating a script with the proper syntax to submit.
    - Submitting a script using the proper command.
    - Checking job status.
    """

    def __init__(self, **kwargs):
        """
        Initialize a new instance of a ScriptAdapter.

        :param kwargs: The key value arguments for the ScriptAdapter instance.
        """
        self._exec = kwargs.pop("shell", "/bin/bash")
        LOGGER.debug("Shell set to '%s'.", self._exec)

    @abstractmethod
    def check_jobs(self, joblist):
        """
        For the given job list, query execution status.

        :param joblist: A list of job identifiers to be queried.
        :returns: The return code of the status query, and a dictionary of job
            identifiers to their status.
        """
        pass

    @abstractmethod
    def cancel_jobs(self, joblist):
        """
        For the given job list, cancel each job.

        :param joblist: A list of job identifiers to be cancelled.
        :returns: The return code to indicate if jobs were cancelled.
        """
        pass

    @abstractmethod
    def _write_script(self, ws_path, step):
        """
        Write a script to the workspace of a workflow step.

        The job_map optional parameter is a map of workflow step names to job
        identifiers. This parameter so far is only planned to be used when a
        study is configured to be launched in one go (more or less a script
        chain using a scheduler's dependency setting). The functionality of
        the parameter may change depending on both future intended use and
        derived classes.

        :param ws_path: Path to the workspace directory of the step.
        :param step: An instance of a StudyStep.
        :returns: Boolean value (True if the workflow step is to be scheduled,
            False otherwise) and the path to the written script.
        """
        pass

    def write_script(self, ws_path, step):
        """
        Generate the script for the specified StudyStep.

        :param ws_path: Workspace path for the step.
        :param step: An instance of a StudyStep class.
        :returns: A tuple containing a boolean set to True if step should be
            scheduled (False otherwise), path to the generated script, and path
            to the generated restart script (None if step cannot be restarted).
        """
        to_be_scheduled, script_path, restart_path = \
            self._write_script(ws_path, step)
        st = os.stat(script_path)
        os.chmod(script_path, st.st_mode | stat.S_IXUSR)

        if restart_path:
            st = os.stat(restart_path)
            os.chmod(restart_path, st.st_mode | stat.S_IXUSR)

        LOGGER.debug(
            "---------------------------------\n"
            "Script path:   %s\n"
            "Restart path:  %s\n"
            "Scheduled?:    %s\n"
            "---------------------------------\n",
            script_path, restart_path, to_be_scheduled
        )
        return to_be_scheduled, script_path, restart_path

    @abstractmethod
    def submit(self, step, path, cwd, job_map=None, env=None):
        """
        Submit a script to the scheduler.

        If cwd is specified, the submit method will operate out of the path
        specified by the 'cwd' parameter.
        If env is specified, the submit method will set the environment
        variables for submission to the specified values. The 'env' parameter
        should be a dictionary of environment variables.

        :param step: An instance of a StudyStep.
        :param path: Path to the script to be executed.
        :param cwd: Path to the current working directory.
        :param job_map: A map of workflow step names to their job identifiers.
        :param env: A dict containing a modified environment for execution.
        :returns: The return code of the submission command and job identifier.
        """
        pass

    @abstractproperty
    def extension(self):
        """
        Returns the extension that generated scripts will use.

        :returns: A string of the extension
        """
        pass

    @abstractproperty
    def key(self):
        """
        Return the key name for a ScriptAdapter.

        This is used to register the adapter in the ScriptAdapterFactory
        and when writing the workflow specification.
        """
        pass

__init__(**kwargs)

Initialize a new instance of a ScriptAdapter.

Parameters:

- kwargs: The key value arguments for the ScriptAdapter instance. (default: {})
Source code in maestrowf/abstracts/interfaces/scriptadapter.py
def __init__(self, **kwargs):
    """
    Initialize a new instance of a ScriptAdapter.

    :param kwargs: The key value arguments for the ScriptAdapter instance.
    """
    self._exec = kwargs.pop("shell", "/bin/bash")
    LOGGER.debug("Shell set to '%s'.", self._exec)

cancel_jobs(joblist) abstractmethod

For the given job list, cancel each job.

Parameters:

- joblist: A list of job identifiers to be cancelled. (required)

Returns:

The return code to indicate if jobs were cancelled.

Source code in maestrowf/abstracts/interfaces/scriptadapter.py
@abstractmethod
def cancel_jobs(self, joblist):
    """
    For the given job list, cancel each job.

    :param joblist: A list of job identifiers to be cancelled.
    :returns: The return code to indicate if jobs were cancelled.
    """
    pass

check_jobs(joblist) abstractmethod

For the given job list, query execution status.

Parameters:

- joblist: A list of job identifiers to be queried. (required)

Returns:

The return code of the status query, and a dictionary of job identifiers to their status.

Source code in maestrowf/abstracts/interfaces/scriptadapter.py
@abstractmethod
def check_jobs(self, joblist):
    """
    For the given job list, query execution status.

    :param joblist: A list of job identifiers to be queried.
    :returns: The return code of the status query, and a dictionary of job
        identifiers to their status.
    """
    pass

extension()

Returns the extension that generated scripts will use.

Returns:

A string of the extension.

Source code in maestrowf/abstracts/interfaces/scriptadapter.py
@abstractproperty
def extension(self):
    """
    Returns the extension that generated scripts will use.

    :returns: A string of the extension
    """
    pass

key()

Return the key name for a ScriptAdapter.

This is used to register the adapter in the ScriptAdapterFactory and when writing the workflow specification.

Source code in maestrowf/abstracts/interfaces/scriptadapter.py
@abstractproperty
def key(self):
    """
    Return the key name for a ScriptAdapter.

    This is used to register the adapter in the ScriptAdapterFactory
    and when writing the workflow specification.
    """
    pass

submit(step, path, cwd, job_map=None, env=None) abstractmethod

Submit a script to the scheduler.

If cwd is specified, the submit method will operate out of the path specified by the 'cwd' parameter. If env is specified, the submit method will set the environment variables for submission to the specified values. The 'env' parameter should be a dictionary of environment variables.

Parameters:

- step: An instance of a StudyStep. (required)
- path: Path to the script to be executed. (required)
- cwd: Path to the current working directory. (required)
- job_map: A map of workflow step names to their job identifiers. (default: None)
- env: A dict containing a modified environment for execution. (default: None)

Returns:

The return code of the submission command and job identifier.

Source code in maestrowf/abstracts/interfaces/scriptadapter.py
@abstractmethod
def submit(self, step, path, cwd, job_map=None, env=None):
    """
    Submit a script to the scheduler.

    If cwd is specified, the submit method will operate out of the path
    specified by the 'cwd' parameter.
    If env is specified, the submit method will set the environment
    variables for submission to the specified values. The 'env' parameter
    should be a dictionary of environment variables.

    :param step: An instance of a StudyStep.
    :param path: Path to the script to be executed.
    :param cwd: Path to the current working directory.
    :param job_map: A map of workflow step names to their job identifiers.
    :param env: A dict containing a modified environment for execution.
    :returns: The return code of the submission command and job identifier.
    """
    pass

write_script(ws_path, step)

Generate the script for the specified StudyStep.

Parameters:

- ws_path: Workspace path for the step. (required)
- step: An instance of a StudyStep class. (required)

Returns:

A tuple containing a boolean set to True if the step should be scheduled (False otherwise), the path to the generated script, and the path to the generated restart script (None if the step cannot be restarted).

Source code in maestrowf/abstracts/interfaces/scriptadapter.py
def write_script(self, ws_path, step):
    """
    Generate the script for the specified StudyStep.

    :param ws_path: Workspace path for the step.
    :param step: An instance of a StudyStep class.
    :returns: A tuple containing a boolean set to True if step should be
        scheduled (False otherwise), path to the generated script, and path
        to the generated restart script (None if step cannot be restarted).
    """
    to_be_scheduled, script_path, restart_path = \
        self._write_script(ws_path, step)
    st = os.stat(script_path)
    os.chmod(script_path, st.st_mode | stat.S_IXUSR)

    if restart_path:
        st = os.stat(restart_path)
        os.chmod(restart_path, st.st_mode | stat.S_IXUSR)

    LOGGER.debug(
        "---------------------------------\n"
        "Script path:   %s\n"
        "Restart path:  %s\n"
        "Scheduled?:    %s\n"
        "---------------------------------\n",
        script_path, restart_path, to_be_scheduled
    )
    return to_be_scheduled, script_path, restart_path
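
A hedged usage sketch: the caller receives the scheduling flag and script paths, with the user-execute bit already set on each written script. The workspace path and adapter instance are illustrative:

ws = "/path/to/study/run-sim"  # the step's workspace directory
scheduled, script, restart = adapter.write_script(ws, step)
if scheduled:
    result = adapter.submit(step, script, cwd=ws)  # return code + job id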