Skip to content

Flux0 26 0

FluxInterface_0260

Bases: FluxInterface

Source code in maestrowf/interfaces/script/_flux/flux0_26_0.py
class FluxInterface_0260(FluxInterface):
    """Flux interface implementation targeting Flux 0.26.0.

    All methods are class/static methods that operate on the shared
    ``flux_handle`` class attribute, which ``connect_to_flux`` (provided by
    the base class) is expected to populate.
    """

    # This utility class is for Flux 0.26.0
    key = "0.26.0"

    flux_handle = None
    # Mapping of step priorities onto Flux urgency values.
    _urgencies = {
        StepPriority.HELD: 0,
        StepPriority.MINIMAL: 1,
        StepPriority.LOW: 9,
        StepPriority.MEDIUM: 16,
        StepPriority.HIGH: 24,
        StepPriority.EXPEDITE: 31,
    }

    @classmethod
    def get_flux_urgency(cls, urgency) -> int:
        """
        Translate an urgency specification into a Flux integer urgency.

        :param urgency: A StepPriority member, a string that
            StepPriority.from_str can parse, or a numeric value that is
            scaled by 31 and rounded up.
        :return: Integer urgency value.
        """
        if isinstance(urgency, str):
            LOGGER.debug("Found string urgency: %s", urgency)
            urgency = StepPriority.from_str(urgency)

        if isinstance(urgency, StepPriority):
            LOGGER.debug("StepUrgency urgency of '%s' given..", urgency)
            return cls._urgencies[urgency]

        LOGGER.debug("Float urgency of '%s' given..", urgency)
        return ceil(float(urgency) * 31)

    @classmethod
    def submit(
        cls,
        nodes,
        procs,
        cores_per_task,
        path,
        cwd,
        walltime,
        ngpus=0,
        job_name=None,
        force_broker=True,
        urgency=StepPriority.MEDIUM,
        waitable=True
    ):
        """
        Build a jobspec for a script and submit it to Flux.

        :param nodes: Number of nodes to request.
        :param procs: Number of tasks (slots when nested) to request.
        :param cores_per_task: Number of cores per task/slot.
        :param path: Path to the script to execute.
        :param cwd: Working directory assigned to the job.
        :param walltime: Job duration; only applied when greater than 0.
        :param ngpus: Number of GPUs to request.
        :param job_name: Name recorded on the job and used as the prefix of
            the stdout/stderr file names.
        :param force_broker: When True, launch the script under a nested
            Flux instance; otherwise submit it as a plain command.
        :param urgency: Urgency for the job. StepPriority members and
            strings are translated with get_flux_urgency; any other value
            is handed to Flux unchanged.
        :param waitable: Passed through to flux.job.submit.
        :return: Tuple of (jobid, retcode, submit_status). On success jobid
            is the Base58 string form of the id and retcode is 0; on
            failure jobid is -1 and retcode is -1 (-2 for a
            ConnectionResetError).
        """
        try:
            # TODO: add better error handling/throwing in the class func
            # to enable more uniform detection/messaging when connection fails
            # to deal with both missing uri in allocations on non-flux machines
            cls.connect_to_flux()

            # NOTE: This previously placed everything under a broker. However,
            # if there's a job that schedules items to Flux, it will schedule
            # all new jobs to the sub-broker. Sometimes this is desired, but
            # it's incorrect to make that the general case. If we are asking
            # for a single node, don't use a broker -- but introduce a flag
            # that can force a single node to run in a broker.

            if force_broker:
                LOGGER.debug(
                    "Launch under Flux sub-broker. [force_broker=%s, "
                    "nodes=%d]",
                    force_broker,
                    nodes,
                )
                ngpus_per_slot = int(ceil(ngpus / nodes))
                jobspec = flux.job.JobspecV1.from_nest_command(
                    [path],
                    num_nodes=nodes,
                    cores_per_slot=cores_per_task,
                    num_slots=procs,
                    gpus_per_slot=ngpus_per_slot,
                )
            else:
                LOGGER.debug(
                    "Launch under root Flux broker. [force_broker=%s, "
                    "nodes=%d]",
                    force_broker,
                    nodes,
                )
                jobspec = flux.job.JobspecV1.from_command(
                    [path],
                    num_tasks=procs,
                    num_nodes=nodes,
                    cores_per_task=cores_per_task,
                    gpus_per_task=ngpus,
                )

            LOGGER.debug("Handle address -- %s", hex(id(cls.flux_handle)))
            if job_name:
                jobspec.setattr("system.job.name", job_name)
            jobspec.cwd = cwd
            jobspec.environment = dict(os.environ)

            if walltime > 0:
                jobspec.duration = walltime

            # NOTE(review): when job_name is None these files are named
            # "None.{{id}}.out" / "None.{{id}}.err" -- confirm whether a
            # different fallback prefix is wanted.
            jobspec.stdout = f"{job_name}.{{{{id}}}}.out"
            jobspec.stderr = f"{job_name}.{{{{id}}}}.err"

            # The default urgency is a StepPriority member; translate
            # symbolic urgencies to their integer value before handing them
            # to Flux. Numeric values are left untouched so callers that
            # already converted are unaffected.
            if isinstance(urgency, (str, StepPriority)):
                urgency = cls.get_flux_urgency(urgency)

            # Submit our job spec.
            jobid = flux.job.submit(
                cls.flux_handle, jobspec, waitable=waitable, urgency=urgency
            )
            submit_status = SubmissionCode.OK
            retcode = 0

            LOGGER.info(
                "Submission returned status OK. -- "
                "Assigned identifier (%s)",
                jobid,
            )

            # NOTE: cannot pickle JobID instances, so must store jobid's as
            # strings and reconstruct for use later. Also ensure we get the
            # Base58 form instead of integer for better user facing logging
            jobid = str(jobid.f58)

        except ConnectionResetError as exception:
            LOGGER.error("Submission failed -- Message (%s).",
                         exception,
                         exc_info=True)
            jobid = -1
            retcode = -2
            submit_status = SubmissionCode.ERROR
        except Exception as exception:
            LOGGER.error("Submission failed -- Message (%s).",
                         exception,
                         exc_info=True)
            jobid = -1
            retcode = -1
            submit_status = SubmissionCode.ERROR

        return jobid, retcode, submit_status

    @classmethod
    def parallelize(cls, procs, nodes=None, **kwargs):
        """
        Build the ``flux mini run`` command line prefix for a parallel launch.

        :param procs: Number of tasks, passed with ``-n``.
        :param nodes: Number of nodes, passed with ``-N``; 1 is used when
            this is not given (or is otherwise falsy).
        :param kwargs: Optional settings: "cores per task" (``-c``), "gpus"
            (``-g``) and "addtl_args", a mapping rendered as comma separated
            ``key=value`` pairs after ``-o``.
        :return: The command as a single space separated string.
        """
        args = ["flux", "mini", "run", "-n", str(procs)]

        # The node count is always emitted; default to a single node.
        num_nodes = nodes if nodes else 1
        args.append("-N")
        args.append(str(num_nodes))

        if "cores per task" in kwargs:
            args.append("-c")
            args.append(str(kwargs["cores per task"]))

        ngpus = kwargs.get("gpus", 0)
        if ngpus:
            args.append("-g")
            args.append(str(ngpus))

        # flux has additional arguments that can be passed via the '-o' flag.
        # Treat an explicit None the same as the key being absent.
        addtl_args = kwargs.get("addtl_args") or {}
        addtl = [f"{key}={value}" for key, value in addtl_args.items()]

        if addtl:
            args.append("-o")
            args.append(",".join(addtl))

        return " ".join(args)

    @classmethod
    def get_statuses(cls, joblist):
        """
        Query Flux for the state of a set of jobs.

        :param joblist: Iterable of job identifiers in string form.
        :return: Tuple of (chk_status, statuses) where statuses maps the
            Base58 id string of each job returned by Flux to its State, and
            chk_status is JobStatusCode.ERROR if the listing reported any
            errors, JobStatusCode.OK otherwise.
        """
        cls.connect_to_flux()

        LOGGER.debug("Flux handle address -- %s", hex(id(cls.flux_handle)))

        # Reconstruct JobID instances from the str form of the Base58 id:
        # NOTE: cannot pickle JobID instances, so must store as strings and
        # reconstruct for use
        jobs_rpc = flux.job.list.JobList(
            cls.flux_handle,
            ids=[flux.job.JobID(jid) for jid in joblist])

        statuses = {}
        for jobinfo in jobs_rpc.jobs():
            job_key = str(jobinfo.id.f58)
            LOGGER.debug("Checking status of job with id %s", job_key)
            statuses[job_key] = cls.state(jobinfo.status_abbrev)

        chk_status = JobStatusCode.OK
        #  Print all errors accumulated in JobList RPC:
        try:
            for err in jobs_rpc.errors:
                chk_status = JobStatusCode.ERROR
                LOGGER.error("Error in JobList RPC %s", err)
        except EnvironmentError as exception:
            # Best effort: failing to read the error list must not fail the
            # status check, but leave a trace for debugging.
            LOGGER.debug(
                "Unable to read errors from JobList RPC: %s", exception
            )

        return chk_status, statuses

    @classmethod
    def cancel(cls, joblist):
        """
        Cancel a set of jobs using the Flux 0.26.0 cancellation API.

        :param joblist: A list of job identifiers to cancel.
        :return: Tuple of (cancel_code, cancel_rcode): a CancelCode
            enumeration that reflects the result of cancellation, and an
            integer return code (0 on success, 1 if any cancel call failed).
        """
        cls.connect_to_flux()

        LOGGER.debug("Handle address -- %s", hex(id(cls.flux_handle)))
        LOGGER.debug(
            "Attempting to cancel jobs.\nJoblist:\n%s",
            "\n".join(str(j) for j in joblist),
        )

        # NOTE: cannot pickle JobID instances, so must store as strings and
        # reconstruct for use
        jobs_rpc = flux.job.list.JobList(
            cls.flux_handle,
            ids=[flux.job.JobID(jid) for jid in joblist])

        cancel_code = CancelCode.OK
        cancel_rcode = 0
        # Walk the job records the same way get_statuses does: the records
        # come from jobs(), and the job id lives on each record's ``id``.
        for jobinfo in jobs_rpc.jobs():
            try:
                jobid = jobinfo.id
                LOGGER.debug("Cancelling Job %s...", jobid.f58)
                flux.job.cancel(cls.flux_handle, int(jobid))
            except Exception as exception:
                LOGGER.error(str(exception))
                cancel_code = CancelCode.ERROR
                cancel_rcode = 1

        return cancel_code, cancel_rcode

    @staticmethod
    def state(state):
        """
        Map a Flux abbreviated job status onto a State value.

        :param state: Abbreviated status string (e.g. "R", "CD").
        :return: The matching State member; State.UNKNOWN (after logging an
            error) for any abbreviation that is not handled.
        """
        known_states = {
            "D": State.PENDING,
            "S": State.QUEUED,
            "R": State.RUNNING,
            "C": State.FINISHING,
            "CD": State.FINISHED,
            "F": State.FAILED,
            "CA": State.CANCELLED,
            "TO": State.TIMEDOUT,
        }

        if state in known_states:
            return known_states[state]

        LOGGER.error("Unhandled state: %s", state)
        return State.UNKNOWN

