Skip to content

Flux

FluxInterface

Bases: ABC

Source code in maestrowf/abstracts/interfaces/flux.py
class FluxInterface(ABC):
    """
    Abstract interface implemented by version-specific Flux adapters.

    Provides shared helpers for connecting to a Flux broker and querying its
    version, queues and banks, plus pass-through defaults for handling
    additional allocation/launcher arguments.  Concrete interfaces supply
    ``key`` and the abstract job-control methods.
    """

    # Broker handle shared at class level and created on first use by
    # connect_to_flux().  Declaring the default here keeps the
    # ``if cls.flux_handle`` guard from raising AttributeError when an
    # implementation does not declare the attribute itself.
    flux_handle = None

    @classmethod
    def connect_to_flux(cls):
        """
        Create the class-level Flux handle if one does not exist yet.

        On first connection the broker version is compared against this
        adapter's version (``cls.key``) and a message is logged when they
        differ.  Versions that cannot be parsed only produce a warning.
        """
        if cls.flux_handle:
            return

        cls.flux_handle = flux.Flux()
        LOGGER.debug("New Flux handle created.")
        broker_version_str = cls.flux_handle.attr_get("version")
        adaptor_version_str = cls.key
        LOGGER.debug(
            "Connected to Flux broker running version %s using Maestro "
            "adapter version %s.", broker_version_str, adaptor_version_str)

        versions_parsed = True
        try:
            adaptor_version = parse_version(adaptor_version_str)
        except InvalidVersion:
            LOGGER.warning("Could not parse flux adaptor version '%s'. "
                           "May experience unexpected behavior.",
                           adaptor_version_str)
            versions_parsed = False

        try:
            broker_version = parse_version(broker_version_str)
        except InvalidVersion:
            LOGGER.warning("Could not parse flux broker version '%s'. "
                           "May experience unexpected behavior.",
                           broker_version_str)
            versions_parsed = False

        if not versions_parsed:
            return

        # base_version is a plain string for packaging-style Version objects,
        # so comparing it directly is lexicographic ("0.9.0" > "0.49.0").
        # Re-parse it to compare numerically while still ignoring any
        # dev/pre/post/local segments.
        adaptor_base = parse_version(adaptor_version.base_version)
        broker_base = parse_version(broker_version.base_version)

        if adaptor_base > broker_base:
            LOGGER.error(
                "Maestro adapter version (%s) is too new for the Flux "
                "broker version (%s). Functionality not present in "
                "this Flux version may be required by the adapter and "
                "cause errors. Please switch to an older adapter.",
                adaptor_version, broker_version
            )
        elif adaptor_base < broker_base:
            LOGGER.debug(
                "Maestro adaptor version (%s) is older than the Flux "
                "broker version (%s). This is usually OK, but if a "
                "newer Maestro adapter is available, please consider "
                "upgrading to maximize performance and compatibility.",
                adaptor_version, broker_version
            )
        # TODO: add custom version object to more properly handle dev
        #       and prerelease versions for both semver and pep440 version
        #       schemes.  Then add log message reflecting it if detected

    @classmethod
    def get_flux_version(cls):
        """
        Return the broker's ``version`` attribute, connecting first if needed.

        :return: The value of the broker's "version" attribute, unparsed.
        """
        cls.connect_to_flux()
        return cls.flux_handle.attr_get("version")

    @classmethod
    def get_broker_queues(cls):
        """
        Use flux's rpc interface to get available queues to submit to.
        Current (~0.74) flux behavior for nested brokers is to only have an
        anonymous queue without explicit user configuration to create some.

        Todo: locate flux version where queue support was added in case not all
        adapters can support it.

        :return: List of the keys under "queues" in the broker config; empty
                 when the config has no "queues" entry.
        """
        cls.connect_to_flux()
        queue_config = cls.flux_handle.rpc("config.get").get().get("queues", {})
        return list(queue_config.keys())

    @classmethod
    def get_broker_user_banks(cls):
        """
        Use flux's rpc interface to get available banks for current user.
        Current (~0.74) flux behavior for nested brokers is to not have
        accounting plugin active.

        Todo: locate flux version where accounting was available

        :return: List of bank names for the current user; empty when the
                 accounting service is not registered or the reply has no
                 'view_user' entry.
        :raises OSError: Any rpc failure other than the accounting service
                         not being registered.
        """
        cls.connect_to_flux()
        username = getpass.getuser()
        try:
            banks = cls.flux_handle.rpc("accounting.view_user",
                                        {"list_banks": True,
                                         "username": username,
                                         "parsable": True,  # Mandatory in 0.75?
                                         "format": ""}).get()
        except OSError as be:
            if '[Errno 38] No service matching accounting.view_user is registered' not in str(be):
                raise           # If some other unexpected error raise it
            banks = ""

        # rpc call returns single string: 'bank1\nbank2\nbank3\n'
        if isinstance(banks, dict) and 'view_user' in banks:
            # Drop empty pieces: the trailing newline would otherwise yield
            # a bogus '' bank name at the end of the list.
            return [bank for bank in banks['view_user'].split('\n') if bank]

        return []

    @classmethod
    def get_broker_all_banks(cls):
        """
        Use flux's rpc interface to get all available banks on this machine.
        Current (~0.74) flux behavior for nested brokers is to not have
        accounting plugin active.

        :return: List of the 'bank' values found in the rpc reply; empty when
                 the accounting service is not registered.
        :raises OSError: Any rpc failure other than the accounting service
                         not being registered.
        """
        cls.connect_to_flux()
        try:
            banks = cls.flux_handle.rpc("accounting.list_banks",
                                        {"inactive": True,
                                         "table": False,
                                         "json": True,  # Default in 0.75 is no longer json
                                         "fields": "bank",
                                         "format": ""}).get()
        except OSError as be:
            if '[Errno 38] No service matching accounting.list_banks is registered' not in str(be):
                raise           # If some other unexpected error raise it
            # Placeholder with no 'bank' key, so the result below is empty
            banks = {'list_banks': "[{}]"}

        # rpc call returns list of banks as json string
        bank_dicts = json.loads(banks['list_banks'])
        return [bdict['bank'] for bdict in bank_dicts
                if isinstance(bdict, dict) and 'bank' in bdict]

    @classmethod
    @abstractmethod
    def get_flux_urgency(cls, urgency) -> int:
        """
        Map a fixed enumeration or floating point priority to a Flux urgency.

        :param urgency: Float or StepPriority enum representing priority.
        :returns: An integer mapping the urgency parameter to a Flux urgency.
        """
        raise NotImplementedError()

    @classmethod
    @abstractmethod
    def get_statuses(cls, joblist):
        """
        Return the statuses from a given Flux handle and joblist.

        :param joblist: A list of jobs to check the status of.
        :return: A dictionary of job identifiers to statuses.
        """
        ...

    @classmethod
    @abstractmethod
    def state(cls, state):
        """
        Map a scheduler specific job state to a Study.State enum.

        :param state: A string of the state returned by Flux
        :return: The mapped Study.State enumeration
        """
        ...

    @classmethod
    @abstractmethod
    def parallelize(cls, procs, nodes=None, **kwargs):
        """
        Create a parallelized Flux command for launching.

        :param procs: Number of processors to use.
        :param nodes: Number of nodes the parallel call will span.
        :param kwargs: Extra keyword arguments.
        :return: A string of a Flux MPI command.
        """
        ...

    @classmethod
    @abstractmethod
    def submit(
        cls, nodes, procs, cores_per_task, path, cwd, walltime,
        npgus=0, job_name=None, force_broker=False, urgency=StepPriority.MEDIUM
    ):
        """
        Submit a job using this Flux interface's submit API.

        :param nodes: The number of nodes to request on submission.
        :param procs: The number of cores to request on submission.
        :param cores_per_task: The number of cores per MPI task.
        :param path: Path to the script to be submitted.
        :param cwd: Path to the workspace to execute the script in.
        :param walltime: HH:MM:SS formatted time string for job duration.
        :param npgus: The number of GPUs to request on submission.
                      NOTE(review): the keyword is spelled 'npgus' here,
                      presumably a typo of 'ngpus'; kept as-is so keyword
                      callers keep working -- confirm before renaming.
        :param job_name: A name string to assign the submitted job.
        :param force_broker: Forces the script to run under a Flux sub-broker.
        :param urgency: Enumerated scheduling priority for the submitted job.
        :return: A string representing the jobid returned by Flux submit.
        :return: An integer of the return code submission returned.
        :return: SubmissionCode enumeration that reflects result of submission.
        """
        ...

    @classmethod
    @abstractmethod
    def cancel(cls, joblist):
        """
        Cancel a job using this Flux interface's cancellation API.

        :param joblist: A list of job identifiers to cancel.
        :return: CancelCode enumeration that reflects result of cancellation.
        """
        ...

    @property
    @abstractmethod
    def key(self):
        """
        Return the key name for a ScriptAdapter.

        This is used to register the adapter in the ScriptAdapterFactory
        and when writing the workflow specification.

        :return: A string of the name of a FluxInterface class.
        """
        ...

    @classmethod
    def addtl_alloc_arg_types(cls):
        """
        Return set of additional allocation args that this adapter knows how
        to wire up to the jobspec python apis, e.g. 'attributes',
        'shell_options', ... This is aimed specifically at the repeated types,
        which collect many flags/key=value pairs which go through a specific
        jobspec call.  Everything not here gets dumped into a 'misc' group
        for individual handling.

        :return: List of string

        .. note::

           Should we have an enum for these or something vs random strings?
        """
        # default nothing, overriding in implementation
        return []

    @classmethod
    def render_additional_args(cls, args_dict):
        """
        Helper to render additional argument sets to flux cli format for
        use in constructing $(LAUNCHER) line and flux batch directives.
        This default implementation yields a single empty string.

        :param args_dict: Dictionary of flux arg keys and name: value pairs
        :yield: formatted strings of cli options/values

        .. note::

           Promote this to the general/base adapters to handle non-normalizable
           scheduler/machine specific options
        """
        yield ""

    @classmethod
    def addtl_alloc_arg_type_map(cls, option):
        """
        Map verbose/brief cli arg option name (o from -o, setopt from --setopt)
        onto known alloc arg types this interface implements

        :param option: option string corresponding to flux cli input
        :return: string, one of known_alloc_arg_types
        """
        # Default to pass through, override in implementation
        return option

    @classmethod
    def get_addtl_arg_cli_key(cls, arg_type):
        """
        Return expected cli key associated with each normalized arg type.
        `arg_type` not in known_arg_types are assumed to be the key already
        to facilitate flexible pass through to launcher

        :param arg_type: string noting arg group or cli key
        :returns: cli key used for this arg

        .. note::

           Can we find a reasonable default prefix (where are things put
           by default in flux, attributes.system?)
        """
        # Default to pass through, handling known types/mapping in implementation
        return arg_type

    @staticmethod
    def get_cli_arg_prefix_sep(cli_key):
        """
        Helper for rendering extra options on cli/batch directives.  Sets prefix
        and value separator based on length of cli key.  Flux has two conventions:
        single letter cli_key has prefix of '-' and separator of ' ' while
        multiletter cli_key has prefix of '--' and separator of '='.  Examples
        '-o foo=2' or '--setopt=foo=2' for single letter cli_key (o) and
        multiletter (setopt) forms to set the same option.

        :param cli_key: the key to use on the cli form of an argument
        :type cli_key: str
        :returns: dict containing 'prefix' and 'sep' for use in rendering
        """
        if len(cli_key) == 1:
            return {"prefix": "-", "sep": " "}
        else:
            return {"prefix": "--", "sep": "="}

    @classmethod
    def normalize_additional_args(cls, args_dict, group_name=None, filter_unknown=False):
        """
        Helper to normalize additional arguments to known types and an
        unflattened nested dictionary structure.  This unflattens any
        dotpath encoded nested dictionary keys.

        This default implementation returns ``args_dict`` unchanged.

        :param args_dict: Dictionary of flux arg keys and name: value pairs
        :type args_dict: dict
        :param group_name: Optional name of group/tag to use in log messages
                           when filtering_unknown is on
        :type group_name: str
        :param filter_unknown: flag to block pass through of unknown args, e.g.
                               for allocation where we can't handle arbitrary
        :type filter_unknown: bool
        :return: dict of packed args with top level keys being the adapter
                 version specific addtl_alloc_arg_types
        """
        return args_dict

