Skip to content


Module for interfaces that support various schedulers.


Bases: Record

A container for data returned from a scheduler cancellation call.

Source code in maestrowf/interfaces/script/
class CancellationRecord(Record):
    """A container for data returned from a scheduler cancellation call."""

    def __init__(self, cancel_status, retcode):
        """
        Initialize an empty CancellationRecord.

        :param cancel_status: High level CancelCode for the cancel call.
        :param retcode: Return code from the cancel command.
        """
        # Initialize the Record base the same way SubmissionRecord does.
        super(CancellationRecord, self).__init__()
        self._status = {
            CancelCode.OK:      set(),
            CancelCode.ERROR:   set(),
        }   # Map of cancellation status to job set.
        self._retcode = retcode
        self._cstatus = cancel_status

    def add_status(self, jobid, cancel_status):
        """
        Add the cancellation status for a single job to a record.

        :param jobid: Unique job identifier for the job status to be added.
        :param cancel_status: CancelCode designating how cancellation
            terminated.
        :raises TypeError: If cancel_status is not a CancelCode.
        """
        if not isinstance(cancel_status, CancelCode):
            # Interpolate with '%'; passing the type as a second exception
            # argument would leave the '%s' placeholder unformatted.
            raise TypeError(
                "Parameter 'cancel_status' must be of type 'CancelCode'. "
                "Received type '%s' instead." % type(cancel_status))

        # File the job under its status so lookup_status can report it.
        self._status.setdefault(cancel_status, set()).add(jobid)

    @property
    def cancel_status(self):
        """Get the high level CancelCode status."""
        return self._cstatus

    @property
    def return_code(self):
        """Get the return code from the cancel command."""
        return self._retcode

    def lookup_status(self, cancel_status):
        """
        Find the jobs recorded with a given cancellation status.

        :param cancel_status: The CancelCode to look up.
        :returns: Set of job identifiers that match the requested status
            (an empty set when the status has no entry).
        """
        return self._status.get(cancel_status, set())

cancel_status property

Get the high level CancelCode status.

return_code property

Get the return code from the cancel command.

__init__(cancel_status, retcode)

Initialize an empty CancellationRecord.

Source code in maestrowf/interfaces/script/
def __init__(self, cancel_status, retcode):
    """
    Initialize an empty CancellationRecord.

    :param cancel_status: High level CancelCode for the cancel call.
    :param retcode: Return code from the cancel command.
    """
    # Initialize the Record base the same way SubmissionRecord does.
    super(CancellationRecord, self).__init__()
    self._status = {
        CancelCode.OK:      set(),
        CancelCode.ERROR:   set(),
    }   # Map of cancellation status to job set.
    self._retcode = retcode
    self._cstatus = cancel_status

add_status(jobid, cancel_status)

Add the cancellation status for a single job to a record.


Name Type Description Default

jobid: Unique job identifier for the job status to be added. (required)


cancel_status: CancelCode designating how cancellation terminated. (required)

Source code in maestrowf/interfaces/script/
def add_status(self, jobid, cancel_status):
    """
    Add the cancellation status for a single job to a record.

    :param jobid: Unique job identifier for the job status to be added.
    :param cancel_status: CancelCode designating how cancellation
        terminated.
    :raises TypeError: If cancel_status is not a CancelCode.
    """
    if not isinstance(cancel_status, CancelCode):
        # Interpolate with '%'; passing the type as a second exception
        # argument would leave the '%s' placeholder unformatted.
        raise TypeError(
            "Parameter 'cancel_status' must be of type 'CancelCode'. "
            "Received type '%s' instead." % type(cancel_status))

    # File the job under its status so lookup_status can report it.
    self._status.setdefault(cancel_status, set()).add(jobid)


lookup_status(cancel_status)

Find the set of job identifiers recorded with the given cancellation status.


Name Type Description Default

cancel_status: The CancelCode to look up. (required)



Type Description

Set of job identifiers that match the requested status.

Source code in maestrowf/interfaces/script/
def lookup_status(self, cancel_status):
    """
    Find the jobs recorded with a given cancellation status.

    :param cancel_status: The CancelCode to look up.
    :returns: Set of job identifiers that match the requested status
        (an empty set when the status has no entry).
    """
    return self._status.get(cancel_status, set())


Bases: object

A factory for swapping out Flux's backend interface based on version.