cancel(joblist) classmethod

Cancel a job using the Flux 0.26.0 cancellation API.

Parameters:

Name Type Description Default
joblist

A list of job identifiers to cancel.

required

Returns:

Type Description

A CancelCode enumeration that reflects the result of cancellation, and a cancel return code indicating how the cancellation call exited.

Source code in maestrowf/interfaces/script/_flux/flux0_26_0.py
@classmethod
def cancel(cls, joblist):
    """
    Cancel a set of jobs using the Flux 0.26.0 cancellation API.

    :param joblist: A list of job identifiers to cancel.
    :return: Tuple of (cancel_code, cancel_rcode): a CancelCode enumeration
        that reflects the result of cancellation, and an integer return
        code (0 on success, 1 if any cancel call failed).
    """
    cls.connect_to_flux()

    LOGGER.debug("Handle address -- %s", hex(id(cls.flux_handle)))
    LOGGER.debug(
        "Attempting to cancel jobs.\nJoblist:\n%s",
        "\n".join(str(j) for j in joblist),
    )

    # NOTE: cannot pickle JobID instances, so must store as strings and
    # reconstruct for use
    jobs_rpc = flux.job.list.JobList(
        cls.flux_handle,
        ids=[flux.job.JobID(jid) for jid in joblist])

    cancel_code = CancelCode.OK
    cancel_rcode = 0
    # Iterate the job records returned by jobs() and cancel by each
    # record's ``id`` rather than iterating the JobList object itself.
    for jobinfo in jobs_rpc.jobs():
        try:
            jobid = jobinfo.id
            LOGGER.debug("Cancelling Job %s...", jobid.f58)
            flux.job.cancel(cls.flux_handle, int(jobid))
        except Exception as exception:
            LOGGER.error(str(exception))
            cancel_code = CancelCode.ERROR
            cancel_rcode = 1

    return cancel_code, cancel_rcode