key abstractmethod property

Return the key name for a ScriptAdapter.

This is used to register the adapter in the ScriptAdapterFactory and when writing the workflow specification.

Returns:

Type Description

A string of the name of a FluxInterface class.

addtl_alloc_arg_type_map(option) classmethod

Map verbose/brief cli arg option name (o from -o, setopt from --setopt) onto known alloc arg types this interface implements

Parameters:

Name Type Description Default
option

option string corresponding to flux cli input

required

Returns:

Type Description

string, one of known_alloc_arg_types

Source code in maestrowf/abstracts/interfaces/flux.py
@classmethod
def addtl_alloc_arg_type_map(cls, option):
    """
    Translate a cli option name into one of the alloc arg types known to
    this interface.  The option may be the brief or verbose form, e.g.
    'o' (from -o) or 'setopt' (from --setopt).

    The base implementation is a pass-through; implementations override it.

    :param option: option string corresponding to flux cli input
    :return: string, one of known_alloc_arg_types
    """
    return option

addtl_alloc_arg_types() classmethod

Return set of additional allocation args that this adapter knows how to wire up to the jobspec python apis, e.g. 'attributes', 'shell_options', ... This is aimed specifically at the repeated types, which collect many flags/key=value pairs which go through a specific jobspec call. Everything not here gets dumped into a 'misc' group for individual handling.