Source code in maestrowf/interfaces/script/
class FluxFactory(object):
    """A factory for swapping out Flux's backend interface based on version."""

    # Key of the interface returned by get_latest_interface.
    latest = "0.49.0"

    def _iter_flux():
        """
        Collect the concrete FluxInterface classes shipped with the package.

        Walks every module in 'maestrowf.interfaces.script._flux' and gathers
        the non-abstract classes deriving from FluxInterface. It is called
        once, while the class body executes, to populate ``factories``.

        :return: a list of the classes existing in the namespace.
        """
        # Imported locally so this block does not rely on a file-level
        # import that may not be present.
        import importlib

        package = importlib.import_module('maestrowf.interfaces.script._flux')
        found = []
        # Prefixing with the package name makes iter_modules yield absolute
        # module names that can be imported directly.
        for _finder, name, _ispkg in pkgutil.iter_modules(
                package.__path__, package.__name__ + "."):
            module = importlib.import_module(name)
            # Keep only classes that implement FluxInterface and are not
            # abstract.
            for candidate in list(vars(module).values()):
                if isinstance(candidate, type) and \
                        issubclass(candidate, FluxInterface) and \
                        not inspect.isabstract(candidate):
                    found.append(candidate)
        return found

    # Map of interface key to the class implementing it.
    factories = {
        interface.key: interface for interface in _iter_flux()
    }

    @classmethod
    def get_interface(cls, interface_id):
        """
        Look up the interface class registered under an identifier.

        :param interface_id: String key of the requested interface; it is
            matched after lower-casing.
        :returns: The class registered under the lower-cased identifier.
        :raises Exception: If no interface is registered under that key.
        """
        # Use the same normalized key for the membership test and the
        # lookup so a mixed-case identifier cannot pass one and fail the
        # other.
        key = interface_id.lower()
        if key not in cls.factories:
            msg = "Interface '{0}' not found. Specify a supported version " \
                  "of Flux or implement a new one mapping to the '{0}'" \
                  .format(interface_id)
            raise Exception(msg)

        return cls.factories[key]

    @classmethod
    def get_valid_interfaces(cls):
        """Return the keys of all registered interfaces."""
        return cls.factories.keys()

    @classmethod
    def get_latest_interface(cls):
        """Return the interface class registered under ``latest``."""
        return cls.factories[cls.latest]


Bases: Record

A container for data about return state upon scheduler submission.

Source code in maestrowf/interfaces/script/
class SubmissionRecord(Record):
    """A container for data about return state upon scheduler submission."""

    def __init__(self, subcode, retcode, jobid=-1):
        """
        Initialize a new SubmissionRecord.

        :param subcode: SubmissionCode describing the state of the
            submission call.
        :param retcode: The raw return code returned by the scheduler submit.
        :param jobid: The assigned job identifier for this record.
        """
        # NOTE(review): self._info is presumably created by Record.__init__;
        # confirm against the base class.
        super(SubmissionRecord, self).__init__()
        self._subcode = subcode

        if subcode == SubmissionCode.OK:
            # The job identifier is only stored for a successful submission;
            # for any other code it is ignored.
            self._info["jobid"] = jobid
        self._info["retcode"] = retcode

    @property
    def job_identifier(self):
        """
        Property for the job identifier for the record.

        :returns: A string representing the job identifier assigned by the
                  scheduler, or None when no identifier was stored.
        """
        return self._info.get("jobid", None)

    @property
    def submission_code(self):
        """
        Property for submission state for the record.

        :returns: A SubmissionCode enum representing the state of the
                  submission call.
        """
        return self._subcode

    @property
    def return_code(self):
        """
        Property for the raw return code returned from submission.

        :returns: An integer representing the state of the raw return code
                  from submission.
        """
        return self._info["retcode"]

    def add_info(self, key, value):
        """
        Set additional informational key-value information.

        :param key: Record key identifying data.
        :param value: Data to be recorded.
        """
        self._info[key] = value

job_identifier property

Property for the job identifier for the record.


Type Description

A string representing the job identifier assigned by the scheduler.

return_code property

Property for the raw return code returned from submission.


Type Description

An integer representing the state of the raw return code from submission.

submission_code property

Property for submission state for the record.


Type Description

A SubmissionCode enum representing the state of the submission call.

__init__(subcode, retcode, jobid=-1)

Initialize a new SubmissionRecord.


Name Type Description Default

jobid: The assigned job identifier for this record. (default: -1)


retcode: The raw return code returned by the scheduler submit. (required)

Source code in maestrowf/interfaces/script/
def __init__(self, subcode, retcode, jobid=-1):
    """
    Initialize a new SubmissionRecord.

    :param subcode: SubmissionCode describing the state of the submission
        call.
    :param retcode: The raw return code returned by the scheduler submit.
    :param jobid: The assigned job identifier for this record.
    """
    # NOTE(review): self._info is presumably created by Record.__init__;
    # confirm against the base class.
    super(SubmissionRecord, self).__init__()
    self._subcode = subcode

    if subcode == SubmissionCode.OK:
        # The job identifier is only stored for a successful submission;
        # for any other code it is ignored.
        self._info["jobid"] = jobid
    self._info["retcode"] = retcode

add_info(key, value)

Set additional informational key-value information.


Name Type Description Default

key: Record key identifying data. (required)


value: Data to be recorded. (required)

Source code in maestrowf/interfaces/script/
def add_info(self, key, value):
    """
    Set additional informational key-value information.

    An existing entry under the same key is overwritten.

    :param key: Record key identifying data.
    :param value: Data to be recorded.
    """
    self._info[key] = value