Returns:

Type Description

List of string

Note: Should we have an enum for these or something vs random strings?

Source code in maestrowf/abstracts/interfaces/flux.py
@classmethod
def addtl_alloc_arg_types(cls):
    """
    List the additional allocation arg groups this adapter can wire up to
    the jobspec python apis, e.g. 'attributes', 'shell_options', ...

    These are the repeated types: groups that collect many flags or
    key=value pairs and are applied through one specific jobspec call.
    Anything not listed here gets dumped into a 'misc' group for
    individual handling.

    The base implementation knows none; implementations override it.

    :return: List of string

    .. note::

       Should we have an enum for these or something vs random strings?
    """
    return []

cancel(joblist) abstractmethod classmethod

Cancel a job using this Flux interface's cancellation API.

Parameters:

Name Type Description Default
joblist

A list of job identifiers to cancel.

required

Returns:

Type Description

CancelCode enumeration that reflects result of cancellation.

Source code in maestrowf/abstracts/interfaces/flux.py
@classmethod
@abstractmethod
def cancel(cls, joblist):
    """
    Cancel jobs through this Flux interface's cancellation API.

    Abstract: each concrete interface provides the implementation.

    :param joblist: A list of job identifiers to cancel.
    :return: CancelCode enumeration that reflects result of cancellation.
    """
    ...

get_addtl_arg_cli_key(arg_type) classmethod

Return expected cli key associated with each normalized arg type. arg_type not in known_arg_types are assumed to be the key already to facilitate flexible pass through to launcher

Parameters:

Name Type Description Default
arg_type

string noting arg group or cli key

required

Returns:

Type Description

cli key used for this arg

Note: Can we find a reasonable default prefix (where are things put by default in flux, attributes.system?)

Source code in maestrowf/abstracts/interfaces/flux.py
@classmethod
def get_addtl_arg_cli_key(cls, arg_type):
    """
    Look up the cli key that goes with a normalized arg type.

    Any `arg_type` not in known_arg_types is assumed to already be the
    cli key, which allows flexible pass through to the launcher.  The base
    implementation passes everything through; known types and their
    mapping are handled in the implementations.

    :param arg_type: string noting arg group or cli key
    :returns: cli key used for this arg

    .. note::

       Can we find a reasonable default prefix (where are things put
       by default in flux, attributes.system?)
    """
    return arg_type

get_broker_all_banks() classmethod

Use flux's rpc interface to get all available banks on this machine. Current (~0.74) flux behavior for nested brokers is to not have the accounting plugin active.

Source code in maestrowf/abstracts/interfaces/flux.py
@classmethod
def get_broker_all_banks(cls):
    """
    Query flux's rpc interface for every bank available on this machine.

    Nested brokers currently (~0.74) run without the accounting plugin; in
    that case the "no service" error is absorbed and no banks are returned.

    :return: List of the 'bank' values found in the rpc reply.
    :raises OSError: Any rpc failure other than the accounting service not
                     being registered.
    """
    cls.connect_to_flux()
    request = {
        "inactive": True,
        "table": False,
        "json": True,  # Default in 0.75 is no longer json
        "fields": "bank",
        "format": "",
    }
    try:
        reply = cls.flux_handle.rpc("accounting.list_banks", request).get()
    except OSError as err:
        no_service = '[Errno 38] No service matching accounting.list_banks is registered'
        if no_service in str(err):
            # Placeholder entry without a 'bank' key -> empty result
            reply = {'list_banks': "[{}]"}
        else:
            raise           # Anything else is unexpected

    # The reply carries the bank records as a json encoded string
    records = json.loads(reply['list_banks'])
    return [
        record['bank']
        for record in records
        if isinstance(record, dict) and 'bank' in record
    ]

get_broker_queues() classmethod

Use flux's rpc interface to get available queues to submit to. Current (~0.74) flux behavior for nested brokers is to only have an anonymous queue unless the user explicitly configures additional queues.

Todo: locate flux version where queue support was added in case not all adapters can support it.

Source code in maestrowf/abstracts/interfaces/flux.py
@classmethod
def get_broker_queues(cls):
    """
    Query flux's rpc interface for the queues that can be submitted to.

    Nested brokers currently (~0.74) only have an anonymous queue unless
    the user explicitly configures some.

    Todo: locate flux version where queue support was added in case not all
    adapters can support it.

    :return: List of the keys under "queues" in the broker config; empty
             when the config has no "queues" entry.
    """
    cls.connect_to_flux()
    broker_config = cls.flux_handle.rpc("config.get").get()
    queues = broker_config.get("queues", {})
    return list(queues.keys())

get_broker_user_banks() classmethod

Use flux's rpc interface to get available banks for current user. Current (~0.74) flux behavior for nested brokers is to not have the accounting plugin active.

Todo: locate flux version where accounting was available

Source code in maestrowf/abstracts/interfaces/flux.py
@classmethod
def get_broker_user_banks(cls):
    """
    Use flux's rpc interface to get available banks for current user.
    Current (~0.74) flux behavior for nested brokers is to not have
    accounting plugin active.

    Todo: locate flux version where accounting was available

    :return: List of bank names for the current user; empty when the
             accounting service is not registered or the reply has no
             'view_user' entry.
    :raises OSError: Any rpc failure other than the accounting service not
                     being registered.
    """
    cls.connect_to_flux()
    username = getpass.getuser()
    try:
        banks = cls.flux_handle.rpc("accounting.view_user",
                                    {"list_banks": True,
                                     "username": username,
                                     "parsable": True,  # Mandatory in 0.75?
                                     "format": ""}).get()
    except OSError as be:
        if '[Errno 38] No service matching accounting.view_user is registered' not in str(be):
            raise           # If some other unexpected error raise it
        banks = ""

    # rpc call returns single string: 'bank1\nbank2\nbank3\n'
    if isinstance(banks, dict) and 'view_user' in banks:
        # Drop empty pieces: the trailing newline would otherwise yield a
        # bogus '' bank name at the end of the list.
        return [bank for bank in banks['view_user'].split('\n') if bank]

    return []

get_cli_arg_prefix_sep(cli_key) staticmethod

Helper for rendering extra options on cli/batch directives. Sets prefix and value separator based on length of cli key. Flux has two conventions: single letter cli_key has prefix of '-' and separator of ' ' while multiletter cli_key has prefix of '--' and separator of '='. Examples '-o foo=2' or '--setopt=foo=2' for single letter cli_key (o) and multiletter (setopt) forms to set the same option.

Parameters:

Name Type Description Default
cli_key str

the key to use on the cli form of an argument

required

Returns:

Type Description

dict containing 'prefix' and 'sep' for use in rendering

Source code in maestrowf/abstracts/interfaces/flux.py
@staticmethod
def get_cli_arg_prefix_sep(cli_key):
    """
    Pick the prefix and value separator used when rendering an extra option
    on the cli or in batch directives, based on the length of the cli key.

    Flux has two conventions: a single letter cli_key is prefixed with '-'
    and separated from its value by ' ', while a multiletter cli_key is
    prefixed with '--' and separated by '='.  For example '-o foo=2' and
    '--setopt=foo=2' set the same option via the single letter (o) and
    multiletter (setopt) forms.

    :param cli_key: the key to use on the cli form of an argument
    :type cli_key: str
    :returns: dict containing 'prefix' and 'sep' for use in rendering
    """
    is_short_form = len(cli_key) == 1
    if not is_short_form:
        return {"prefix": "--", "sep": "="}
    return {"prefix": "-", "sep": " "}

get_flux_urgency(urgency) abstractmethod classmethod

Map a fixed enumeration or floating point priority to a Flux urgency.

Parameters:

Name Type Description Default
urgency

Float or StepPriority enum representing priority.

required

Returns:

Type Description
int

An integer mapping the urgency parameter to a Flux urgency.

Source code in maestrowf/abstracts/interfaces/flux.py
@classmethod
@abstractmethod
def get_flux_urgency(cls, urgency) -> int:
    """
    Convert a fixed enumeration or floating point priority into a Flux
    urgency.

    Abstract: this base declaration always raises NotImplementedError.

    :param urgency: Float or StepPriority enum representing priority.
    :returns: An integer mapping the urgency parameter to a Flux urgency.
    :raises NotImplementedError: Always, in this base declaration.
    """
    raise NotImplementedError()

get_statuses(joblist) abstractmethod classmethod

Return the statuses from a given Flux handle and joblist.

Parameters:

Name Type Description Default
joblist

A list of jobs to check the status of.

required

Returns:

Type Description

A dictionary of job identifiers to statuses.

Source code in maestrowf/abstracts/interfaces/flux.py
@classmethod
@abstractmethod
def get_statuses(cls, joblist):
    """
    Look up the statuses of the jobs in joblist from a Flux handle.

    Abstract: each concrete interface provides the implementation.

    :param joblist: A list of jobs to check the status of.
    :return: A dictionary of job identifiers to statuses.
    """
    ...

normalize_additional_args(args_dict, group_name=None, filter_unknown=False) classmethod

Helper to normalize additional arguments to known types and an unflattened nested dictionary structure. This unflattens any dotpath encoded nested dictionary keys.

Parameters:

Name Type Description Default
args_dict dict

Dictionary of flux arg keys and name: value pairs

required
group_name str

Optional name of group/tag to use in log messages when filtering_unknown is on

None
filter_unknown bool

flag to block pass through of unknown args, e.g. for allocation where we can't handle arbitrary

False

Returns:

Type Description

dict of packed args with top level keys being the adapter version specific addtl_alloc_arg_types

Source code in maestrowf/abstracts/interfaces/flux.py
@classmethod
def normalize_additional_args(cls, args_dict, group_name=None, filter_unknown=False):
    """
    Normalize additional arguments into known types and an unflattened,
    nested dictionary structure, expanding any dotpath encoded nested
    dictionary keys.

    The base implementation returns ``args_dict`` unchanged and does not
    use ``group_name`` or ``filter_unknown``.

    :param args_dict: Dictionary of flux arg keys and name: value pairs
    :type args_dict: dict
    :param group_name: Optional name of group/tag to use in log messages
                       when filtering_unknown is on
    :type group_name: str
    :param filter_unknown: flag to block pass through of unknown args, e.g.
                           for allocation where we can't handle arbitrary
    :type filter_unknown: bool
    :return: dict of packed args with top level keys being the adapter
             version specific addtl_alloc_arg_types
    """
    return args_dict

parallelize(procs, nodes=None, **kwargs) abstractmethod classmethod

Create a parallelized Flux command for launching.

Parameters:

Name Type Description Default
procs

Number of processors to use.

required
nodes

Number of nodes the parallel call will span.

None
kwargs

Extra keyword arguments.

{}

Returns:

Type Description

A string of a Flux MPI command.

Source code in maestrowf/abstracts/interfaces/flux.py
@classmethod
@abstractmethod
def parallelize(cls, procs, nodes=None, **kwargs):
    """
    Build a parallelized Flux command for launching.

    Abstract: each concrete interface provides the implementation.

    :param procs: Number of processors to use.
    :param nodes: Number of nodes the parallel call will span.
    :param kwargs: Extra keyword arguments.
    :return: A string of a Flux MPI command.
    """
    ...

render_additional_args(args_dict) classmethod

Helper to render additional argument sets to flux cli format for use in constructing $(LAUNCHER) line and flux batch directives. This default implementation yields a single empty string.

:yield: formatted strings of cli options/values

.. note::

Promote this to the general/base adapters to handle non-normalizable scheduler/machine specific options

Parameters:

Name Type Description Default
args_dict

Dictionary of flux arg keys and name: value pairs

required
Source code in maestrowf/abstracts/interfaces/flux.py
@classmethod
def render_additional_args(cls, args_dict):
    """
    Render additional argument sets into flux cli format, for building the
    $(LAUNCHER) line and flux batch directives.

    The base implementation ignores ``args_dict`` and yields a single
    empty string.

    :param args_dict: Dictionary of flux arg keys and name: value pairs
    :yield: formatted strings of cli options/values

    .. note::

       Promote this to the general/base adapters to handle non-normalizable
       scheduler/machine specific options
    """
    yield ""

state(state) abstractmethod classmethod

Map a scheduler specific job state to a Study.State enum.

Parameters:

Name Type Description Default
adapter

Instance of a FluxAdapter

required
state

A string of the state returned by Flux

required

Returns:

Type Description

The mapped Study.State enumeration

Source code in maestrowf/abstracts/interfaces/flux.py
@classmethod
@abstractmethod
def state(cls, state):
    """
    Map a scheduler specific job state to a Study.State enum.

    As a classmethod the first positional parameter receives the class, so
    ``cls`` is declared explicitly; without it the state argument could
    not be passed at all.

    :param state: A string of the state returned by Flux
    :return: The mapped Study.State enumeration
    """
    ...

submit(nodes, procs, cores_per_task, path, cwd, walltime, npgus=0, job_name=None, force_broker=False, urgency=StepPriority.MEDIUM) abstractmethod classmethod

Submit a job using this Flux interface's submit API.

Parameters:

Name Type Description Default
nodes

The number of nodes to request on submission.

required
procs

The number of cores to request on submission.

required
cores_per_task

The number of cores per MPI task.

required
path

Path to the script to be submitted.

required
cwd

Path to the workspace to execute the script in.

required
walltime

HH:MM:SS formatted time string for job duration.

required
ngpus

The number of GPUs to request on submission.

required
job_name

A name string to assign the submitted job.

None
force_broker

Forces the script to run under a Flux sub-broker.

False
urgency

Enumerated scheduling priority for the submitted job.

MEDIUM

Returns:

Type Description

SubmissionCode enumeration that reflects result of submission.

Source code in maestrowf/abstracts/interfaces/flux.py
@classmethod
@abstractmethod
def submit(
    cls, nodes, procs, cores_per_task, path, cwd, walltime,
    npgus=0, job_name=None, force_broker=False, urgency=StepPriority.MEDIUM
):
    """
    Submit a job through this Flux interface's submit API.

    Abstract: each concrete interface provides the implementation.

    :param nodes: The number of nodes to request on submission.
    :param procs: The number of cores to request on submission.
    :param cores_per_task: The number of cores per MPI task.
    :param path: Path to the script to be submitted.
    :param cwd: Path to the workspace to execute the script in.
    :param walltime: HH:MM:SS formatted time string for job duration.
    :param npgus: The number of GPUs to request on submission.
                  NOTE(review): the keyword is spelled 'npgus' here,
                  presumably a typo of 'ngpus' -- confirm against the
                  concrete interfaces before renaming.
    :param job_name: A name string to assign the submitted job.
    :param force_broker: Forces the script to run under a Flux sub-broker.
    :param urgency: Enumerated scheduling priority for the submitted job.
    :return: A string representing the jobid returned by Flux submit.
    :return: An integer of the return code submission returned.
    :return: SubmissionCode enumeration that reflects result of submission.
    """
    ...